Posted to user-zh@flink.apache.org by "799590989@qq.com.INVALID" <79...@qq.com.INVALID> on 2022/03/02 07:06:00 UTC

NullPointerException in JaninoRelMetadataProvider

Flink version: 1.11.3
scala.binary version: 2.11
Hive version: 1.1.1
Hadoop version: 2.6.0-cdh5.16.2

The goal is to use Flink's Table SQL to create a table on top of a Kafka topic, filter the Kafka data, and write the result into a Hive table.

The main logic is below. It throws a NullPointerException both when submitted to a remote cluster and when run in local mode. I have confirmed that no dependencies are missing, and the same code runs without error inside a Spring Boot unit test. This has been bothering me for several days, and I could not find an answer online.

@Bean
public StreamTableEnvironment flinkTableEnv(){
    StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment(host,port);
    //StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new org.apache.flink.configuration.Configuration());
    return StreamTableEnvironment.create(env);
}

@Resource
private StreamTableEnvironment flinkTableEnv;

String catalogName = "myhive";
String defaultDatabase = "default";
String hiveConfDir = "hive path";
String hiveVersion = "1.1.1";
HiveCatalog hiveCatalog = new HiveCatalog(
        catalogName, defaultDatabase, hiveConfDir, hiveVersion
);
flinkTableEnv.registerCatalog(catalogName, hiveCatalog);
flinkTableEnv.useCatalog("default_catalog");
flinkTableEnv.useDatabase("default_database");
flinkTableEnv.getConfig().setSqlDialect(SqlDialect.DEFAULT);
flinkTableEnv.executeSql("DROP TABLE IF EXISTS data_2430_4923");
flinkTableEnv.executeSql("CREATE TABLE data_2430_4923(`id` STRING,`user_id` STRING,`status` STRING) WITH ('connector' = 'kafka',"
        + "'topic' = 'person-status9',"
        + "'properties.bootstrap.servers' = '192.168.9.116:9092',"
        + "'properties.group.id' = 'chinaoly-group',"
        + "'scan.startup.mode' = 'latest-offset',"
        + "'format' = 'json',"
        + "'json.fail-on-missing-field'='false',"
        + "'json.ignore-parse-errors'='true'"
        + ")");
flinkTableEnv.useCatalog(catalogName);
flinkTableEnv.useDatabase(defaultDatabase);
flinkTableEnv.getConfig().setSqlDialect(SqlDialect.HIVE);
flinkTableEnv.executeSql("DROP TABLE IF EXISTS myhive.default.output_2430_4926");
flinkTableEnv.executeSql("CREATE TABLE myhive.tetris.output_2430_4926(id STRING,user_id STRING,status STRING) STORED AS parquet TBLPROPERTIES ('is_generic'='false')");

flinkTableEnv.useCatalog("default_catalog");
flinkTableEnv.useDatabase("default_database");
flinkTableEnv.getConfig().setSqlDialect(SqlDialect.DEFAULT);
flinkTableEnv.executeSql("INSERT INTO myhive.tetris.output_2430_4926(id,user_id,status) SELECT id AS id, user_id AS user_id, status AS status"
        + " FROM (SELECT id AS id, user_id AS user_id, status AS status FROM (SELECT id AS id, user_id AS user_id, status AS status"
        + " FROM data_2430_4923) t4927 WHERE status = '1') t4925");

The SQL that fails:
INSERT INTO myhive.tetris.output_2430_4926(id,user_id,status) SELECT id AS id, user_id AS user_id, status AS status
FROM (SELECT id AS id, user_id AS user_id, status AS status FROM (SELECT id AS id, user_id AS user_id, status AS status
FROM data_2430_4923) t4927 WHERE status = '1') t4925

When I set a breakpoint at the location below, I found that the metadataProvider parameter is null:
org.apache.calcite.rel.metadata.RelMetadataQuery#RelMetadataQuery(org.apache.calcite.rel.metadata.JaninoRelMetadataProvider, org.apache.calcite.rel.metadata.RelMetadataQuery)
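
One thing that may be worth checking (an assumption on my part, not something verified in this setup): as far as I can tell, Calcite keeps the JaninoRelMetadataProvider in a per-thread slot (RelMetadataQueryBase.THREAD_PROVIDERS, a ThreadLocal). If the StreamTableEnvironment bean is created on one thread and executeSql is later called from a different thread, the second thread would see null there, which would match what the breakpoint shows and could explain why a single-threaded unit test passes. The sketch below uses only the JDK and a hypothetical PROVIDER slot standing in for Calcite's; it only illustrates the ThreadLocal behavior, not Flink or Calcite themselves.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class ThreadLocalProviderDemo {
    // Hypothetical stand-in for a per-thread provider slot; not Calcite's real field.
    static final ThreadLocal<String> PROVIDER = new ThreadLocal<>();

    // Reads the slot on whichever thread calls this method.
    static String readOnCurrentThread() {
        return PROVIDER.get();
    }

    // Reads the slot from a separate worker thread and returns what that thread saw.
    static String readOnWorkerThread() {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        try {
            Callable<String> task = PROVIDER::get;
            return pool.submit(task).get();
        } catch (Exception e) {
            throw new RuntimeException(e);
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        // Set the value on the main thread only.
        PROVIDER.set("provider set on " + Thread.currentThread().getName());
        System.out.println("same thread:   " + readOnCurrentThread());
        // The worker thread never set the slot, so it reads null.
        System.out.println("worker thread: " + readOnWorkerThread());
    }
}
```

If this is indeed the cause, one thing to try would be creating the StreamTableEnvironment on the same thread that later calls executeSql, rather than sharing a bean created during Spring startup.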

799590989@qq.com