You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@flink.apache.org by "liu (Jira)" <ji...@apache.org> on 2020/11/25 01:53:00 UTC

[jira] [Created] (FLINK-20330) Flink connector has error in support hive external tables (hbase or es)

liu created FLINK-20330:
---------------------------

             Summary: Flink connector has error in support hive external tables (hbase or es)
                 Key: FLINK-20330
                 URL: https://issues.apache.org/jira/browse/FLINK-20330
             Project: Flink
          Issue Type: Bug
          Components: Connectors / Hive
    Affects Versions: 1.11.2, 1.11.1, 1.11.0
         Environment: TEST CODE LIKE THIS:


CREATE EXTERNAL TABLE hive_to_es (
 key string,
 value string
)
STORED BY 'org.elasticsearch.hadoop.hive.EsStorageHandler'
TBLPROPERTIES(
 'es.resource' = 'hive_to_es/_doc',
 'es.index.auto.create' = 'TRUE',
 'es.nodes'='192.168.1.111:9200,192.168.1.112:9200,192.168.1.113:9200'
);


insert into hive_to_es (key, value) values ('name','tom');
insert into hive_to_es (key, value) values ('yes','aaa');

select * from hive_to_es;

 

!image-2020-11-25-09-51-00-100.png|width=807,height=134!
            Reporter: liu
         Attachments: image-2020-11-25-09-42-13-102.png, image-2020-11-25-09-51-00-100.png

[ERROR] Could not execute SQL statement. Reason:
org.apache.flink.connectors.hive.FlinkHiveException: Unable to instantiate the hadoop
 
input format !image-2020-11-25-09-42-13-102.png|width=384,height=288!
 
 
we add a patch like this:
 
flink-connector-hive_2.12-1.11.2.jar
org/apache/flink/connectors/hive/HiveTableSink.java +134
 
ADD PATCH:
 
{code:java}
// code placeholder

if (sd.getOutputFormat() == null && "org.apache.hadoop.hive.hbase.HBaseSerDe".equals(sd.getSerdeInfo().getSerializationLib())) { sd.setOutputFormat("org.apache.hadoop.hive.hbase.HiveHBaseTableOutputFormat");
}
if (sd.getOutputFormat() == null && "org.elasticsearch.hadoop.hive.EsSerDe".equals(sd.getSerdeInfo().getSerializationLib())) { sd.setOutputFormat("org.elasticsearch.hadoop.hive.EsHiveOutputFormat"); 
}
{code}
org/apache/flink/connectors/hive/read/HiveTableInputFormat.java + 305
ADD PATCH:
{code:java}
// code placeholder

if (sd.getInputFormat() == null && "org.apache.hadoop.hive.hbase.HBaseSerDe".equals(sd.getSerdeInfo().getSerializationLib())) { sd.setInputFormat("org.apache.hadoop.hive.hbase.HiveHBaseTableInputFormat"); jobConf.set("hbase.table.name", partition.getTableProps().getProperty("hbase.table.name")); jobConf.set("hbase.columns.mapping", partition.getTableProps().getProperty("hbase.columns.mapping")); 
}

if (sd.getInputFormat() == null && "org.elasticsearch.hadoop.hive.EsSerDe".equals(sd.getSerdeInfo().getSerializationLib())) { sd.setInputFormat("org.elasticsearch.hadoop.hive.EsHiveInputFormat"); jobConf.set("location", sd.getLocation()); for (Enumeration en = partition.getTableProps().keys(); en.hasMoreElements();) { String key = en.nextElement().toString(); if(key.startsWith("es.")){ jobConf.set(key, partition.getTableProps().getProperty(key)); } } 
}
{code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)