You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by "Shengkai Fang (Jira)" <ji...@apache.org> on 2020/05/28 12:58:00 UTC

[jira] [Created] (FLINK-18006) It will throw Invalid lambda deserialization Exception when writing to elastic search with new format

Shengkai Fang created FLINK-18006:
-------------------------------------

             Summary: It will throw Invalid lambda deserialization Exception when writing to elastic search with new format
                 Key: FLINK-18006
                 URL: https://issues.apache.org/jira/browse/FLINK-18006
             Project: Flink
          Issue Type: Bug
          Components: Connectors / ElasticSearch, Table SQL / Client
    Affects Versions: 1.11.0
         Environment: ElasticSearch version is 7.6.0
            Reporter: Shengkai Fang


My job follows:
{code:java}
// 
create table csv( pageId VARCHAR, eventId VARCHAR, recvTime VARCHAR) with ( 'connector' = 'filesystem',
 'path' = '/Users/ohmeatball/Work/flink-sql-etl/data-generator/src/main/resources/user3.csv',
 'format' = 'csv'
 )
-----------------------------------------
CREATE TABLE es_table (
  aggId varchar ,
  pageId varchar ,
  ts varchar ,
  expoCnt int ,
  clkCnt int
) WITH (
'connector' = 'elasticsearch',
'hosts' = 'http://localhost:9200',
'index' = 'cli_test',
'document-id.key-delimiter' = '$',
'sink.bulk-flush.interval' = '1000',
'format' = 'json'
)
-----------------------------------------
INSERT INTO es_table
SELECT  pageId,eventId,cast(recvTime as varchar) as ts, 1, 1 from csv;
{code}
The full exception follows:
{code:java}
Sink(table=[default_catalog.default_database.es_table], fields=[aggId, pageId, ts, expoCnt, clkCnt]) (1/1) (b51209fac96948c20e85b8df137287d3) switched from RUNNING to FAILED on org.apache.flink.runtime.jobmaster.slotpool.SingleLogicalSlot@bb5ab41.Sink(table=[default_catalog.default_database.es_table], fields=[aggId, pageId, ts, expoCnt, clkCnt]) (1/1) (b51209fac96948c20e85b8df137287d3) switched from RUNNING to FAILED on org.apache.flink.runtime.jobmaster.slotpool.SingleLogicalSlot@bb5ab41.org.apache.flink.streaming.runtime.tasks.StreamTaskException: Cannot instantiate user function. at org.apache.flink.streaming.api.graph.StreamConfig.getStreamOperatorFactory(StreamConfig.java:291) ~[flink-dist_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT] at org.apache.flink.streaming.runtime.tasks.OperatorChain.createChainedOperator(OperatorChain.java:471) ~[flink-dist_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT] at org.apache.flink.streaming.runtime.tasks.OperatorChain.createOutputCollector(OperatorChain.java:393) ~[flink-dist_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT] at org.apache.flink.streaming.runtime.tasks.OperatorChain.createChainedOperator(OperatorChain.java:459) ~[flink-dist_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT] at org.apache.flink.streaming.runtime.tasks.OperatorChain.createOutputCollector(OperatorChain.java:393) ~[flink-dist_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT] at org.apache.flink.streaming.runtime.tasks.OperatorChain.<init>(OperatorChain.java:155) ~[flink-dist_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT] at org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:449) ~[flink-dist_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT] at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:518) ~[flink-dist_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT] at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:720) ~[flink-dist_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT] at org.apache.flink.runtime.taskmanager.Task.run(Task.java:545) ~[flink-dist_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT] at java.lang.Thread.run(Thread.java:748) ~[?:1.8.0_151]Caused by: java.io.IOException: unexpected exception type at java.io.ObjectStreamClass.throwMiscException(ObjectStreamClass.java:1682) ~[?:1.8.0_151] at java.io.ObjectStreamClass.invokeReadResolve(ObjectStreamClass.java:1254) ~[?:1.8.0_151] at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2073) ~[?:1.8.0_151] at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1568) ~[?:1.8.0_151] at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2282) ~[?:1.8.0_151] at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2206) ~[?:1.8.0_151] at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2064) ~[?:1.8.0_151] at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1568) ~[?:1.8.0_151] at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2282) ~[?:1.8.0_151] at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2206) ~[?:1.8.0_151] at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2064) ~[?:1.8.0_151] at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1568) ~[?:1.8.0_151] at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2282) ~[?:1.8.0_151] at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2206) ~[?:1.8.0_151] at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2064) ~[?:1.8.0_151] at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1568) ~[?:1.8.0_151] at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2282) ~[?:1.8.0_151] at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2206) ~[?:1.8.0_151] at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2064) ~[?:1.8.0_151] at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1568) ~[?:1.8.0_151] at java.io.ObjectInputStream.readObject(ObjectInputStream.java:428) ~[?:1.8.0_151] at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:576) ~[flink-dist_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT] at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:562) ~[flink-dist_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT] at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:550) ~[flink-dist_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT] at org.apache.flink.util.InstantiationUtil.readObjectFromConfig(InstantiationUtil.java:511) ~[flink-dist_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT] at org.apache.flink.streaming.api.graph.StreamConfig.getStreamOperatorFactory(StreamConfig.java:276) ~[flink-dist_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT] ... 10 more
Caused by: java.lang.reflect.InvocationTargetExceptionCaused by: java.lang.reflect.InvocationTargetException at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[?:1.8.0_151] at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[?:1.8.0_151] at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[?:1.8.0_151] at java.lang.reflect.Method.invoke(Method.java:498) ~[?:1.8.0_151] at java.lang.invoke.SerializedLambda.readResolve(SerializedLambda.java:230) ~[?:1.8.0_151] at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[?:1.8.0_151] at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[?:1.8.0_151] at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[?:1.8.0_151] at java.lang.reflect.Method.invoke(Method.java:498) ~[?:1.8.0_151] at java.io.ObjectStreamClass.invokeReadResolve(ObjectStreamClass.java:1248) ~[?:1.8.0_151] at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2073) ~[?:1.8.0_151] at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1568) ~[?:1.8.0_151] at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2282) ~[?:1.8.0_151] at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2206) ~[?:1.8.0_151] at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2064) ~[?:1.8.0_151] at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1568) ~[?:1.8.0_151] at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2282) ~[?:1.8.0_151] at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2206) ~[?:1.8.0_151] at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2064) ~[?:1.8.0_151] at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1568) ~[?:1.8.0_151] at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2282) ~[?:1.8.0_151] at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2206) ~[?:1.8.0_151] at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2064) ~[?:1.8.0_151] at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1568) ~[?:1.8.0_151] at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2282) ~[?:1.8.0_151] at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2206) ~[?:1.8.0_151] at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2064) ~[?:1.8.0_151] at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1568) ~[?:1.8.0_151] at java.io.ObjectInputStream.readObject(ObjectInputStream.java:428) ~[?:1.8.0_151] at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:576) ~[flink-dist_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT] at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:562) ~[flink-dist_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT] at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:550) ~[flink-dist_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT] at org.apache.flink.util.InstantiationUtil.readObjectFromConfig(InstantiationUtil.java:511) ~[flink-dist_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT] at org.apache.flink.streaming.api.graph.StreamConfig.getStreamOperatorFactory(StreamConfig.java:276) ~[flink-dist_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT] ... 10 moreCaused by: java.lang.IllegalArgumentException: Invalid lambda deserialization at org.apache.flink.streaming.connectors.elasticsearch7.ElasticsearchSink$Builder.$deserializeLambda$(ElasticsearchSink.java:80) ~[flink-sql-connector-elasticsearch7_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT] at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[?:1.8.0_151] at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[?:1.8.0_151] at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[?:1.8.0_151] at java.lang.reflect.Method.invoke(Method.java:498) ~[?:1.8.0_151] at java.lang.invoke.SerializedLambda.readResolve(SerializedLambda.java:230) ~[?:1.8.0_151] at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[?:1.8.0_151] at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[?:1.8.0_151] at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[?:1.8.0_151] at java.lang.reflect.Method.invoke(Method.java:498) ~[?:1.8.0_151] at java.io.ObjectStreamClass.invokeReadResolve(ObjectStreamClass.java:1248) ~[?:1.8.0_151] at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2073) ~[?:1.8.0_151] at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1568) ~[?:1.8.0_151] at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2282) ~[?:1.8.0_151] at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2206) ~[?:1.8.0_151] at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2064) ~[?:1.8.0_151] at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1568) ~[?:1.8.0_151] at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2282) ~[?:1.8.0_151] at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2206) ~[?:1.8.0_151] at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2064) ~[?:1.8.0_151] at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1568) ~[?:1.8.0_151] at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2282) ~[?:1.8.0_151] at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2206) ~[?:1.8.0_151] at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2064) ~[?:1.8.0_151] at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1568) ~[?:1.8.0_151] at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2282) ~[?:1.8.0_151] at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2206) ~[?:1.8.0_151] at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2064) ~[?:1.8.0_151] at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1568) ~[?:1.8.0_151]
 at java.io.ObjectInputStream.readObject(ObjectInputStream.java:428) ~[?:1.8.0_151] at java.io.ObjectInputStream.readObject(ObjectInputStream.java:428) ~[?:1.8.0_151] at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:576) ~[flink-dist_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT] at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:562) ~[flink-dist_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT] at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:550) ~[flink-dist_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT] at org.apache.flink.util.InstantiationUtil.readObjectFromConfig(InstantiationUtil.java:511) ~[flink-dist_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT] at org.apache.flink.streaming.api.graph.StreamConfig.getStreamOperatorFactory(StreamConfig.java:276) ~[flink-dist_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT] ... 10 more
{code}
Notice: everything works fine with former connector grammer.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)