You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user-zh@flink.apache.org by yidan zhao <hi...@gmail.com> on 2021/11/01 07:05:32 UTC

关于FlinkSQL从kafka读取数据写到hive的一些问题

如题,我看了官方文档,定义好kafka和hive表。
写的时候提示要指定提交策略,就又看了看文档,如下为文档实例。

SET table.sql-dialect=hive;CREATE TABLE hive_table (
  user_id STRING,
  order_amount DOUBLE) PARTITIONED BY (dt STRING, hr STRING) STORED AS
parquet TBLPROPERTIES (
  'partition.time-extractor.timestamp-pattern'='$dt $hr:00:00',
  'sink.partition-commit.trigger'='partition-time',
  'sink.partition-commit.delay'='1 h',
  'sink.partition-commit.policy.kind'='metastore,success-file');
SET table.sql-dialect=default;CREATE TABLE kafka_table (
  user_id STRING,
  order_amount DOUBLE,
  log_ts TIMESTAMP(3),
  WATERMARK FOR log_ts AS log_ts - INTERVAL '5' SECOND -- Define
watermark on TIMESTAMP column) WITH (...);


如上,如果是这样的话,那就会出现个问题。所有需要写入的hive表其实都需要重新定义一次,部分原先的表是hive中定义的。现在我需要重新定义一次可能。

其次,为了避免重新定义表有问题啥的,我可能会重新定义另一个数据库中同名表,但指定到和hive表相同的存储路径。
但如果hive中修改原表,我这边不改变flink hive表定义,又会出现不一致的情况。


此外,flink这样定义的hive表和hive自己定义的肯定意义一致吗,不会影响hive自身的读写吧。

Re: 关于FlinkSQL从kafka读取数据写到hive的一些问题

Posted by yidan zhao <hi...@gmail.com>.
thanks

Tony Wei <to...@gmail.com> 于2021年11月2日周二 下午1:12写道:

> Hi yidan,
>
> 你可以試試 SQL Hints [1].
>
> [1]
>
> https://ci.apache.org/projects/flink/flink-docs-master/docs/dev/table/sql/queries/hints/
>
>
> yidan zhao <hi...@gmail.com> 於 2021年11月2日 週二 下午1:03寫道:
>
> > 嗯嗯,hive catalog的确不需要重新建表,但是我的场景是:我需要通过 flinkSQL 流式将 kafka 表数据写入 hive 表。
> > 因此就需要有如下属性等,而原先的hive表式spark-sql中创建的,肯定不可能带有这种属性。我目前想法是,比如针对原表 t1,我重新在
> > flinkSQL 中创建个hive表t2,但是指定location为原t1的location,同时带上如下相关属性,这样就达到目的了。
> > 或者说,基于现有的hive表,有什么不重定义的方法,仍然可以通过sql流式将kafka表数据写进去不。
> > >   'partition.time-extractor.timestamp-pattern'='$dt $hr:00:00',
> > >   'sink.partition-commit.trigger'='partition-time',
> > >   'sink.partition-commit.delay'='1 h',
> > >   'sink.partition-commit.policy.kind'='metastore,success-file');
> >
> > Caizhi Weng <ts...@gmail.com> 于2021年11月2日周二 上午10:47写道:
> >
> > > Hi!
> > >
> > > hive catalog 是不需要重新在 Flink SQL 里写一遍表定义的,连接到 hive catalog 的时候 Flink
> 就会自动读取
> > > hive 里表的结构等信息。但 kafka 的表定义仍然要写。
> > >
> > > 你的邮件里的内容具体来自哪个文档界面呢?文档里应该是想要从 Flink 里建立 hive 表,如果已经在 hive 里建过表了就不用再建了。
> > >
> > > yidan zhao <hi...@gmail.com> 于2021年11月1日周一 下午3:05写道:
> > >
> > > > 如题,我看了官方文档,定义好kafka和hive表。
> > > > 写的时候提示要指定提交策略,就又看了看文档,如下为文档实例。
> > > >
> > > > SET table.sql-dialect=hive;CREATE TABLE hive_table (
> > > >   user_id STRING,
> > > >   order_amount DOUBLE) PARTITIONED BY (dt STRING, hr STRING) STORED
> AS
> > > > parquet TBLPROPERTIES (
> > > >   'partition.time-extractor.timestamp-pattern'='$dt $hr:00:00',
> > > >   'sink.partition-commit.trigger'='partition-time',
> > > >   'sink.partition-commit.delay'='1 h',
> > > >   'sink.partition-commit.policy.kind'='metastore,success-file');
> > > > SET table.sql-dialect=default;CREATE TABLE kafka_table (
> > > >   user_id STRING,
> > > >   order_amount DOUBLE,
> > > >   log_ts TIMESTAMP(3),
> > > >   WATERMARK FOR log_ts AS log_ts - INTERVAL '5' SECOND -- Define
> > > > watermark on TIMESTAMP column) WITH (...);
> > > >
> > > >
> > > >
> > >
> >
> 如上,如果是这样的话,那就会出现个问题。所有需要写入的hive表其实都需要重新定义一次,部分原先的表是hive中定义的。现在我需要重新定义一次可能。
> > > >
> > > > 其次,为了避免重新定义表有问题啥的,我可能会重新定义另一个数据库中同名表,但指定到和hive表相同的存储路径。
> > > > 但如果hive中修改原表,我这边不改变flink hive表定义,又会出现不一致的情况。
> > > >
> > > >
> > > > 此外,flink这样定义的hive表和hive自己定义的肯定意义一致吗,不会影响hive自身的读写吧。
> > > >
> > >
> >
>

