Posted to issues@flink.apache.org by "Kezhu Wang (Jira)" <ji...@apache.org> on 2020/12/06 05:14:00 UTC

[jira] [Commented] (FLINK-19435) jdbc JDBCOutputFormat open function invoke Class.forName(drivername)

    [ https://issues.apache.org/jira/browse/FLINK-19435?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17244654#comment-17244654 ] 

Kezhu Wang commented on FLINK-19435:
------------------------------------

It seems that this problem still exists. I have pushed [a branch|https://github.com/kezhuw/flink/commits/flink-19435-sql-driver-class-loading-deadlock-test-case] with [a hanging test case|https://github.com/kezhuw/flink/commit/a732d30093073e1e5bcdbed13290cef3f9a00d69#diff-0dafd910621d60887021215d320c2a9fc5b12305825c6776de8a307483920a52R110] to my repository to demonstrate it. One can reproduce the hang with the command {{mvn -Dcheckstyle.skip=true -Dtest=JdbcDriverClassConcurrentLoadingTest -DfailIfNoTests=false -pl "flink-connectors/flink-connector-jdbc" -am test -Pskip-webui-build}} or from an IDE. Please rerun if it does not hang; it reproduces nearly 100% of the time in my local environment. After the hang, you will see threads similar to these in the thread dump:
{code:java}
"Sink: Postgres Jdbc Sink (1/1)#0" #102 prio=5 os_prio=31 tid=0x00007f8fae050800 nid=0xd703 in Object.wait() [0x000070000fb6c000]
   java.lang.Thread.State: RUNNABLE
        at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
        at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
        at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
        at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
        at java.lang.Class.newInstance(Class.java:442)
        at java.util.ServiceLoader$LazyIterator.nextService(ServiceLoader.java:380)
        at java.util.ServiceLoader$LazyIterator.next(ServiceLoader.java:404)
        at java.util.ServiceLoader$1.next(ServiceLoader.java:480)
        at java.sql.DriverManager$2.run(DriverManager.java:603)
        at java.sql.DriverManager$2.run(DriverManager.java:583)
        at java.security.AccessController.doPrivileged(Native Method)
        at java.sql.DriverManager.loadInitialDrivers(DriverManager.java:583)
        at java.sql.DriverManager.<clinit>(DriverManager.java:101)
        at org.postgresql.Driver.register(Driver.java:721)
        at org.postgresql.Driver.<clinit>(Driver.java:73)
        at java.lang.Class.forName0(Native Method)
        at java.lang.Class.forName(Class.java:264)
        at org.apache.flink.connector.jdbc.internal.connection.SimpleJdbcConnectionProvider.getConnection(SimpleJdbcConnectionProvider.java:52)
        - locked <0x000000074cf7c3e8> (a org.apache.flink.connector.jdbc.internal.connection.SimpleJdbcConnectionProvider)
        at org.apache.flink.connector.jdbc.internal.AbstractJdbcOutputFormat.establishConnection(AbstractJdbcOutputFormat.java:66)
        at org.apache.flink.connector.jdbc.internal.AbstractJdbcOutputFormat.open(AbstractJdbcOutputFormat.java:59)
        at org.apache.flink.connector.jdbc.internal.JdbcBatchingOutputFormat.open(JdbcBatchingOutputFormat.java:114)
        at org.apache.flink.connector.jdbc.internal.GenericJdbcSinkFunction.open(GenericJdbcSinkFunction.java:50)
        at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
        at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102)
        at org.apache.flink.streaming.api.operators.StreamSink.open(StreamSink.java:48)
        at org.apache.flink.streaming.runtime.tasks.OperatorChain.initializeStateAndOpenOperators(OperatorChain.java:401)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$2(StreamTask.java:507)
        at org.apache.flink.streaming.runtime.tasks.StreamTask$$Lambda$641/1400119541.run(Unknown Source)
        at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:47)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:501)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:531)
        at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:722)
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:547)
        at java.lang.Thread.run(Thread.java:748)

"Sink: MySQL Jdbc Sink (1/1)#0" #101 prio=5 os_prio=31 tid=0x00007f8fae37b000 nid=0x12603 in Object.wait() [0x000070000fa6a000]
   java.lang.Thread.State: RUNNABLE
        at com.mysql.cj.jdbc.Driver.<clinit>(Driver.java:55)
        at java.lang.Class.forName0(Native Method)
        at java.lang.Class.forName(Class.java:264)
        at org.apache.flink.connector.jdbc.internal.connection.SimpleJdbcConnectionProvider.getConnection(SimpleJdbcConnectionProvider.java:52)
        - locked <0x000000074d0a5368> (a org.apache.flink.connector.jdbc.internal.connection.SimpleJdbcConnectionProvider)
        at org.apache.flink.connector.jdbc.internal.AbstractJdbcOutputFormat.establishConnection(AbstractJdbcOutputFormat.java:66)
        at org.apache.flink.connector.jdbc.internal.AbstractJdbcOutputFormat.open(AbstractJdbcOutputFormat.java:59)
        at org.apache.flink.connector.jdbc.internal.JdbcBatchingOutputFormat.open(JdbcBatchingOutputFormat.java:114)
        at org.apache.flink.connector.jdbc.internal.GenericJdbcSinkFunction.open(GenericJdbcSinkFunction.java:50)
        at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
        at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102)
        at org.apache.flink.streaming.api.operators.StreamSink.open(StreamSink.java:48)
        at org.apache.flink.streaming.runtime.tasks.OperatorChain.initializeStateAndOpenOperators(OperatorChain.java:401)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$2(StreamTask.java:507)
        at org.apache.flink.streaming.runtime.tasks.StreamTask$$Lambda$641/1400119541.run(Unknown Source)
        at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:47)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:501)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:531)
        at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:722)
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:547)
        at java.lang.Thread.run(Thread.java:748)
{code}

The two threads are deadlocked because [each waits for the other's static initializer to complete in the JVM|https://github.com/openjdk/jdk/blob/jdk8-b120/hotspot/src/share/vm/oops/instanceKlass.cpp#L794]. In the dump above, thread {{Sink: Postgres Jdbc Sink (1/1)#0}} is inside {{java.sql.DriverManager}}'s static block, waiting for {{com.mysql.cj.jdbc.Driver}}'s static block to finish, while thread {{Sink: MySQL Jdbc Sink (1/1)#0}} is inside {{com.mysql.cj.jdbc.Driver}}'s static block, waiting for {{java.sql.DriverManager}}'s static block to finish.
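
To make the mechanism concrete, here is a minimal sketch of the same kind of class-initialization deadlock without any JDBC dependency. The classes {{FakeManager}} and {{FakeDriver}} are hypothetical stand-ins for {{java.sql.DriverManager}} and a driver class, and the latch exists only to force the unlucky interleaving. It is an illustration of the mechanism, not the Flink test case.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

public class ClassInitDeadlockDemo {
    // Each thread waits here from inside a static initializer, so both
    // classes are "initialization in progress" before either touches the other.
    static final CountDownLatch BOTH_INITIALIZING = new CountDownLatch(2);

    static void rendezvous() {
        BOTH_INITIALIZING.countDown();
        try {
            BOTH_INITIALIZING.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    // Hypothetical stand-in for java.sql.DriverManager on JDK 8:
    // its static block ends up initializing a driver class.
    static class FakeManager {
        static { rendezvous(); FakeDriver.touch(); }
        static void touch() {}
    }

    // Hypothetical stand-in for a JDBC driver:
    // its static block registers itself with the manager.
    static class FakeDriver {
        static { rendezvous(); FakeManager.touch(); }
        static void touch() {}
    }

    // Returns true if both threads are still stuck after the timeouts.
    static boolean deadlocks() throws InterruptedException {
        Thread a = new Thread(() -> FakeManager.touch(), "init-manager");
        Thread b = new Thread(() -> FakeDriver.touch(), "init-driver");
        // Daemon threads, so the JVM can still exit while they are stuck.
        a.setDaemon(true);
        b.setDaemon(true);
        a.start();
        b.start();
        a.join(1000);
        b.join(1000);
        return a.isAlive() && b.isAlive();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("deadlocked: " + deadlocks());
    }
}
```

Neither thread holds a Java monitor on the other's class here, which is why such hangs tend to show up as {{RUNNABLE}} threads "in Object.wait()" rather than as a deadlock reported by the JVM.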

To trigger this deadlock, one can load different driver classes concurrently using {{Class.forName}} before {{java.sql.DriverManager}} is loaded. Here is [a relatively detailed article|https://medium.com/@priyaaggarwal24/avoiding-deadlock-when-using-multiple-jdbc-drivers-in-an-application-ce0b9464ecdf] on this problem; there should be more on the internet. This is a JDK 8 specific problem. It has been solved in JDK 9 and above by [JDK-8060068|https://github.com/openjdk/jdk/commit/7e500d532492437658e6736d6279809f3fa40406] and [JDK-8067904|https://github.com/openjdk/jdk/commit/ddcbf6138900d2f3a6c550ceea5a01e096819323], which moved driver loading out of {{java.sql.DriverManager}}'s static block.

There are probably several ways to circumvent this, such as ensuring {{java.sql.DriverManager}} is loaded before calling {{Class.forName}}, or locking on a sufficiently global object around {{Class.forName}}. But I think a more elegant way is to not use {{Class.forName}} for driver class loading at all: {{DriverManager.getConnection}} will do this automatically for us. That is the value of {{java.sql.Driver}} and {{java.util.ServiceLoader}}: care about the contract, not the implementation. I think we should drop the required field {{driverName}} from {{JdbcConnectionOptions}} in the long term, as it forces clients to care about the driver class in addition to the driver jar.
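
A rough sketch of these options, not a proposed patch: the class {{JdbcConnections}}, its method names, and the lock object are hypothetical and do not exist in Flink. Option 1 assumes the driver jar ships a {{META-INF/services/java.sql.Driver}} entry and is visible to the calling class loader. Option 2 combines the two workarounds, touching {{DriverManager}} first and then serializing {{Class.forName}} on one shared lock.

```java
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;

public final class JdbcConnections {
    // Hypothetical process-wide lock; it only helps if every code path
    // that loads driver classes goes through it.
    private static final Object DRIVER_LOAD_LOCK = new Object();

    private JdbcConnections() {}

    /**
     * Option 1: no Class.forName at all. DriverManager finds registered
     * drivers (discovered through ServiceLoader) that accept the URL.
     */
    public static Connection open(String dbUrl, String username, String password)
            throws SQLException {
        if (username == null) {
            return DriverManager.getConnection(dbUrl);
        }
        return DriverManager.getConnection(dbUrl, username, password);
    }

    /**
     * Option 2: keep Class.forName, but make sure DriverManager has finished
     * its own static initialization first, then load drivers one at a time.
     */
    public static Class<?> loadDriver(String driverName) throws ClassNotFoundException {
        // Any static call forces DriverManager's initializer to complete
        // before this thread starts initializing a driver class.
        DriverManager.getDrivers();
        synchronized (DRIVER_LOAD_LOCK) {
            return Class.forName(driverName);
        }
    }
}
```

With option 1, a call such as {{JdbcConnections.open(dbURL, username, password)}} would replace the {{Class.forName}} plus {{DriverManager.getConnection}} pair, and a missing driver shows up as an {{SQLException}} instead of a {{ClassNotFoundException}}.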

[~jark] [~twalthr] [~roman_khachatryan] [~Leonard Xu] What do you think?

> jdbc JDBCOutputFormat open function invoke Class.forName(drivername)
> --------------------------------------------------------------------
>
>                 Key: FLINK-19435
>                 URL: https://issues.apache.org/jira/browse/FLINK-19435
>             Project: Flink
>          Issue Type: Improvement
>          Components: Connectors / JDBC
>    Affects Versions: 1.10.2
>            Reporter: xiaodao
>            Priority: Major
>             Fix For: 1.10.3
>
>         Attachments: image-2020-10-09-20-48-48-261.png, image-2020-10-09-20-49-23-644.png
>
>
> When we sink data to multiple JDBC output formats, the following code
> ```
> protected void establishConnection() throws SQLException, ClassNotFoundException {
>     Class.forName(drivername);
>     if (username == null) {
>         connection = DriverManager.getConnection(dbURL);
>     } else {
>         connection = DriverManager.getConnection(dbURL, username, password);
>     }
> }
> ```
> may cause a JDBC driver deadlock. It needs to be changed to a synchronized function.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)