Posted to user-zh@flink.apache.org by JasonLee <17...@163.com> on 2020/07/16 12:22:19 UTC

Re: Issue with Flink 1.11 Hive Streaming Write

hi
You need to enable checkpointing.
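
In Flink 1.11 the streaming file/Hive sink only commits files when a checkpoint completes, so a job without checkpointing can run normally yet never make data visible in Hive. A minimal sketch of what to add before building the job (the 60-second interval is an example value, not something from your code):

    // Enable checkpointing so the streaming sink can commit in-progress files.
    // 60 s is an arbitrary example interval; tune it to your latency needs.
    bsEnv.enableCheckpointing(60_000); // interval in milliseconds
    bsEnv.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);

(CheckpointingMode is org.apache.flink.streaming.api.CheckpointingMode; exactly-once is also the default mode, shown here only to make the choice explicit.)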


JasonLee
Email: 17610775726@163.com

On 2020-07-16 18:03, 李佳宸 wrote:
I'd like to ask what configuration Hive streaming write needs. I don't know why my job runs fine but no data is written to Hive.
Batch writes to Hive work, and reading in the streaming environment is normal.

The code is attached; it is quite short:

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.SqlDialect;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.catalog.hive.HiveCatalog;

public class KafkaToHiveStreaming {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment bsEnv = StreamExecutionEnvironment.getExecutionEnvironment();
        EnvironmentSettings bsSettings = EnvironmentSettings.newInstance()
                .useBlinkPlanner()
                .inStreamingMode()
                .build();
        StreamTableEnvironment bsTableEnv = StreamTableEnvironment.create(bsEnv, bsSettings);

        String name            = "myhive";
        String defaultDatabase = "default";
        String hiveConfDir     = "/Users/uzi/Downloads/Hadoop/apache-hive-3.1.2-bin/conf/"; // a local path
        String version         = "3.1.2";

        HiveCatalog hive = new HiveCatalog(name, defaultDatabase, hiveConfDir, version);
        bsTableEnv.registerCatalog("myhive", hive);
        bsTableEnv.useCatalog("myhive");

        // Kafka source table, defined in the default dialect.
        bsTableEnv.getConfig().setSqlDialect(SqlDialect.DEFAULT);
        bsTableEnv.executeSql("CREATE TABLE topic_products (" +
                "  id BIGINT," +
                "  order_id STRING," +
                "  amount DECIMAL(10, 2)," +
                "  create_time TIMESTAMP" +
                ") WITH (" +
                " 'connector' = 'kafka'," +
                " 'topic' = 'order.test'," +
                " 'properties.bootstrap.servers' = 'localhost:9092'," +
                " 'properties.group.id' = 'testGroup'," +
                " 'scan.startup.mode' = 'earliest-offset'," +
                " 'format' = 'json'" +
                ")");

        // Hive sink table, defined in the Hive dialect.
        bsTableEnv.getConfig().setSqlDialect(SqlDialect.HIVE);
        bsTableEnv.executeSql("CREATE TABLE hive_sink_table_streaming (" +
                "  id BIGINT," +
                "  order_id STRING," +
                "  amount DECIMAL(10, 2)" +
                ")");

        // Print sink with the same schema as the Hive table.
        bsTableEnv.getConfig().setSqlDialect(SqlDialect.DEFAULT);
        bsTableEnv.executeSql("CREATE TABLE print_table WITH ('connector' = 'print') " +
                "LIKE hive_sink_table_streaming (EXCLUDING ALL)");

        bsTableEnv.getConfig().setSqlDialect(SqlDialect.HIVE);
        bsTableEnv.executeSql("INSERT INTO hive_sink_table_streaming SELECT " +
                "id, " +
                "order_id, " +
                "amount " +
                "FROM topic_products");

        Table table1 = bsTableEnv.from("hive_sink_table_streaming");
        table1.executeInsert("print_table");
    }
}

Re: Issue with Flink 1.11 Hive Streaming Write

Posted by Jingsong Li <ji...@gmail.com>.
Hi Dream,

Could you describe your test scenario in more detail?

Best,
Jingsong

On Mon, Jul 20, 2020 at 5:40 PM Dream-底限 <zh...@akulaku.com> wrote:

> hi,
> How was this problem resolved in the end? Is data rolling into Hive now? On my side, even after enabling checkpointing, Hive still gets no data.
>
> 李佳宸 <li...@gmail.com> wrote on Thu, Jul 16, 2020 at 10:39 PM:
>
> > OK, thanks~~~
> >
> > JasonLee <17...@163.com> wrote on Thu, Jul 16, 2020 at 8:22 PM:
> >
> > > hi
> > > You need to enable checkpointing.
> > >
> > > JasonLee
> > > Email: 17610775726@163.com
> > >
> > > On 2020-07-16 18:03, 李佳宸 wrote:
> > > [original message and code snipped; see the first message above]


-- 
Best, Jingsong Lee

Re: Issue with Flink 1.11 Hive Streaming Write

Posted by Dream-底限 <zh...@akulaku.com>.
hi,
How was this problem resolved in the end? Is data rolling into Hive now? On my side, even after enabling checkpointing, Hive still gets no data.
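
For reference, from the Flink 1.11 filesystem/Hive sink documentation my understanding is that a partitioned sink table additionally needs a partition commit policy before data becomes visible in Hive; 'sink.partition-commit.policy.kind' has no default and must be set. A sketch of the relevant table properties (the partitioned table below is hypothetical, adapted from the sink table in this thread):

    bsTableEnv.getConfig().setSqlDialect(SqlDialect.HIVE);
    bsTableEnv.executeSql("CREATE TABLE hive_sink_table_partitioned (" +
            "  id BIGINT," +
            "  order_id STRING," +
            "  amount DECIMAL(10, 2)" +
            ") PARTITIONED BY (dt STRING) TBLPROPERTIES (" +
            // commit a partition based on processing time, with no extra delay
            " 'sink.partition-commit.trigger' = 'process-time'," +
            " 'sink.partition-commit.delay' = '0 s'," +
            // on commit, add the partition to the metastore and write a _SUCCESS file
            " 'sink.partition-commit.policy.kind' = 'metastore,success-file'" +
            ")");

For a non-partitioned table like the one in this thread, files should become visible after the first successful checkpoint, so it is also worth confirming in the web UI that checkpoints actually complete.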

李佳宸 <li...@gmail.com> wrote on Thu, Jul 16, 2020 at 10:39 PM:

> OK, thanks~~~
>
> JasonLee <17...@163.com> wrote on Thu, Jul 16, 2020 at 8:22 PM:
>
> > hi
> > You need to enable checkpointing.
> >
> > JasonLee
> > Email: 17610775726@163.com
> >
> > On 2020-07-16 18:03, 李佳宸 wrote:
> > [original message and code snipped; see the first message above]

Re: Issue with Flink 1.11 Hive Streaming Write

Posted by 李佳宸 <li...@gmail.com>.
OK, thanks~~~

JasonLee <17...@163.com> wrote on Thu, Jul 16, 2020 at 8:22 PM:

> hi
> You need to enable checkpointing.
>
> JasonLee
> Email: 17610775726@163.com
>
> On 2020-07-16 18:03, 李佳宸 wrote:
> [original message and code snipped; see the first message above]