Re: 关于FlinkSQL从kafka读取数据写到hive的一些问题

Posted by Tony Wei <to...@gmail.com>.
Hi yidan,

你可以試試 SQL Hints [1].

[1]
https://ci.apache.org/projects/flink/flink-docs-master/docs/dev/table/sql/queries/hints/


yidan zhao <hi...@gmail.com> 於 2021年11月2日 週二 下午1:03寫道:

> 嗯嗯,hive catalog的确不需要重新建表,但是我的场景是:我需要通过 flinkSQL 流式将 kafka 表数据写入 hive 表。
> 因此就需要有如下属性等,而原先的hive表式spark-sql中创建的,肯定不可能带有这种属性。我目前想法是,比如针对原表 t1,我重新在
> flinkSQL 中创建个hive表t2,但是指定location为原t1的location,同时带上如下相关属性,这样就达到目的了。
> 或者说,基于现有的hive表,有什么不重定义的方法,仍然可以通过sql流式将kafka表数据写进去不。
> >   'partition.time-extractor.timestamp-pattern'='$dt $hr:00:00',
> >   'sink.partition-commit.trigger'='partition-time',
> >   'sink.partition-commit.delay'='1 h',
> >   'sink.partition-commit.policy.kind'='metastore,success-file');
>
> Caizhi Weng <ts...@gmail.com> 于2021年11月2日周二 上午10:47写道:
>
> > Hi!
> >
> > hive catalog 是不需要重新在 Flink SQL 里写一遍表定义的,连接到 hive catalog 的时候 Flink 就会自动读取
> > hive 里表的结构等信息。但 kafka 的表定义仍然要写。
> >
> > 你的邮件里的内容具体来自哪个文档界面呢?文档里应该是想要从 Flink 里建立 hive 表,如果已经在 hive 里建过表了就不用再建了。
> >
> > yidan zhao <hi...@gmail.com> 于2021年11月1日周一 下午3:05写道:
> >
> > > 如题,我看了官方文档,定义好kafka和hive表。
> > > 写的时候提示要指定提交策略,就又看了看文档,如下为文档实例。
> > >
> > > SET table.sql-dialect=hive;CREATE TABLE hive_table (
> > >   user_id STRING,
> > >   order_amount DOUBLE) PARTITIONED BY (dt STRING, hr STRING) STORED AS
> > > parquet TBLPROPERTIES (
> > >   'partition.time-extractor.timestamp-pattern'='$dt $hr:00:00',
> > >   'sink.partition-commit.trigger'='partition-time',
> > >   'sink.partition-commit.delay'='1 h',
> > >   'sink.partition-commit.policy.kind'='metastore,success-file');
> > > SET table.sql-dialect=default;CREATE TABLE kafka_table (
> > >   user_id STRING,
> > >   order_amount DOUBLE,
> > >   log_ts TIMESTAMP(3),
> > >   WATERMARK FOR log_ts AS log_ts - INTERVAL '5' SECOND -- Define
> > > watermark on TIMESTAMP column) WITH (...);
> > >
> > >
> > >
> >
> 如上,如果是这样的话,那就会出现个问题。所有需要写入的hive表其实都需要重新定义一次,部分原先的表是hive中定义的。现在我需要重新定义一次可能。
> > >
> > > 其次,为了避免重新定义表有问题啥的,我可能会重新定义另一个数据库中同名表,但指定到和hive表相同的存储路径。
> > > 但如果hive中修改原表,我这边不改变flink hive表定义,又会出现不一致的情况。
> > >
> > >
> > > 此外,flink这样定义的hive表和hive自己定义的肯定意义一致吗,不会影响hive自身的读写吧。
> > >
> >
>

Re: 关于FlinkSQL从kafka读取数据写到hive的一些问题

Posted by yidan zhao <hi...@gmail.com>.
嗯嗯,hive catalog的确不需要重新建表,但是我的场景是:我需要通过 flinkSQL 流式将 kafka 表数据写入 hive 表。
因此就需要有如下属性等,而原先的hive表式spark-sql中创建的,肯定不可能带有这种属性。我目前想法是,比如针对原表 t1,我重新在
flinkSQL 中创建个hive表t2,但是指定location为原t1的location,同时带上如下相关属性,这样就达到目的了。
或者说,基于现有的hive表,有什么不重定义的方法,仍然可以通过sql流式将kafka表数据写进去不。
>   'partition.time-extractor.timestamp-pattern'='$dt $hr:00:00',
>   'sink.partition-commit.trigger'='partition-time',
>   'sink.partition-commit.delay'='1 h',
>   'sink.partition-commit.policy.kind'='metastore,success-file');

