Posted to user-zh@flink.apache.org by 李佳宸 <li...@gmail.com> on 2020/07/16 10:03:10 UTC
A problem with Flink 1.11 Hive Streaming Write
I'd like to ask what configuration Hive streaming write requires. I don't understand why my job runs fine, yet no data is written to Hive.
Batch writes to Hive and reads in the streaming environment both work correctly.
Code attached; it is quite short:
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.SqlDialect;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.catalog.hive.HiveCatalog;

public class KafkaToHiveStreaming {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment bsEnv = StreamExecutionEnvironment.getExecutionEnvironment();
        EnvironmentSettings bsSettings =
                EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
        StreamTableEnvironment bsTableEnv = StreamTableEnvironment.create(bsEnv, bsSettings);

        String name = "myhive";
        String defaultDatabase = "default";
        String hiveConfDir = "/Users/uzi/Downloads/Hadoop/apache-hive-3.1.2-bin/conf/"; // a local path
        String version = "3.1.2";

        HiveCatalog hive = new HiveCatalog(name, defaultDatabase, hiveConfDir, version);
        bsTableEnv.registerCatalog("myhive", hive);
        bsTableEnv.useCatalog("myhive");

        // Kafka source table (default dialect)
        bsTableEnv.getConfig().setSqlDialect(SqlDialect.DEFAULT);
        bsTableEnv.executeSql("CREATE TABLE topic_products (" +
                "  id BIGINT," +
                "  order_id STRING," +
                "  amount DECIMAL(10, 2)," +
                "  create_time TIMESTAMP" +
                ") WITH (" +
                "  'connector' = 'kafka'," +
                "  'topic' = 'order.test'," +
                "  'properties.bootstrap.servers' = 'localhost:9092'," +
                "  'properties.group.id' = 'testGroup'," +
                "  'scan.startup.mode' = 'earliest-offset'," +
                "  'format' = 'json'" +
                ")");

        // Hive sink table (Hive dialect)
        bsTableEnv.getConfig().setSqlDialect(SqlDialect.HIVE);
        bsTableEnv.executeSql("CREATE TABLE hive_sink_table_streaming (" +
                "  id BIGINT," +
                "  order_id STRING," +
                "  amount DECIMAL(10, 2)" +
                ")");

        // Print sink with the same schema. Note: the original post had
        // "LIKE INSERT INTO hive_sink_table_streaming", which is not valid SQL;
        // the LIKE clause takes a table name.
        bsTableEnv.getConfig().setSqlDialect(SqlDialect.DEFAULT);
        bsTableEnv.executeSql("CREATE TABLE print_table WITH ('connector' = 'print') " +
                "LIKE hive_sink_table_streaming (EXCLUDING ALL)");

        bsTableEnv.getConfig().setSqlDialect(SqlDialect.HIVE);
        bsTableEnv.executeSql("INSERT INTO hive_sink_table_streaming " +
                "SELECT id, order_id, amount FROM topic_products");

        Table table1 = bsTableEnv.from("hive_sink_table_streaming");
        table1.executeInsert("print_table");
    }
}
Re: A problem with Flink 1.11 Hive Streaming Write
Posted by Jingsong Li <ji...@gmail.com>.
Hi Dream,
Could you describe your test scenario in more detail?
Best,
Jingsong
On Mon, Jul 20, 2020 at 5:40 PM Dream-底限 <zh...@akulaku.com> wrote:
> hi,
> How was this problem resolved in the end? Can data be rolled into Hive now?
> On my side, Hive still gets no data even after enabling checkpointing.
--
Best, Jingsong Lee
Re: A problem with Flink 1.11 Hive Streaming Write
Posted by Dream-底限 <zh...@akulaku.com>.
hi,
How was this problem resolved in the end? Can data be rolled into Hive now? On my side, Hive still gets no data even after enabling checkpointing.
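One configuration that is easy to miss in this situation (my addition; the thread does not confirm this is the cause here): for a *partitioned* Hive sink in Flink 1.11, written files only become queryable after the partition is committed, which is driven by table properties. A configuration sketch in the Hive dialect, with hypothetical partition columns dt/hr and illustrative values:

```java
// Sketch: table properties controlling when streaming writes become visible in Hive.
// The partition columns (dt, hr), the one-hour delay, and the timestamp pattern
// are illustrative assumptions, not taken from the thread.
bsTableEnv.getConfig().setSqlDialect(SqlDialect.HIVE);
bsTableEnv.executeSql("CREATE TABLE hive_sink_table_streaming (" +
        "  id BIGINT, order_id STRING, amount DECIMAL(10, 2)" +
        ") PARTITIONED BY (dt STRING, hr STRING) STORED AS parquet TBLPROPERTIES (" +
        "  'partition.time-extractor.timestamp-pattern' = '$dt $hr:00:00'," +
        "  'sink.partition-commit.trigger' = 'partition-time'," +
        "  'sink.partition-commit.delay' = '1 h'," +
        "  'sink.partition-commit.policy.kind' = 'metastore,success-file'" +
        ")");
```

Without a commit policy, files can be rolled to disk yet never registered with the metastore, which would also look like "the job runs but Hive has no data".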
李佳宸 <li...@gmail.com> wrote on Thu, Jul 16, 2020 at 10:39 PM:
> OK, thanks!
Re: A problem with Flink 1.11 Hive Streaming Write
Posted by 李佳宸 <li...@gmail.com>.
OK, thanks!
JasonLee <17...@163.com> wrote on Thu, Jul 16, 2020 at 8:22 PM:
> hi
> You need to enable checkpointing.
Re: A problem with Flink 1.11 Hive Streaming Write
Posted by JasonLee <17...@163.com>.
hi
You need to enable checkpointing.
JasonLee
Email: 17610775726@163.com
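The advice above can be shown in code. In streaming mode the Hive sink only flushes and commits its in-progress files when a checkpoint completes, so a job without checkpointing runs but never makes data visible. A minimal configuration sketch, assuming the environment from the original code; the 60-second interval is an arbitrary example, not a recommendation:

```java
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class EnableCheckpointing {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment bsEnv = StreamExecutionEnvironment.getExecutionEnvironment();
        // The Hive/filesystem sink commits files on checkpoint completion;
        // without this call the job only produces in-progress files that
        // Hive queries never see.
        bsEnv.enableCheckpointing(60_000, CheckpointingMode.EXACTLY_ONCE);
    }
}
```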