You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user-zh@flink.apache.org by 顾斌杰 <bi...@paat.com> on 2022/03/15 06:13:56 UTC
Flink写入Hive错误
Flink版本:1.13.3
Hive版本:2.1.1
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
DataStream userBehaviorDataStream = source
.map(new UserBehaviorAddEventTimeTransform()).filter(Objects::nonNull);
userBehaviorDataStream.print();
SingleOutputStreamOperator stepFr = userBehaviorDataStream.process(new ProcessFunction() {
private static final long serialVersionUID = 6365847542902145255L;
@Override
public void processElement(UserBehavior value, Context ctx, Collector out) throws Exception {
Row row = new Row(2);
row.setField(0, value.getAppName());
row.setField(1, value.getAppName());
out.collect(row);
}
});
HiveCatalog hiveCatalog = new HiveCatalog("devHive", null, (String) null, "2.1.1");
tableEnv.registerCatalog("devHive", hiveCatalog);
tableEnv.useCatalog("devHive");
tableEnv.getConfig().setSqlDialect(SqlDialect.HIVE);
tableEnv.useDatabase("default");
tableEnv.createTemporaryView("sourceTable", stepFr);
String sql = "insert into zyz select * from sourceTable";
tableEnv.executeSql(sql);
但是它老是报错,我想请问是否我写错了什么?
2022-03-15 14:11:39.242 [main] INFO [TypeExtractor][isValidPojoField][1991] - class java.util.LinkedHashMap does not contain a getter for field accessOrder
2022-03-15 14:11:39.242 [main] INFO [TypeExtractor][isValidPojoField][1994] - class java.util.LinkedHashMap does not contain a setter for field accessOrder
2022-03-15 14:11:39.242 [main] INFO [TypeExtractor][analyzePojo][2037] - Class class java.util.LinkedHashMap cannot be used as a POJO type because not all fields are valid POJO fields, and must be processed as GenericType. Please read the Flink documentation on "Data Types & Serialization" for details of the effect on performance.
2022-03-15 14:11:39.242 [main] INFO [TypeExtractor][analyzePojo][2093] - class org.apache.flink.types.Row is missing a default constructor so it cannot be used as a POJO type and must be processed as GenericType. Please read the Flink documentation on "Data Types & Serialization" for details of the effect on performance.
2022-03-15 14:11:39.448 [main] INFO [HiveCatalog][createHiveConf][257] - Setting hive conf dir as null
2022-03-15 14:11:39.449 [main] INFO [HiveCatalog][createHiveConf][278] - Found hive-site.xml in classpath: file:/D:/JetBrains/IdeaProject/paat_realtime_deal/target/classes/hive-site.xml
2022-03-15 14:11:39.491 [main] INFO [HiveCatalog][][219] - Created HiveCatalog 'devHive'
2022-03-15 14:11:40.063 [main] INFO [HiveCatalog][open][299] - Connected to Hive metastore
2022-03-15 14:11:40.161 [main] INFO [CatalogManager][setCurrentCatalog][262] - Set the current default catalog as [devHive] and the current default database as [default].
2022-03-15 14:11:41.158 [main] INFO [HiveParserCalcitePlanner][genLogicalPlan][251] - Starting generating logical plan
2022-03-15 14:11:41.164 [main] INFO [HiveParserSemanticAnalyzer][genResolvedParseTree][2279] - Completed phase 1 of Semantic Analysis
2022-03-15 14:11:41.164 [main] INFO [HiveParserSemanticAnalyzer][getMetaData][1508] - Get metadata for source tables
2022-03-15 14:11:41.178 [main] ERROR [HiveParserSemanticAnalyzer][getMetaData][1489] - org.apache.hadoop.hive.ql.parse.SemanticException: Line 1:30 Table not found 'sourceTable'
at org.apache.flink.table.planner.delegation.hive.copy.HiveParserSemanticAnalyzer.getMetaData(HiveParserSemanticAnalyzer.java:1547)
at org.apache.flink.table.planner.delegation.hive.copy.HiveParserSemanticAnalyzer.getMetaData(HiveParserSemanticAnalyzer.java:1487)
at org.apache.flink.table.planner.delegation.hive.copy.HiveParserSemanticAnalyzer.genResolvedParseTree(HiveParserSemanticAnalyzer.java:2283)
at org.apache.flink.table.planner.delegation.hive.HiveParserCalcitePlanner.genLogicalPlan(HiveParserCalcitePlanner.java:255)
at org.apache.flink.table.planner.delegation.hive.HiveParser.analyzeSql(HiveParser.java:290)
at org.apache.flink.table.planner.delegation.hive.HiveParser.processCmd(HiveParser.java:238)
at org.apache.flink.table.planner.delegation.hive.HiveParser.parse(HiveParser.java:208)
at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:724)
at com.paat.realtime.task.core.SyncUserBehaviorToMysqlTask.transform(SyncUserBehaviorToMysqlTask.java:58)
at com.paat.realtime.core.DataStreamApplicationContext.execute(DataStreamApplicationContext.java:53)
at com.paat.realtime.core.DataStreamApplicationContext.execute(DataStreamApplicationContext.java:65)
at com.paat.realtime.application.SyncUserBehaviorToMysqlApplication.main(SyncUserBehaviorToMysqlApplication.java:16)
Exception in thread "main" org.apache.flink.table.api.ValidationException: HiveParser failed to parse insert into zyz select * from sourceTable
at org.apache.flink.table.planner.delegation.hive.HiveParser.processCmd(HiveParser.java:253)
at org.apache.flink.table.planner.delegation.hive.HiveParser.parse(HiveParser.java:208)
at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:724)
at com.paat.realtime.task.core.SyncUserBehaviorToMysqlTask.transform(SyncUserBehaviorToMysqlTask.java:58)
at com.paat.realtime.core.DataStreamApplicationContext.execute(DataStreamApplicationContext.java:53)
at com.paat.realtime.core.DataStreamApplicationContext.execute(DataStreamApplicationContext.java:65)
at com.paat.realtime.application.SyncUserBehaviorToMysqlApplication.main(SyncUserBehaviorToMysqlApplication.java:16)
Caused by: org.apache.hadoop.hive.ql.parse.SemanticException: Line 1:30 Table not found 'sourceTable'
at org.apache.flink.table.planner.delegation.hive.copy.HiveParserSemanticAnalyzer.getMetaData(HiveParserSemanticAnalyzer.java:1547)
at org.apache.flink.table.planner.delegation.hive.copy.HiveParserSemanticAnalyzer.getMetaData(HiveParserSemanticAnalyzer.java:1487)
at org.apache.flink.table.planner.delegation.hive.copy.HiveParserSemanticAnalyzer.genResolvedParseTree(HiveParserSemanticAnalyzer.java:2283)
at org.apache.flink.table.planner.delegation.hive.HiveParserCalcitePlanner.genLogicalPlan(HiveParserCalcitePlanner.java:255)
at org.apache.flink.table.planner.delegation.hive.HiveParser.analyzeSql(HiveParser.java:290)
at org.apache.flink.table.planner.delegation.hive.HiveParser.processCmd(HiveParser.java:238)
... 6 more
Re:Flink写入Hive错误
Posted by Mang Zhang <zh...@163.com>.
在Flink里面,你如果 use 了 HiveCatalog,那么暂时不能很好地使用 hive connector 以外的表;
我理解你现在想要做的是,将flink 表的数据写入到一个hive table里
HiveCatalog hiveCatalog = new HiveCatalog("devHive", null, (String) null, "2.1.1");
tableEnv.registerCatalog("devHive", hiveCatalog);
// 去掉这部分,还使用flink默认的catalog
//tableEnv.useCatalog("devHive");
//tableEnv.getConfig().setSqlDialect(SqlDialect.HIVE);
//tableEnv.useDatabase("default");
tableEnv.createTemporaryView("sourceTable", stepFr);
// sql 改写一下, ${catalog name}.${db name}.${table name}
String sql = "insert into devHive.default.zyz select * from sourceTable";
tableEnv.executeSql(sql);
可以试试这样写
--
Best regards,
Mang Zhang
At 2022-03-15 14:13:56, "顾斌杰" <bi...@paat.com> wrote:
>
>Flink版本:1.13.3
>Hive版本:2.1.1
>
>
>StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
>
> DataStream userBehaviorDataStream = source
> .map(new UserBehaviorAddEventTimeTransform()).filter(Objects::nonNull);
> userBehaviorDataStream.print();
> SingleOutputStreamOperator stepFr = userBehaviorDataStream.process(new ProcessFunction() {
> private static final long serialVersionUID = 6365847542902145255L;
>
> @Override
> public void processElement(UserBehavior value, Context ctx, Collector out) throws Exception {
> Row row = new Row(2);
> row.setField(0, value.getAppName());
> row.setField(1, value.getAppName());
> out.collect(row);
> }
> });
>
> HiveCatalog hiveCatalog = new HiveCatalog("devHive", null, (String) null, "2.1.1");
> tableEnv.registerCatalog("devHive", hiveCatalog);
> tableEnv.useCatalog("devHive");
> tableEnv.getConfig().setSqlDialect(SqlDialect.HIVE);
> tableEnv.useDatabase("default");
> tableEnv.createTemporaryView("sourceTable", stepFr);
>
> String sql = "insert into zyz select * from sourceTable";
> tableEnv.executeSql(sql);
>
>
>但是它老是报错,我想请问是否我写错了什么?
>2022-03-15 14:11:39.242 [main] INFO [TypeExtractor][isValidPojoField][1991] - class java.util.LinkedHashMap does not contain a getter for field accessOrder
>2022-03-15 14:11:39.242 [main] INFO [TypeExtractor][isValidPojoField][1994] - class java.util.LinkedHashMap does not contain a setter for field accessOrder
>2022-03-15 14:11:39.242 [main] INFO [TypeExtractor][analyzePojo][2037] - Class class java.util.LinkedHashMap cannot be used as a POJO type because not all fields are valid POJO fields, and must be processed as GenericType. Please read the Flink documentation on "Data Types & Serialization" for details of the effect on performance.
>2022-03-15 14:11:39.242 [main] INFO [TypeExtractor][analyzePojo][2093] - class org.apache.flink.types.Row is missing a default constructor so it cannot be used as a POJO type and must be processed as GenericType. Please read the Flink documentation on "Data Types & Serialization" for details of the effect on performance.
>2022-03-15 14:11:39.448 [main] INFO [HiveCatalog][createHiveConf][257] - Setting hive conf dir as null
>2022-03-15 14:11:39.449 [main] INFO [HiveCatalog][createHiveConf][278] - Found hive-site.xml in classpath: file:/D:/JetBrains/IdeaProject/paat_realtime_deal/target/classes/hive-site.xml
>2022-03-15 14:11:39.491 [main] INFO [HiveCatalog][][219] - Created HiveCatalog 'devHive'
>2022-03-15 14:11:40.063 [main] INFO [HiveCatalog][open][299] - Connected to Hive metastore
>2022-03-15 14:11:40.161 [main] INFO [CatalogManager][setCurrentCatalog][262] - Set the current default catalog as [devHive] and the current default database as [default].
>2022-03-15 14:11:41.158 [main] INFO [HiveParserCalcitePlanner][genLogicalPlan][251] - Starting generating logical plan
>2022-03-15 14:11:41.164 [main] INFO [HiveParserSemanticAnalyzer][genResolvedParseTree][2279] - Completed phase 1 of Semantic Analysis
>2022-03-15 14:11:41.164 [main] INFO [HiveParserSemanticAnalyzer][getMetaData][1508] - Get metadata for source tables
>2022-03-15 14:11:41.178 [main] ERROR [HiveParserSemanticAnalyzer][getMetaData][1489] - org.apache.hadoop.hive.ql.parse.SemanticException: Line 1:30 Table not found 'sourceTable'
> at org.apache.flink.table.planner.delegation.hive.copy.HiveParserSemanticAnalyzer.getMetaData(HiveParserSemanticAnalyzer.java:1547)
> at org.apache.flink.table.planner.delegation.hive.copy.HiveParserSemanticAnalyzer.getMetaData(HiveParserSemanticAnalyzer.java:1487)
> at org.apache.flink.table.planner.delegation.hive.copy.HiveParserSemanticAnalyzer.genResolvedParseTree(HiveParserSemanticAnalyzer.java:2283)
> at org.apache.flink.table.planner.delegation.hive.HiveParserCalcitePlanner.genLogicalPlan(HiveParserCalcitePlanner.java:255)
> at org.apache.flink.table.planner.delegation.hive.HiveParser.analyzeSql(HiveParser.java:290)
> at org.apache.flink.table.planner.delegation.hive.HiveParser.processCmd(HiveParser.java:238)
> at org.apache.flink.table.planner.delegation.hive.HiveParser.parse(HiveParser.java:208)
> at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:724)
> at com.paat.realtime.task.core.SyncUserBehaviorToMysqlTask.transform(SyncUserBehaviorToMysqlTask.java:58)
> at com.paat.realtime.core.DataStreamApplicationContext.execute(DataStreamApplicationContext.java:53)
> at com.paat.realtime.core.DataStreamApplicationContext.execute(DataStreamApplicationContext.java:65)
> at com.paat.realtime.application.SyncUserBehaviorToMysqlApplication.main(SyncUserBehaviorToMysqlApplication.java:16)
>
>Exception in thread "main" org.apache.flink.table.api.ValidationException: HiveParser failed to parse insert into zyz select * from sourceTable
> at org.apache.flink.table.planner.delegation.hive.HiveParser.processCmd(HiveParser.java:253)
> at org.apache.flink.table.planner.delegation.hive.HiveParser.parse(HiveParser.java:208)
> at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:724)
> at com.paat.realtime.task.core.SyncUserBehaviorToMysqlTask.transform(SyncUserBehaviorToMysqlTask.java:58)
> at com.paat.realtime.core.DataStreamApplicationContext.execute(DataStreamApplicationContext.java:53)
> at com.paat.realtime.core.DataStreamApplicationContext.execute(DataStreamApplicationContext.java:65)
> at com.paat.realtime.application.SyncUserBehaviorToMysqlApplication.main(SyncUserBehaviorToMysqlApplication.java:16)
>Caused by: org.apache.hadoop.hive.ql.parse.SemanticException: Line 1:30 Table not found 'sourceTable'
> at org.apache.flink.table.planner.delegation.hive.copy.HiveParserSemanticAnalyzer.getMetaData(HiveParserSemanticAnalyzer.java:1547)
> at org.apache.flink.table.planner.delegation.hive.copy.HiveParserSemanticAnalyzer.getMetaData(HiveParserSemanticAnalyzer.java:1487)
> at org.apache.flink.table.planner.delegation.hive.copy.HiveParserSemanticAnalyzer.genResolvedParseTree(HiveParserSemanticAnalyzer.java:2283)
> at org.apache.flink.table.planner.delegation.hive.HiveParserCalcitePlanner.genLogicalPlan(HiveParserCalcitePlanner.java:255)
> at org.apache.flink.table.planner.delegation.hive.HiveParser.analyzeSql(HiveParser.java:290)
> at org.apache.flink.table.planner.delegation.hive.HiveParser.processCmd(HiveParser.java:238)
> ... 6 more
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>