Posted to dev@flink.apache.org by "liu (Jira)" <ji...@apache.org> on 2020/11/25 01:53:00 UTC
[jira] [Created] (FLINK-20330) Flink connector fails to support Hive external tables (HBase or ES)
liu created FLINK-20330:
---------------------------
Summary: Flink connector fails to support Hive external tables (HBase or ES)
Key: FLINK-20330
URL: https://issues.apache.org/jira/browse/FLINK-20330
Project: Flink
Issue Type: Bug
Components: Connectors / Hive
Affects Versions: 1.11.2, 1.11.1, 1.11.0
Environment: Test code like this:
{code:sql}
CREATE EXTERNAL TABLE hive_to_es (
  key string,
  value string
)
STORED BY 'org.elasticsearch.hadoop.hive.EsStorageHandler'
TBLPROPERTIES (
  'es.resource' = 'hive_to_es/_doc',
  'es.index.auto.create' = 'TRUE',
  'es.nodes' = '192.168.1.111:9200,192.168.1.112:9200,192.168.1.113:9200'
);

insert into hive_to_es (key, value) values ('name', 'tom');
insert into hive_to_es (key, value) values ('yes', 'aaa');

select * from hive_to_es;
{code}
!image-2020-11-25-09-51-00-100.png|width=807,height=134!
Reporter: liu
Attachments: image-2020-11-25-09-42-13-102.png, image-2020-11-25-09-51-00-100.png
[ERROR] Could not execute SQL statement. Reason:
org.apache.flink.connectors.hive.FlinkHiveException: Unable to instantiate the hadoop input format

!image-2020-11-25-09-42-13-102.png|width=384,height=288!
We worked around this by patching flink-connector-hive_2.12-1.11.2.jar.

In org/apache/flink/connectors/hive/HiveTableSink.java, around line 134, add:
{code:java}
// Fall back to a known OutputFormat when the table's StorageDescriptor
// does not declare one, based on the SerDe the table uses.
if (sd.getOutputFormat() == null
        && "org.apache.hadoop.hive.hbase.HBaseSerDe".equals(sd.getSerdeInfo().getSerializationLib())) {
    sd.setOutputFormat("org.apache.hadoop.hive.hbase.HiveHBaseTableOutputFormat");
}
if (sd.getOutputFormat() == null
        && "org.elasticsearch.hadoop.hive.EsSerDe".equals(sd.getSerdeInfo().getSerializationLib())) {
    sd.setOutputFormat("org.elasticsearch.hadoop.hive.EsHiveOutputFormat");
}
{code}
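For reference, the fallback above amounts to a lookup from SerDe class name to OutputFormat class name. A minimal standalone sketch of that mapping (the `OutputFormatFallback` class and `outputFormatFor` method are hypothetical names for illustration; the SerDe and OutputFormat class names are the ones used in the patch):

```java
import java.util.HashMap;
import java.util.Map;

public class OutputFormatFallback {
    // SerDe class name -> fallback OutputFormat class name, as in the patch above.
    private static final Map<String, String> SERDE_TO_OUTPUT_FORMAT = new HashMap<>();
    static {
        SERDE_TO_OUTPUT_FORMAT.put(
                "org.apache.hadoop.hive.hbase.HBaseSerDe",
                "org.apache.hadoop.hive.hbase.HiveHBaseTableOutputFormat");
        SERDE_TO_OUTPUT_FORMAT.put(
                "org.elasticsearch.hadoop.hive.EsSerDe",
                "org.elasticsearch.hadoop.hive.EsHiveOutputFormat");
    }

    /** Returns the fallback OutputFormat for the given SerDe, or null if none is known. */
    public static String outputFormatFor(String serdeLib) {
        return SERDE_TO_OUTPUT_FORMAT.get(serdeLib);
    }
}
```

A table-driven lookup like this would also make it easier to add further storage handlers later without another if-chain.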
In org/apache/flink/connectors/hive/read/HiveTableInputFormat.java, around line 305, add:
{code:java}
// Fall back to a known InputFormat when the StorageDescriptor does not declare
// one, and copy the connector-specific table properties into the JobConf.
if (sd.getInputFormat() == null
        && "org.apache.hadoop.hive.hbase.HBaseSerDe".equals(sd.getSerdeInfo().getSerializationLib())) {
    sd.setInputFormat("org.apache.hadoop.hive.hbase.HiveHBaseTableInputFormat");
    jobConf.set("hbase.table.name", partition.getTableProps().getProperty("hbase.table.name"));
    jobConf.set("hbase.columns.mapping", partition.getTableProps().getProperty("hbase.columns.mapping"));
}
if (sd.getInputFormat() == null
        && "org.elasticsearch.hadoop.hive.EsSerDe".equals(sd.getSerdeInfo().getSerializationLib())) {
    sd.setInputFormat("org.elasticsearch.hadoop.hive.EsHiveInputFormat");
    jobConf.set("location", sd.getLocation());
    // Copy every "es."-prefixed table property into the job configuration.
    for (Enumeration<Object> en = partition.getTableProps().keys(); en.hasMoreElements();) {
        String key = en.nextElement().toString();
        if (key.startsWith("es.")) {
            jobConf.set(key, partition.getTableProps().getProperty(key));
        }
    }
}
{code}
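The ES branch above forwards every `es.`-prefixed table property into the Hadoop JobConf. A self-contained sketch of just that property-copying step (the `EsPropsCopy` class and `copyEsProps` method are hypothetical names; a plain `Map` stands in for the Hadoop `JobConf` so the sketch has no Hadoop dependency):

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

public class EsPropsCopy {
    /**
     * Copies all "es."-prefixed entries from the Hive table properties into a
     * plain config map, mirroring what the patch does with the JobConf.
     */
    public static Map<String, String> copyEsProps(Properties tableProps) {
        Map<String, String> conf = new HashMap<>();
        for (String key : tableProps.stringPropertyNames()) {
            if (key.startsWith("es.")) {
                conf.put(key, tableProps.getProperty(key));
            }
        }
        return conf;
    }
}
```

With the example table above, this would forward `es.resource`, `es.index.auto.create`, and `es.nodes` while leaving unrelated properties (e.g. `hbase.table.name`) untouched.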
--
This message was sent by Atlassian Jira
(v8.3.4#803005)