Caizhi Weng <ts...@gmail.com> 于2021年11月2日周二 上午10:47写道:

> Hi!
>
> hive catalog 是不需要重新在 Flink SQL 里写一遍表定义的,连接到 hive catalog 的时候 Flink 就会自动读取
> hive 里表的结构等信息。但 kafka 的表定义仍然要写。
>
> 你的邮件里的内容具体来自哪个文档界面呢?文档里应该是想要从 Flink 里建立 hive 表,如果已经在 hive 里建过表了就不用再建了。
>
> yidan zhao <hi...@gmail.com> 于2021年11月1日周一 下午3:05写道:
>
> > 如题,我看了官方文档,定义好kafka和hive表。
> > 写的时候提示要指定提交策略,就又看了看文档,如下为文档实例。
> >
> > SET table.sql-dialect=hive;CREATE TABLE hive_table (
> >   user_id STRING,
> >   order_amount DOUBLE) PARTITIONED BY (dt STRING, hr STRING) STORED AS
> > parquet TBLPROPERTIES (
> >   'partition.time-extractor.timestamp-pattern'='$dt $hr:00:00',
> >   'sink.partition-commit.trigger'='partition-time',
> >   'sink.partition-commit.delay'='1 h',
> >   'sink.partition-commit.policy.kind'='metastore,success-file');
> > SET table.sql-dialect=default;CREATE TABLE kafka_table (
> >   user_id STRING,
> >   order_amount DOUBLE,
> >   log_ts TIMESTAMP(3),
> >   WATERMARK FOR log_ts AS log_ts - INTERVAL '5' SECOND -- Define
> > watermark on TIMESTAMP column) WITH (...);
> >
> >
> >
> 如上,如果是这样的话,那就会出现个问题。所有需要写入的hive表其实都需要重新定义一次,部分原先的表是hive中定义的。现在我需要重新定义一次可能。
> >
> > 其次,为了避免重新定义表有问题啥的,我可能会重新定义另一个数据库中同名表,但指定到和hive表相同的存储路径。
> > 但如果hive中修改原表,我这边不改变flink hive表定义,又会出现不一致的情况。
> >
> >
> > 此外,flink这样定义的hive表和hive自己定义的肯定意义一致吗,不会影响hive自身的读写吧。
> >
>

Re: 关于FlinkSQL从kafka读取数据写到hive的一些问题

Posted by Caizhi Weng <ts...@gmail.com>.
Hi!

hive catalog 是不需要重新在 Flink SQL 里写一遍表定义的,连接到 hive catalog 的时候 Flink 就会自动读取
hive 里表的结构等信息。但 kafka 的表定义仍然要写。

你的邮件里的内容具体来自哪个文档界面呢?文档里应该是想要从 Flink 里建立 hive 表,如果已经在 hive 里建过表了就不用再建了。

yidan zhao <hi...@gmail.com> 于2021年11月1日周一 下午3:05写道:

> 如题,我看了官方文档,定义好kafka和hive表。
> 写的时候提示要指定提交策略,就又看了看文档,如下为文档实例。
>
> SET table.sql-dialect=hive;CREATE TABLE hive_table (
>   user_id STRING,
>   order_amount DOUBLE) PARTITIONED BY (dt STRING, hr STRING) STORED AS
> parquet TBLPROPERTIES (
>   'partition.time-extractor.timestamp-pattern'='$dt $hr:00:00',
>   'sink.partition-commit.trigger'='partition-time',
>   'sink.partition-commit.delay'='1 h',
>   'sink.partition-commit.policy.kind'='metastore,success-file');
> SET table.sql-dialect=default;CREATE TABLE kafka_table (
>   user_id STRING,
>   order_amount DOUBLE,
>   log_ts TIMESTAMP(3),
>   WATERMARK FOR log_ts AS log_ts - INTERVAL '5' SECOND -- Define
> watermark on TIMESTAMP column) WITH (...);
>
>
> 如上,如果是这样的话,那就会出现个问题。所有需要写入的hive表其实都需要重新定义一次,部分原先的表是hive中定义的。现在我需要重新定义一次可能。
>
> 其次,为了避免重新定义表有问题啥的,我可能会重新定义另一个数据库中同名表,但指定到和hive表相同的存储路径。
> 但如果hive中修改原表,我这边不改变flink hive表定义,又会出现不一致的情况。
>
>
> 此外,flink这样定义的hive表和hive自己定义的肯定意义一致吗,不会影响hive自身的读写吧。
>