You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@skywalking.apache.org by wu...@apache.org on 2018/11/27 14:04:55 UTC

[incubator-skywalking] branch master updated: Support ConnectionProxyInstance tracing (#1961)

This is an automated email from the ASF dual-hosted git repository.

wusheng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-skywalking.git


The following commit(s) were added to refs/heads/master by this push:
     new 4b868f8  Support ConnectionProxyInstance tracing (#1961)
4b868f8 is described below

commit 4b868f8aeb289f2b0345546cf0f2446442da135d
Author: Xin,Zhang <zh...@apache.org>
AuthorDate: Tue Nov 27 22:04:50 2018 +0800

    Support ConnectionProxyInstance tracing (#1961)
---
 ...eateJdbcConnectionProxyInstanceInterceptor.java |  54 ++
 ...BalancedConnectionProxyInstanceInterceptor.java |  53 ++
 ...licationConnectionProxyInstanceInterceptor.java |  53 ++
 .../FailoverConnectionProxyInstrumentation.java    |  57 ++
 ...LoadBalancedConnectionProxyInstrumentation.java |  55 ++
 .../ReplicationConnectionProxyInstrumentation.java |  55 ++
 .../mysql/wrapper/CallableStatementWrapper.java    | 539 +++++++++++++++++
 .../jdbc/mysql/wrapper/JdbcConnectionWrapper.java  | 652 +++++++++++++++++++++
 .../wrapper/LoadBalancedConnectionWrapper.java     |  48 ++
 .../mysql/wrapper/PreparedStatementWrapper.java    | 303 ++++++++++
 .../wrapper/ReplicationConnectionWrapper.java      |  80 +++
 .../jdbc/mysql/wrapper/StatementWrapper.java       | 307 ++++++++++
 .../plugin/jdbc/mysql/wrapper/TracingUtils.java    |  53 ++
 .../src/main/resources/skywalking-plugin.def       |   3 +
 14 files changed, 2312 insertions(+)

diff --git a/apm-sniffer/apm-sdk-plugin/mysql-5.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/jdbc/mysql/CreateJdbcConnectionProxyInstanceInterceptor.java b/apm-sniffer/apm-sdk-plugin/mysql-5.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/jdbc/mysql/CreateJdbcConnectionProxyInstanceInterceptor.java
new file mode 100644
index 0000000..270740d
--- /dev/null
+++ b/apm-sniffer/apm-sdk-plugin/mysql-5.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/jdbc/mysql/CreateJdbcConnectionProxyInstanceInterceptor.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.skywalking.apm.plugin.jdbc.mysql;
+
+import com.mysql.cj.api.jdbc.JdbcConnection;
+import com.mysql.cj.core.conf.url.ConnectionUrl;
+import com.mysql.cj.core.conf.url.HostInfo;
+import java.lang.reflect.Method;
+import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.MethodInterceptResult;
+import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.StaticMethodsAroundInterceptor;
+import org.apache.skywalking.apm.network.trace.component.ComponentsDefine;
+import org.apache.skywalking.apm.plugin.jdbc.mysql.wrapper.JdbcConnectionWrapper;
+import org.apache.skywalking.apm.plugin.jdbc.trace.ConnectionInfo;
+
+/**
+ * Static-method interceptor that replaces the value returned by the intercepted method with a
+ * {@link JdbcConnectionWrapper} carrying a {@link ConnectionInfo} built from the connection URL.
+ */
+public class CreateJdbcConnectionProxyInstanceInterceptor implements StaticMethodsAroundInterceptor {
+
+    /** No-op: nothing is done before the intercepted method runs. */
+    @Override
+    public void beforeMethod(Class clazz, Method method, Object[] allArguments, Class<?>[] parameterTypes,
+        MethodInterceptResult result) {
+    }
+
+    /**
+     * Wraps the value returned by the intercepted method.
+     *
+     * @param allArguments arguments of the intercepted call; element 0 is cast to {@link ConnectionUrl}
+     * @param ret value returned by the intercepted method; cast to {@link JdbcConnection}
+     * @return a {@link JdbcConnectionWrapper} around {@code ret}
+     */
+    @Override
+    public Object afterMethod(Class clazz, Method method, Object[] allArguments, Class<?>[] parameterTypes,
+        Object ret) {
+        final ConnectionUrl url = (ConnectionUrl)allArguments[0];
+        final String peers = joinHosts(url);
+        final ConnectionInfo info = new ConnectionInfo(ComponentsDefine.MYSQL_JDBC_DRIVER, "Mysql", peers, url.getDatabase());
+        return new JdbcConnectionWrapper((JdbcConnection)ret, info);
+    }
+
+    /**
+     * Concatenates every host of {@code url} as {@code host:port,}. Each entry, including the last
+     * one, is followed by a comma, so a non-empty result always ends with {@code ","}.
+     */
+    private static String joinHosts(ConnectionUrl url) {
+        final StringBuilder peers = new StringBuilder();
+        for (HostInfo host : url.getHostsList()) {
+            peers.append(host.getHost()).append(":").append(host.getPort()).append(",");
+        }
+        return peers.toString();
+    }
+
+    /** No-op: a throwable raised by the intercepted method is not handled here. */
+    @Override
+    public void handleMethodException(Class clazz, Method method, Object[] allArguments, Class<?>[] parameterTypes,
+        Throwable t) {
+    }
+}
diff --git a/apm-sniffer/apm-sdk-plugin/mysql-5.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/jdbc/mysql/CreateLoadBalancedConnectionProxyInstanceInterceptor.java b/apm-sniffer/apm-sdk-plugin/mysql-5.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/jdbc/mysql/CreateLoadBalancedConnectionProxyInstanceInterceptor.java
new file mode 100644
index 0000000..e205610
--- /dev/null
+++ b/apm-sniffer/apm-sdk-plugin/mysql-5.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/jdbc/mysql/CreateLoadBalancedConnectionProxyInstanceInterceptor.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.skywalking.apm.plugin.jdbc.mysql;
+
+import com.mysql.cj.api.jdbc.ha.LoadBalancedConnection;
+import com.mysql.cj.core.conf.url.ConnectionUrl;
+import com.mysql.cj.core.conf.url.HostInfo;
+import java.lang.reflect.Method;
+import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.MethodInterceptResult;
+import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.StaticMethodsAroundInterceptor;
+import org.apache.skywalking.apm.network.trace.component.ComponentsDefine;
+import org.apache.skywalking.apm.plugin.jdbc.mysql.wrapper.LoadBalancedConnectionWrapper;
+import org.apache.skywalking.apm.plugin.jdbc.trace.ConnectionInfo;
+
+/**
+ * Static-method interceptor that replaces the value returned by the intercepted method with a
+ * {@link LoadBalancedConnectionWrapper} carrying a {@link ConnectionInfo} built from the connection URL.
+ */
+public class CreateLoadBalancedConnectionProxyInstanceInterceptor implements StaticMethodsAroundInterceptor {
+
+    /** No-op: nothing is done before the intercepted method runs. */
+    @Override
+    public void beforeMethod(Class clazz, Method method, Object[] allArguments, Class<?>[] parameterTypes,
+        MethodInterceptResult result) {
+    }
+
+    /**
+     * Wraps the value returned by the intercepted method.
+     *
+     * @param allArguments arguments of the intercepted call; element 0 is cast to {@link ConnectionUrl}
+     * @param ret value returned by the intercepted method; cast to {@link LoadBalancedConnection}
+     * @return a {@link LoadBalancedConnectionWrapper} around {@code ret}
+     */
+    @Override
+    public Object afterMethod(Class clazz, Method method, Object[] allArguments, Class<?>[] parameterTypes,
+        Object ret) {
+        final ConnectionUrl url = (ConnectionUrl)allArguments[0];
+        final String peers = joinHosts(url);
+        final ConnectionInfo info = new ConnectionInfo(ComponentsDefine.MYSQL_JDBC_DRIVER, "Mysql", peers, url.getDatabase());
+        return new LoadBalancedConnectionWrapper((LoadBalancedConnection)ret, info);
+    }
+
+    /**
+     * Concatenates every host of {@code url} as {@code host:port,}. Each entry, including the last
+     * one, is followed by a comma, so a non-empty result always ends with {@code ","}.
+     */
+    private static String joinHosts(ConnectionUrl url) {
+        final StringBuilder peers = new StringBuilder();
+        for (HostInfo host : url.getHostsList()) {
+            peers.append(host.getHost()).append(":").append(host.getPort()).append(",");
+        }
+        return peers.toString();
+    }
+
+    /** No-op: a throwable raised by the intercepted method is not handled here. */
+    @Override
+    public void handleMethodException(Class clazz, Method method, Object[] allArguments, Class<?>[] parameterTypes,
+        Throwable t) {
+    }
+}
diff --git a/apm-sniffer/apm-sdk-plugin/mysql-5.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/jdbc/mysql/CreateReplicationConnectionProxyInstanceInterceptor.java b/apm-sniffer/apm-sdk-plugin/mysql-5.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/jdbc/mysql/CreateReplicationConnectionProxyInstanceInterceptor.java
new file mode 100644
index 0000000..d76edf3
--- /dev/null
+++ b/apm-sniffer/apm-sdk-plugin/mysql-5.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/jdbc/mysql/CreateReplicationConnectionProxyInstanceInterceptor.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.skywalking.apm.plugin.jdbc.mysql;
+
+import com.mysql.cj.api.jdbc.ha.ReplicationConnection;
+import com.mysql.cj.core.conf.url.ConnectionUrl;
+import com.mysql.cj.core.conf.url.HostInfo;
+import java.lang.reflect.Method;
+import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.MethodInterceptResult;
+import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.StaticMethodsAroundInterceptor;
+import org.apache.skywalking.apm.network.trace.component.ComponentsDefine;
+import org.apache.skywalking.apm.plugin.jdbc.mysql.wrapper.ReplicationConnectionWrapper;
+import org.apache.skywalking.apm.plugin.jdbc.trace.ConnectionInfo;
+
+/**
+ * Static-method interceptor that replaces the value returned by the intercepted method with a
+ * {@link ReplicationConnectionWrapper} carrying a {@link ConnectionInfo} built from the connection URL.
+ */
+public class CreateReplicationConnectionProxyInstanceInterceptor implements StaticMethodsAroundInterceptor {
+
+    /** No-op: nothing is done before the intercepted method runs. */
+    @Override
+    public void beforeMethod(Class clazz, Method method, Object[] allArguments, Class<?>[] parameterTypes,
+        MethodInterceptResult result) {
+    }
+
+    /**
+     * Wraps the value returned by the intercepted method.
+     *
+     * @param allArguments arguments of the intercepted call; element 0 is cast to {@link ConnectionUrl}
+     * @param ret value returned by the intercepted method; cast to {@link ReplicationConnection}
+     * @return a {@link ReplicationConnectionWrapper} around {@code ret}
+     */
+    @Override
+    public Object afterMethod(Class clazz, Method method, Object[] allArguments, Class<?>[] parameterTypes,
+        Object ret) {
+        final ConnectionUrl url = (ConnectionUrl)allArguments[0];
+        final String peers = joinHosts(url);
+        final ConnectionInfo info = new ConnectionInfo(ComponentsDefine.MYSQL_JDBC_DRIVER, "Mysql", peers, url.getDatabase());
+        return new ReplicationConnectionWrapper((ReplicationConnection)ret, info);
+    }
+
+    /**
+     * Concatenates every host of {@code url} as {@code host:port,}. Each entry, including the last
+     * one, is followed by a comma, so a non-empty result always ends with {@code ","}.
+     */
+    private static String joinHosts(ConnectionUrl url) {
+        final StringBuilder peers = new StringBuilder();
+        for (HostInfo host : url.getHostsList()) {
+            peers.append(host.getHost()).append(":").append(host.getPort()).append(",");
+        }
+        return peers.toString();
+    }
+
+    /** No-op: a throwable raised by the intercepted method is not handled here. */
+    @Override
+    public void handleMethodException(Class clazz, Method method, Object[] allArguments, Class<?>[] parameterTypes,
+        Throwable t) {
+    }
+}
diff --git a/apm-sniffer/apm-sdk-plugin/mysql-5.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/jdbc/mysql/define/FailoverConnectionProxyInstrumentation.java b/apm-sniffer/apm-sdk-plugin/mysql-5.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/jdbc/mysql/define/FailoverConnectionProxyInstrumentation.java
new file mode 100644
index 0000000..e71897c
--- /dev/null
+++ b/apm-sniffer/apm-sdk-plugin/mysql-5.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/jdbc/mysql/define/FailoverConnectionProxyInstrumentation.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.skywalking.apm.plugin.jdbc.mysql.define;
+
+import net.bytebuddy.description.method.MethodDescription;
+import net.bytebuddy.matcher.ElementMatcher;
+import org.apache.skywalking.apm.agent.core.plugin.interceptor.StaticMethodsInterceptPoint;
+import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.ClassStaticMethodsEnhancePluginDefine;
+import org.apache.skywalking.apm.agent.core.plugin.match.ClassMatch;
+
+import static net.bytebuddy.matcher.ElementMatchers.named;
+import static org.apache.skywalking.apm.agent.core.plugin.match.NameMatch.byName;
+
+/**
+ * Plugin definition that targets the class named by {@link #INTERCEPT_CLASS} and routes its
+ * methods named {@code createProxyInstance} to the interceptor named by {@link #METHOD_INTERCEPTOR}.
+ */
+public class FailoverConnectionProxyInstrumentation extends ClassStaticMethodsEnhancePluginDefine {
+
+    /** Fully qualified class name of the interceptor returned by the intercept point. */
+    public static final String METHOD_INTERCEPTOR = "org.apache.skywalking.apm.plugin.jdbc.mysql.CreateJdbcConnectionProxyInstanceInterceptor";
+    /** Fully qualified name of the class matched by {@link #enhanceClass()}. */
+    public static final String INTERCEPT_CLASS = "com.mysql.cj.jdbc.ha.FailoverConnectionProxy";
+
+    /**
+     * @return a one-element array whose intercept point matches methods named
+     * {@code createProxyInstance}, names {@link #METHOD_INTERCEPTOR} and does not override arguments
+     */
+    @Override
+    protected StaticMethodsInterceptPoint[] getStaticMethodsInterceptPoints() {
+        final StaticMethodsInterceptPoint createProxyInstancePoint = new StaticMethodsInterceptPoint() {
+            @Override
+            public ElementMatcher<MethodDescription> getMethodsMatcher() {
+                return named("createProxyInstance");
+            }
+
+            @Override
+            public String getMethodsInterceptor() {
+                return METHOD_INTERCEPTOR;
+            }
+
+            @Override
+            public boolean isOverrideArgs() {
+                return false;
+            }
+        };
+        return new StaticMethodsInterceptPoint[] {createProxyInstancePoint};
+    }
+
+    /** @return an exact-name match on {@link #INTERCEPT_CLASS} */
+    @Override
+    protected ClassMatch enhanceClass() {
+        return byName(INTERCEPT_CLASS);
+    }
+}
diff --git a/apm-sniffer/apm-sdk-plugin/mysql-5.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/jdbc/mysql/define/LoadBalancedConnectionProxyInstrumentation.java b/apm-sniffer/apm-sdk-plugin/mysql-5.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/jdbc/mysql/define/LoadBalancedConnectionProxyInstrumentation.java
new file mode 100644
index 0000000..a423f02
--- /dev/null
+++ b/apm-sniffer/apm-sdk-plugin/mysql-5.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/jdbc/mysql/define/LoadBalancedConnectionProxyInstrumentation.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.skywalking.apm.plugin.jdbc.mysql.define;
+
+import net.bytebuddy.description.method.MethodDescription;
+import net.bytebuddy.matcher.ElementMatcher;
+import org.apache.skywalking.apm.agent.core.plugin.interceptor.StaticMethodsInterceptPoint;
+import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.ClassStaticMethodsEnhancePluginDefine;
+import org.apache.skywalking.apm.agent.core.plugin.match.ClassMatch;
+
+import static net.bytebuddy.matcher.ElementMatchers.named;
+import static org.apache.skywalking.apm.agent.core.plugin.match.NameMatch.byName;
+
+/**
+ * Plugin definition that targets the class named by {@link #INTERCEPT_CLASS} and routes its
+ * methods named {@code createProxyInstance} to the interceptor named by {@link #METHOD_INTERCEPTOR}.
+ */
+public class LoadBalancedConnectionProxyInstrumentation extends ClassStaticMethodsEnhancePluginDefine {
+
+    /** Fully qualified class name of the interceptor returned by the intercept point. */
+    public static final String METHOD_INTERCEPTOR = "org.apache.skywalking.apm.plugin.jdbc.mysql.CreateLoadBalancedConnectionProxyInstanceInterceptor";
+    /** Fully qualified name of the class matched by {@link #enhanceClass()}. */
+    public static final String INTERCEPT_CLASS = "com.mysql.cj.jdbc.ha.LoadBalancedConnectionProxy";
+
+    /**
+     * @return a one-element array whose intercept point matches methods named
+     * {@code createProxyInstance}, names {@link #METHOD_INTERCEPTOR} and does not override arguments
+     */
+    @Override
+    protected StaticMethodsInterceptPoint[] getStaticMethodsInterceptPoints() {
+        final StaticMethodsInterceptPoint createProxyInstancePoint = new StaticMethodsInterceptPoint() {
+            @Override
+            public ElementMatcher<MethodDescription> getMethodsMatcher() {
+                return named("createProxyInstance");
+            }
+
+            @Override
+            public String getMethodsInterceptor() {
+                return METHOD_INTERCEPTOR;
+            }
+
+            @Override
+            public boolean isOverrideArgs() {
+                return false;
+            }
+        };
+        return new StaticMethodsInterceptPoint[] {createProxyInstancePoint};
+    }
+
+    /** @return an exact-name match on {@link #INTERCEPT_CLASS} */
+    @Override
+    protected ClassMatch enhanceClass() {
+        return byName(INTERCEPT_CLASS);
+    }
+}
diff --git a/apm-sniffer/apm-sdk-plugin/mysql-5.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/jdbc/mysql/define/ReplicationConnectionProxyInstrumentation.java b/apm-sniffer/apm-sdk-plugin/mysql-5.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/jdbc/mysql/define/ReplicationConnectionProxyInstrumentation.java
new file mode 100644
index 0000000..fa96748
--- /dev/null
+++ b/apm-sniffer/apm-sdk-plugin/mysql-5.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/jdbc/mysql/define/ReplicationConnectionProxyInstrumentation.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.skywalking.apm.plugin.jdbc.mysql.define;
+
+import net.bytebuddy.description.method.MethodDescription;
+import net.bytebuddy.matcher.ElementMatcher;
+import org.apache.skywalking.apm.agent.core.plugin.interceptor.StaticMethodsInterceptPoint;
+import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.ClassStaticMethodsEnhancePluginDefine;
+import org.apache.skywalking.apm.agent.core.plugin.match.ClassMatch;
+
+import static net.bytebuddy.matcher.ElementMatchers.named;
+import static org.apache.skywalking.apm.agent.core.plugin.match.NameMatch.byName;
+
+/**
+ * Plugin definition that targets the class named by {@link #INTERCEPT_CLASS} and routes its
+ * methods named {@code createProxyInstance} to the interceptor named by {@link #METHOD_INTERCEPTOR}.
+ */
+public class ReplicationConnectionProxyInstrumentation extends ClassStaticMethodsEnhancePluginDefine {
+
+    /** Fully qualified class name of the interceptor returned by the intercept point. */
+    public static final String METHOD_INTERCEPTOR = "org.apache.skywalking.apm.plugin.jdbc.mysql.CreateReplicationConnectionProxyInstanceInterceptor";
+    /** Fully qualified name of the class matched by {@link #enhanceClass()}. */
+    public static final String INTERCEPT_CLASS = "com.mysql.cj.jdbc.ha.ReplicationConnectionProxy";
+
+    /**
+     * @return a one-element array whose intercept point matches methods named
+     * {@code createProxyInstance}, names {@link #METHOD_INTERCEPTOR} and does not override arguments
+     */
+    @Override
+    protected StaticMethodsInterceptPoint[] getStaticMethodsInterceptPoints() {
+        final StaticMethodsInterceptPoint createProxyInstancePoint = new StaticMethodsInterceptPoint() {
+            @Override
+            public ElementMatcher<MethodDescription> getMethodsMatcher() {
+                return named("createProxyInstance");
+            }
+
+            @Override
+            public String getMethodsInterceptor() {
+                return METHOD_INTERCEPTOR;
+            }
+
+            @Override
+            public boolean isOverrideArgs() {
+                return false;
+            }
+        };
+        return new StaticMethodsInterceptPoint[] {createProxyInstancePoint};
+    }
+
+    /** @return an exact-name match on {@link #INTERCEPT_CLASS} */
+    @Override
+    protected ClassMatch enhanceClass() {
+        return byName(INTERCEPT_CLASS);
+    }
+}
diff --git a/apm-sniffer/apm-sdk-plugin/mysql-5.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/jdbc/mysql/wrapper/CallableStatementWrapper.java b/apm-sniffer/apm-sdk-plugin/mysql-5.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/jdbc/mysql/wrapper/CallableStatementWrapper.java
new file mode 100644
index 0000000..e76ac5b
--- /dev/null
+++ b/apm-sniffer/apm-sdk-plugin/mysql-5.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/jdbc/mysql/wrapper/CallableStatementWrapper.java
@@ -0,0 +1,539 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.skywalking.apm.plugin.jdbc.mysql.wrapper;
+
+import java.io.InputStream;
+import java.io.Reader;
+import java.math.BigDecimal;
+import java.net.URL;
+import java.sql.Array;
+import java.sql.Blob;
+import java.sql.CallableStatement;
+import java.sql.Clob;
+import java.sql.Date;
+import java.sql.NClob;
+import java.sql.Ref;
+import java.sql.RowId;
+import java.sql.SQLException;
+import java.sql.SQLType;
+import java.sql.SQLXML;
+import java.sql.Time;
+import java.sql.Timestamp;
+import java.util.Calendar;
+import java.util.Map;
+import org.apache.skywalking.apm.plugin.jdbc.trace.ConnectionInfo;
+
+public class CallableStatementWrapper extends PreparedStatementWrapper implements CallableStatement {
+
+    /** Delegates to {@code call.registerOutParameter(parameterIndex, sqlType)}. */
+    @Override public void registerOutParameter(int parameterIndex, int sqlType) throws SQLException {
+        call.registerOutParameter(parameterIndex, sqlType);
+    }
+
+    /** Delegates to {@code call.registerOutParameter(parameterIndex, sqlType, scale)}. */
+    @Override public void registerOutParameter(int parameterIndex, int sqlType, int scale) throws SQLException {
+        call.registerOutParameter(parameterIndex, sqlType, scale);
+    }
+
+    /** Delegates to {@code call.wasNull()} and returns its result. */
+    @Override public boolean wasNull() throws SQLException {
+        return call.wasNull();
+    }
+
+    /** Delegates to {@code call.getString(parameterIndex)} and returns its result. */
+    @Override public String getString(int parameterIndex) throws SQLException {
+        return call.getString(parameterIndex);
+    }
+
+    /** Delegates to {@code call.getBoolean(parameterIndex)} and returns its result. */
+    @Override public boolean getBoolean(int parameterIndex) throws SQLException {
+        return call.getBoolean(parameterIndex);
+    }
+
+    /** Delegates to {@code call.getByte(parameterIndex)} and returns its result. */
+    @Override public byte getByte(int parameterIndex) throws SQLException {
+        return call.getByte(parameterIndex);
+    }
+
+    /** Delegates to {@code call.getShort(parameterIndex)} and returns its result. */
+    @Override public short getShort(int parameterIndex) throws SQLException {
+        return call.getShort(parameterIndex);
+    }
+
+    /** Delegates to {@code call.getInt(parameterIndex)} and returns its result. */
+    @Override public int getInt(int parameterIndex) throws SQLException {
+        return call.getInt(parameterIndex);
+    }
+
+    /** Delegates to {@code call.getLong(parameterIndex)} and returns its result. */
+    @Override public long getLong(int parameterIndex) throws SQLException {
+        return call.getLong(parameterIndex);
+    }
+
+    /** Delegates to {@code call.getFloat(parameterIndex)} and returns its result. */
+    @Override public float getFloat(int parameterIndex) throws SQLException {
+        return call.getFloat(parameterIndex);
+    }
+
+    /** Delegates to {@code call.getDouble(parameterIndex)} and returns its result. */
+    @Override public double getDouble(int parameterIndex) throws SQLException {
+        return call.getDouble(parameterIndex);
+    }
+
+    /** Deprecated overload; delegates to {@code call.getBigDecimal(parameterIndex, scale)} and returns its result. */
+    @Override @Deprecated public BigDecimal getBigDecimal(int parameterIndex, int scale) throws SQLException {
+        return call.getBigDecimal(parameterIndex, scale);
+    }
+
+    /** Delegates to {@code call.getBytes(parameterIndex)} and returns its result. */
+    @Override public byte[] getBytes(int parameterIndex) throws SQLException {
+        return call.getBytes(parameterIndex);
+    }
+
+    /** Delegates to {@code call.getDate(parameterIndex)} and returns its result. */
+    @Override public Date getDate(int parameterIndex) throws SQLException {
+        return call.getDate(parameterIndex);
+    }
+
+    /** Delegates to {@code call.getTime(parameterIndex)} and returns its result. */
+    @Override public Time getTime(int parameterIndex) throws SQLException {
+        return call.getTime(parameterIndex);
+    }
+
+    /** Delegates to {@code call.getTimestamp(parameterIndex)} and returns its result. */
+    @Override public Timestamp getTimestamp(int parameterIndex) throws SQLException {
+        return call.getTimestamp(parameterIndex);
+    }
+
+    /** Delegates to {@code call.getObject(parameterIndex)} and returns its result. */
+    @Override public Object getObject(int parameterIndex) throws SQLException {
+        return call.getObject(parameterIndex);
+    }
+
+    /** Delegates to {@code call.getBigDecimal(parameterIndex)} and returns its result. */
+    @Override public BigDecimal getBigDecimal(int parameterIndex) throws SQLException {
+        return call.getBigDecimal(parameterIndex);
+    }
+
+    /** Delegates to {@code call.getObject(parameterIndex, map)} and returns its result. */
+    @Override public Object getObject(int parameterIndex, Map<String, Class<?>> map) throws SQLException {
+        return call.getObject(parameterIndex, map);
+    }
+
+    /** Delegates to {@code call.getRef(parameterIndex)} and returns its result. */
+    @Override public Ref getRef(int parameterIndex) throws SQLException {
+        return call.getRef(parameterIndex);
+    }
+
+    /** Delegates to {@code call.getBlob(parameterIndex)} and returns its result. */
+    @Override public Blob getBlob(int parameterIndex) throws SQLException {
+        return call.getBlob(parameterIndex);
+    }
+
+    /** Delegates to {@code call.getClob(parameterIndex)} and returns its result. */
+    @Override public Clob getClob(int parameterIndex) throws SQLException {
+        return call.getClob(parameterIndex);
+    }
+
+    /** Delegates to {@code call.getArray(parameterIndex)} and returns its result. */
+    @Override public Array getArray(int parameterIndex) throws SQLException {
+        return call.getArray(parameterIndex);
+    }
+
+    /** Delegates to {@code call.getDate(parameterIndex, cal)} and returns its result. */
+    @Override public Date getDate(int parameterIndex, Calendar cal) throws SQLException {
+        return call.getDate(parameterIndex, cal);
+    }
+
+    /** Delegates to {@code call.getTime(parameterIndex, cal)} and returns its result. */
+    @Override public Time getTime(int parameterIndex, Calendar cal) throws SQLException {
+        return call.getTime(parameterIndex, cal);
+    }
+
+    /** Delegates to {@code call.getTimestamp(parameterIndex, cal)} and returns its result. */
+    @Override public Timestamp getTimestamp(int parameterIndex, Calendar cal) throws SQLException {
+        return call.getTimestamp(parameterIndex, cal);
+    }
+
+    /** Delegates to {@code call.registerOutParameter(parameterIndex, sqlType, typeName)}. */
+    @Override public void registerOutParameter(int parameterIndex, int sqlType, String typeName) throws SQLException {
+        call.registerOutParameter(parameterIndex, sqlType, typeName);
+    }
+
+    /** Delegates to {@code call.registerOutParameter(parameterName, sqlType)}. */
+    @Override public void registerOutParameter(String parameterName, int sqlType) throws SQLException {
+        call.registerOutParameter(parameterName, sqlType);
+    }
+
+    /** Delegates to {@code call.registerOutParameter(parameterName, sqlType, scale)}. */
+    @Override public void registerOutParameter(String parameterName, int sqlType, int scale) throws SQLException {
+        call.registerOutParameter(parameterName, sqlType, scale);
+    }
+
+    /** Delegates to {@code call.registerOutParameter(parameterName, sqlType, typeName)}. */
+    @Override public void registerOutParameter(String parameterName, int sqlType, String typeName) throws SQLException {
+        call.registerOutParameter(parameterName, sqlType, typeName);
+    }
+
+    /** Delegates to {@code call.getURL(parameterIndex)} and returns its result. */
+    @Override public URL getURL(int parameterIndex) throws SQLException {
+        return call.getURL(parameterIndex);
+    }
+
+    /** Delegates to {@code call.setURL(parameterName, val)}. */
+    @Override public void setURL(String parameterName, URL val) throws SQLException {
+        call.setURL(parameterName, val);
+    }
+
+    /** Delegates to {@code call.setNull(parameterName, sqlType)}. */
+    @Override public void setNull(String parameterName, int sqlType) throws SQLException {
+        call.setNull(parameterName, sqlType);
+    }
+
+    /** Delegates to {@code call.setBoolean(parameterName, x)}. */
+    @Override public void setBoolean(String parameterName, boolean x) throws SQLException {
+        call.setBoolean(parameterName, x);
+    }
+
+    /** Delegates to {@code call.setByte(parameterName, x)}. */
+    @Override public void setByte(String parameterName, byte x) throws SQLException {
+        call.setByte(parameterName, x);
+    }
+
+    /** Delegates to {@code call.setShort(parameterName, x)}. */
+    @Override public void setShort(String parameterName, short x) throws SQLException {
+        call.setShort(parameterName, x);
+    }
+
+    /** Delegates to {@code call.setInt(parameterName, x)}. */
+    @Override public void setInt(String parameterName, int x) throws SQLException {
+        call.setInt(parameterName, x);
+    }
+
+    /** Delegates to {@code call.setLong(parameterName, x)}. */
+    @Override public void setLong(String parameterName, long x) throws SQLException {
+        call.setLong(parameterName, x);
+    }
+
+    /** Delegates to {@code call.setFloat(parameterName, x)}. */
+    @Override public void setFloat(String parameterName, float x) throws SQLException {
+        call.setFloat(parameterName, x);
+    }
+
+    /** Delegates to {@code call.setDouble(parameterName, x)}. */
+    @Override public void setDouble(String parameterName, double x) throws SQLException {
+        call.setDouble(parameterName, x);
+    }
+
+    /** Delegates to {@code call.setBigDecimal(parameterName, x)}. */
+    @Override public void setBigDecimal(String parameterName, BigDecimal x) throws SQLException {
+        call.setBigDecimal(parameterName, x);
+    }
+
+    /** Delegates to {@code call.setString(parameterName, x)}. */
+    @Override public void setString(String parameterName, String x) throws SQLException {
+        call.setString(parameterName, x);
+    }
+
+    /** Delegates to {@code call.setBytes(parameterName, x)}. */
+    @Override public void setBytes(String parameterName, byte[] x) throws SQLException {
+        call.setBytes(parameterName, x);
+    }
+
+    /** Delegates to {@code call.setDate(parameterName, x)}. */
+    @Override public void setDate(String parameterName, Date x) throws SQLException {
+        call.setDate(parameterName, x);
+    }
+
+    /** Delegates to {@code call.setTime(parameterName, x)}. */
+    @Override public void setTime(String parameterName, Time x) throws SQLException {
+        call.setTime(parameterName, x);
+    }
+
+    /** Delegates to {@code call.setTimestamp(parameterName, x)}. */
+    @Override public void setTimestamp(String parameterName, Timestamp x) throws SQLException {
+        call.setTimestamp(parameterName, x);
+    }
+
+    /** Delegates to {@code call.setAsciiStream(parameterName, x, length)}. */
+    @Override public void setAsciiStream(String parameterName, InputStream x, int length) throws SQLException {
+        call.setAsciiStream(parameterName, x, length);
+    }
+
+    /** Delegates to {@code call.setBinaryStream(parameterName, x, length)}. */
+    @Override public void setBinaryStream(String parameterName, InputStream x, int length) throws SQLException {
+        call.setBinaryStream(parameterName, x, length);
+    }
+
+    /** Delegates to {@code call.setObject(parameterName, x, targetSqlType, scale)}. */
+    @Override public void setObject(String parameterName, Object x, int targetSqlType, int scale) throws SQLException {
+        call.setObject(parameterName, x, targetSqlType, scale);
+    }
+
+    /** Delegates to {@code call.setObject(parameterName, x, targetSqlType)}. */
+    @Override public void setObject(String parameterName, Object x, int targetSqlType) throws SQLException {
+        call.setObject(parameterName, x, targetSqlType);
+    }
+
+    /** Delegates to {@code call.setObject(parameterName, x)}. */
+    @Override public void setObject(String parameterName, Object x) throws SQLException {
+        call.setObject(parameterName, x);
+    }
+
+    /** Delegates to {@code call.setCharacterStream(parameterName, reader, length)}. */
+    @Override public void setCharacterStream(String parameterName, Reader reader, int length) throws SQLException {
+        call.setCharacterStream(parameterName, reader, length);
+    }
+
+    /** Delegates to {@code call.setDate(parameterName, x, cal)}. */
+    @Override public void setDate(String parameterName, Date x, Calendar cal) throws SQLException {
+        call.setDate(parameterName, x, cal);
+    }
+
+    /** Delegates to {@code call.setTime(parameterName, x, cal)}. */
+    @Override public void setTime(String parameterName, Time x, Calendar cal) throws SQLException {
+        call.setTime(parameterName, x, cal);
+    }
+
+    /** Delegates to {@code call.setTimestamp(parameterName, x, cal)}. */
+    @Override public void setTimestamp(String parameterName, Timestamp x, Calendar cal) throws SQLException {
+        call.setTimestamp(parameterName, x, cal);
+    }
+
+    /** Delegates to {@code call.setNull(parameterName, sqlType, typeName)}. */
+    @Override public void setNull(String parameterName, int sqlType, String typeName) throws SQLException {
+        call.setNull(parameterName, sqlType, typeName);
+    }
+
+    /** Delegates to {@code call.getString(parameterName)} and returns its result. */
+    @Override public String getString(String parameterName) throws SQLException {
+        return call.getString(parameterName);
+    }
+
+    /** Delegates to {@code call.getBoolean(parameterName)} and returns its result. */
+    @Override public boolean getBoolean(String parameterName) throws SQLException {
+        return call.getBoolean(parameterName);
+    }
+
+    /** Delegates to {@code call.getByte(parameterName)} and returns its result. */
+    @Override public byte getByte(String parameterName) throws SQLException {
+        return call.getByte(parameterName);
+    }
+
+    /** Delegates to {@code call.getShort(parameterName)} and returns its result. */
+    @Override public short getShort(String parameterName) throws SQLException {
+        return call.getShort(parameterName);
+    }
+
+    /** Delegates to {@code call.getInt(parameterName)} and returns its result. */
+    @Override public int getInt(String parameterName) throws SQLException {
+        return call.getInt(parameterName);
+    }
+
+    /** Delegates to {@code call.getLong(parameterName)} and returns its result. */
+    @Override public long getLong(String parameterName) throws SQLException {
+        return call.getLong(parameterName);
+    }
+
+    /** Delegates to {@code call.getFloat(parameterName)} and returns its result. */
+    @Override public float getFloat(String parameterName) throws SQLException {
+        return call.getFloat(parameterName);
+    }
+
+    /** Pass-through: reads the named parameter as a {@code double} from the wrapped statement. */
+    @Override
+    public double getDouble(String parameterName) throws SQLException {
+        return call.getDouble(parameterName);
+    }
+
+    /** Pass-through: reads the named parameter as a byte array from the wrapped statement. */
+    @Override
+    public byte[] getBytes(String parameterName) throws SQLException {
+        return call.getBytes(parameterName);
+    }
+
+    /** Pass-through: reads the named parameter as a {@code Date} from the wrapped statement. */
+    @Override
+    public Date getDate(String parameterName) throws SQLException {
+        return call.getDate(parameterName);
+    }
+
+    /** Pass-through: reads the named parameter as a {@code Time} from the wrapped statement. */
+    @Override
+    public Time getTime(String parameterName) throws SQLException {
+        return call.getTime(parameterName);
+    }
+
+    /** Pass-through: reads the named parameter as a {@code Timestamp} from the wrapped statement. */
+    @Override
+    public Timestamp getTimestamp(String parameterName) throws SQLException {
+        return call.getTimestamp(parameterName);
+    }
+
+    /** Pass-through: reads the named parameter as an {@code Object} from the wrapped statement. */
+    @Override
+    public Object getObject(String parameterName) throws SQLException {
+        return call.getObject(parameterName);
+    }
+
+    /** Pass-through: reads the named parameter as a {@code BigDecimal} from the wrapped statement. */
+    @Override
+    public BigDecimal getBigDecimal(String parameterName) throws SQLException {
+        return call.getBigDecimal(parameterName);
+    }
+
+    /** Pass-through: reads the named parameter as an {@code Object}, handing the type map to the wrapped statement. */
+    @Override
+    public Object getObject(String parameterName, Map<String, Class<?>> map) throws SQLException {
+        return call.getObject(parameterName, map);
+    }
+
+    /** Pass-through: reads the named parameter as a {@code Ref} from the wrapped statement. */
+    @Override
+    public Ref getRef(String parameterName) throws SQLException {
+        return call.getRef(parameterName);
+    }
+
+    /** Pass-through: reads the named parameter as a {@code Blob} from the wrapped statement. */
+    @Override
+    public Blob getBlob(String parameterName) throws SQLException {
+        return call.getBlob(parameterName);
+    }
+
+    /** Pass-through: reads the named parameter as a {@code Clob} from the wrapped statement. */
+    @Override
+    public Clob getClob(String parameterName) throws SQLException {
+        return call.getClob(parameterName);
+    }
+
+    /** Pass-through: reads the named parameter as an {@code Array} from the wrapped statement. */
+    @Override
+    public Array getArray(String parameterName) throws SQLException {
+        return call.getArray(parameterName);
+    }
+
+    /** Pass-through: reads the named parameter as a {@code Date} using the supplied calendar. */
+    @Override
+    public Date getDate(String parameterName, Calendar cal) throws SQLException {
+        return call.getDate(parameterName, cal);
+    }
+
+    /** Pass-through: reads the named parameter as a {@code Time} using the supplied calendar. */
+    @Override
+    public Time getTime(String parameterName, Calendar cal) throws SQLException {
+        return call.getTime(parameterName, cal);
+    }
+
+    /** Pass-through: reads the named parameter as a {@code Timestamp} using the supplied calendar. */
+    @Override
+    public Timestamp getTimestamp(String parameterName, Calendar cal) throws SQLException {
+        return call.getTimestamp(parameterName, cal);
+    }
+
+    /** Pass-through: reads the named parameter as a {@code URL} from the wrapped statement. */
+    @Override
+    public URL getURL(String parameterName) throws SQLException {
+        return call.getURL(parameterName);
+    }
+
+    /** Pass-through: reads the indexed parameter as a {@code RowId} from the wrapped statement. */
+    @Override
+    public RowId getRowId(int parameterIndex) throws SQLException {
+        return call.getRowId(parameterIndex);
+    }
+
+    /** Pass-through: reads the named parameter as a {@code RowId} from the wrapped statement. */
+    @Override
+    public RowId getRowId(String parameterName) throws SQLException {
+        return call.getRowId(parameterName);
+    }
+
+    /** Pass-through: sets a {@code RowId} on the named parameter of the wrapped statement. */
+    @Override
+    public void setRowId(String parameterName, RowId x) throws SQLException {
+        call.setRowId(parameterName, x);
+    }
+
+    /** Pass-through: sets a national-character string on the named parameter. */
+    @Override
+    public void setNString(String parameterName, String value) throws SQLException {
+        call.setNString(parameterName, value);
+    }
+
+    /** Pass-through: binds a national-character stream of {@code long} length to the named parameter. */
+    @Override
+    public void setNCharacterStream(String parameterName, Reader value, long length) throws SQLException {
+        call.setNCharacterStream(parameterName, value, length);
+    }
+
+    /** Pass-through: sets an {@code NClob} on the named parameter of the wrapped statement. */
+    @Override
+    public void setNClob(String parameterName, NClob value) throws SQLException {
+        call.setNClob(parameterName, value);
+    }
+
+    /** Pass-through: binds a reader of {@code long} length as a CLOB to the named parameter. */
+    @Override
+    public void setClob(String parameterName, Reader reader, long length) throws SQLException {
+        call.setClob(parameterName, reader, length);
+    }
+
+    /** Pass-through: binds an input stream of {@code long} length as a BLOB to the named parameter. */
+    @Override
+    public void setBlob(String parameterName, InputStream inputStream, long length) throws SQLException {
+        call.setBlob(parameterName, inputStream, length);
+    }
+
+    /** Pass-through: binds a reader of {@code long} length as an NCLOB to the named parameter. */
+    @Override
+    public void setNClob(String parameterName, Reader reader, long length) throws SQLException {
+        call.setNClob(parameterName, reader, length);
+    }
+
+    /** Pass-through: reads the indexed parameter as an {@code NClob} from the wrapped statement. */
+    @Override
+    public NClob getNClob(int parameterIndex) throws SQLException {
+        return call.getNClob(parameterIndex);
+    }
+
+    /** Pass-through: reads the named parameter as an {@code NClob} from the wrapped statement. */
+    @Override
+    public NClob getNClob(String parameterName) throws SQLException {
+        return call.getNClob(parameterName);
+    }
+
+    /** Pass-through: sets an {@code SQLXML} value on the named parameter of the wrapped statement. */
+    @Override
+    public void setSQLXML(String parameterName, SQLXML xmlObject) throws SQLException {
+        call.setSQLXML(parameterName, xmlObject);
+    }
+
+    /** Pass-through: reads the indexed parameter as {@code SQLXML} from the wrapped statement. */
+    @Override
+    public SQLXML getSQLXML(int parameterIndex) throws SQLException {
+        return call.getSQLXML(parameterIndex);
+    }
+
+    /** Pass-through: reads the named parameter as {@code SQLXML} from the wrapped statement. */
+    @Override
+    public SQLXML getSQLXML(String parameterName) throws SQLException {
+        return call.getSQLXML(parameterName);
+    }
+
+    /** Pass-through: reads the indexed parameter as a national-character string. */
+    @Override
+    public String getNString(int parameterIndex) throws SQLException {
+        return call.getNString(parameterIndex);
+    }
+
+    /** Pass-through: reads the named parameter as a national-character string. */
+    @Override
+    public String getNString(String parameterName) throws SQLException {
+        return call.getNString(parameterName);
+    }
+
+    /** Pass-through: reads the indexed parameter as a national-character {@code Reader}. */
+    @Override
+    public Reader getNCharacterStream(int parameterIndex) throws SQLException {
+        return call.getNCharacterStream(parameterIndex);
+    }
+
+    /** Pass-through: reads the named parameter as a national-character {@code Reader}. */
+    @Override
+    public Reader getNCharacterStream(String parameterName) throws SQLException {
+        return call.getNCharacterStream(parameterName);
+    }
+
+    /** Pass-through: reads the indexed parameter as a character {@code Reader}. */
+    @Override
+    public Reader getCharacterStream(int parameterIndex) throws SQLException {
+        return call.getCharacterStream(parameterIndex);
+    }
+
+    /** Pass-through: reads the named parameter as a character {@code Reader}. */
+    @Override
+    public Reader getCharacterStream(String parameterName) throws SQLException {
+        return call.getCharacterStream(parameterName);
+    }
+
+    /** Pass-through: sets a {@code Blob} on the named parameter of the wrapped statement. */
+    @Override
+    public void setBlob(String parameterName, Blob x) throws SQLException {
+        call.setBlob(parameterName, x);
+    }
+
+    /** Pass-through: sets a {@code Clob} on the named parameter of the wrapped statement. */
+    @Override
+    public void setClob(String parameterName, Clob x) throws SQLException {
+        call.setClob(parameterName, x);
+    }
+
+    /** Pass-through: binds an ASCII stream of {@code long} length to the named parameter. */
+    @Override
+    public void setAsciiStream(String parameterName, InputStream x, long length) throws SQLException {
+        call.setAsciiStream(parameterName, x, length);
+    }
+
+    /** Pass-through: binds a binary stream of {@code long} length to the named parameter. */
+    @Override
+    public void setBinaryStream(String parameterName, InputStream x, long length) throws SQLException {
+        call.setBinaryStream(parameterName, x, length);
+    }
+
+    /** Pass-through: binds a character stream of {@code long} length to the named parameter. */
+    @Override
+    public void setCharacterStream(String parameterName, Reader reader, long length) throws SQLException {
+        call.setCharacterStream(parameterName, reader, length);
+    }
+
+    /** Pass-through: binds an ASCII stream, with no explicit length, to the named parameter. */
+    @Override
+    public void setAsciiStream(String parameterName, InputStream x) throws SQLException {
+        call.setAsciiStream(parameterName, x);
+    }
+
+    /** Pass-through: binds a binary stream, with no explicit length, to the named parameter. */
+    @Override
+    public void setBinaryStream(String parameterName, InputStream x) throws SQLException {
+        call.setBinaryStream(parameterName, x);
+    }
+
+    /** Pass-through: binds a character stream, with no explicit length, to the named parameter. */
+    @Override
+    public void setCharacterStream(String parameterName, Reader reader) throws SQLException {
+        call.setCharacterStream(parameterName, reader);
+    }
+
+    /** Pass-through: binds a national-character stream, with no explicit length, to the named parameter. */
+    @Override
+    public void setNCharacterStream(String parameterName, Reader value) throws SQLException {
+        call.setNCharacterStream(parameterName, value);
+    }
+
+    /** Pass-through: binds a reader, with no explicit length, as a CLOB to the named parameter. */
+    @Override
+    public void setClob(String parameterName, Reader reader) throws SQLException {
+        call.setClob(parameterName, reader);
+    }
+
+    /** Pass-through: binds an input stream, with no explicit length, as a BLOB to the named parameter. */
+    @Override
+    public void setBlob(String parameterName, InputStream inputStream) throws SQLException {
+        call.setBlob(parameterName, inputStream);
+    }
+
+    /** Pass-through: binds a reader, with no explicit length, as an NCLOB to the named parameter. */
+    @Override
+    public void setNClob(String parameterName, Reader reader) throws SQLException {
+        call.setNClob(parameterName, reader);
+    }
+
+    /** Pass-through: reads the indexed parameter converted to the requested {@code type}. */
+    @Override
+    public <T> T getObject(int parameterIndex, Class<T> type) throws SQLException {
+        return call.getObject(parameterIndex, type);
+    }
+
+    /** Pass-through: reads the named parameter converted to the requested {@code type}. */
+    @Override
+    public <T> T getObject(String parameterName, Class<T> type) throws SQLException {
+        return call.getObject(parameterName, type);
+    }
+
+    @Override public void setObject(String parameterName, Object x, SQLType targetSqlType,
+        int scaleOrLength) throws SQLException {
+        call.setObject(parameterName, x, targetSqlType, scaleOrLength);
+    }
+
+    /** Pass-through: sets the named parameter with a {@code SQLType} target. */
+    @Override
+    public void setObject(String parameterName, Object x, SQLType targetSqlType) throws SQLException {
+        call.setObject(parameterName, x, targetSqlType);
+    }
+
+    /** Pass-through: registers the indexed OUT parameter with the given {@code SQLType}. */
+    @Override
+    public void registerOutParameter(int parameterIndex, SQLType sqlType) throws SQLException {
+        call.registerOutParameter(parameterIndex, sqlType);
+    }
+
+    /** Pass-through: registers the indexed OUT parameter with the given {@code SQLType} and scale. */
+    @Override
+    public void registerOutParameter(int parameterIndex, SQLType sqlType, int scale) throws SQLException {
+        call.registerOutParameter(parameterIndex, sqlType, scale);
+    }
+
+    @Override
+    public void registerOutParameter(int parameterIndex, SQLType sqlType, String typeName) throws SQLException {
+        call.registerOutParameter(parameterIndex, sqlType, typeName);
+    }
+
+    /** Pass-through: registers the named OUT parameter with the given {@code SQLType}. */
+    @Override
+    public void registerOutParameter(String parameterName, SQLType sqlType) throws SQLException {
+        call.registerOutParameter(parameterName, sqlType);
+    }
+
+    /** Pass-through: registers the named OUT parameter with the given {@code SQLType} and scale. */
+    @Override
+    public void registerOutParameter(String parameterName, SQLType sqlType, int scale) throws SQLException {
+        call.registerOutParameter(parameterName, sqlType, scale);
+    }
+
+    @Override
+    public void registerOutParameter(String parameterName, SQLType sqlType, String typeName) throws SQLException {
+        call.registerOutParameter(parameterName, sqlType, typeName);
+    }
+
+    /** The wrapped statement that every pass-through method in this class forwards to. */
+    private final CallableStatement call;
+    /** The SQL text this statement was prepared with, as handed to the constructor. */
+    private final String sql;
+
+    /**
+     * Wraps {@code call}. The statement, connection info and SQL text are also passed to the
+     * superclass constructor together with the fixed statement-type label {@code "Callable"}.
+     *
+     * @param call the statement to wrap
+     * @param connectionInfo connection description forwarded to the superclass
+     * @param sql the SQL text of the call
+     */
+    public CallableStatementWrapper(CallableStatement call, ConnectionInfo connectionInfo, String sql) {
+        super(call, connectionInfo, sql, "Callable");
+        this.sql = sql;
+        this.call = call;
+    }
+
+}
diff --git a/apm-sniffer/apm-sdk-plugin/mysql-5.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/jdbc/mysql/wrapper/JdbcConnectionWrapper.java b/apm-sniffer/apm-sdk-plugin/mysql-5.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/jdbc/mysql/wrapper/JdbcConnectionWrapper.java
new file mode 100644
index 0000000..5b2ab64
--- /dev/null
+++ b/apm-sniffer/apm-sdk-plugin/mysql-5.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/jdbc/mysql/wrapper/JdbcConnectionWrapper.java
@@ -0,0 +1,652 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.skywalking.apm.plugin.jdbc.mysql.wrapper;
+
+import com.mysql.cj.api.exceptions.ExceptionInterceptor;
+import com.mysql.cj.api.jdbc.ClientInfoProvider;
+import com.mysql.cj.api.jdbc.JdbcConnection;
+import com.mysql.cj.api.jdbc.JdbcPropertySet;
+import com.mysql.cj.api.jdbc.interceptors.StatementInterceptor;
+import com.mysql.cj.api.jdbc.result.ResultSetInternalMethods;
+import com.mysql.cj.api.mysqla.io.PacketPayload;
+import com.mysql.cj.api.mysqla.result.ColumnDefinition;
+import com.mysql.cj.core.ServerVersion;
+import com.mysql.cj.jdbc.ServerPreparedStatement;
+import com.mysql.cj.jdbc.StatementImpl;
+import com.mysql.cj.jdbc.result.CachedResultSetMetaData;
+import com.mysql.cj.mysqla.MysqlaSession;
+import java.sql.Array;
+import java.sql.Blob;
+import java.sql.CallableStatement;
+import java.sql.Clob;
+import java.sql.DatabaseMetaData;
+import java.sql.NClob;
+import java.sql.PreparedStatement;
+import java.sql.SQLClientInfoException;
+import java.sql.SQLException;
+import java.sql.SQLWarning;
+import java.sql.SQLXML;
+import java.sql.Savepoint;
+import java.sql.Statement;
+import java.sql.Struct;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Timer;
+import java.util.concurrent.Executor;
+import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.EnhancedInstance;
+import org.apache.skywalking.apm.plugin.jdbc.trace.ConnectionInfo;
+
+/**
+ * Wrapper around a MySQL {@link JdbcConnection}.
+ *
+ * <p>The standard JDBC statement factories ({@code createStatement}, {@code prepareStatement},
+ * {@code prepareCall}) return {@link StatementWrapper}, {@link PreparedStatementWrapper} and
+ * {@link CallableStatementWrapper} instances built from the delegate's result and this wrapper's
+ * {@link ConnectionInfo}. Every other method, including {@code clientPrepareStatement} and
+ * {@code serverPrepareStatement}, is forwarded to the delegate unchanged.
+ */
+public class JdbcConnectionWrapper implements JdbcConnection, EnhancedInstance {
+
+    /** The real connection that all calls are forwarded to. */
+    private final JdbcConnection delegate;
+    /** Connection description handed to every statement wrapper created by this connection. */
+    private final ConnectionInfo connectionInfo;
+    /** Value stored and returned through the {@link EnhancedInstance} accessors. */
+    private Object dynamicField;
+
+    /**
+     * @param delegate the connection to wrap
+     * @param connectionInfo connection description passed on to created statement wrappers
+     */
+    public JdbcConnectionWrapper(JdbcConnection delegate, ConnectionInfo connectionInfo) {
+        this.delegate = delegate;
+        this.connectionInfo = connectionInfo;
+    }
+
+    // ---------------------------------------------------------------------
+    // MySQL driver specific methods: forwarded to the delegate unchanged.
+    // ---------------------------------------------------------------------
+
+    public JdbcPropertySet getPropertySet() {
+        return delegate.getPropertySet();
+    }
+
+    public MysqlaSession getSession() {
+        return delegate.getSession();
+    }
+
+    public void changeUser(String s, String s1) throws SQLException {
+        delegate.changeUser(s, s1);
+    }
+
+    @Deprecated
+    public void clearHasTriedMaster() {
+        delegate.clearHasTriedMaster();
+    }
+
+    public PreparedStatement clientPrepareStatement(String s) throws SQLException {
+        return delegate.clientPrepareStatement(s);
+    }
+
+    public PreparedStatement clientPrepareStatement(String s, int i) throws SQLException {
+        return delegate.clientPrepareStatement(s, i);
+    }
+
+    public PreparedStatement clientPrepareStatement(String s, int i, int i1) throws SQLException {
+        return delegate.clientPrepareStatement(s, i, i1);
+    }
+
+    public PreparedStatement clientPrepareStatement(String s, int[] ints) throws SQLException {
+        return delegate.clientPrepareStatement(s, ints);
+    }
+
+    public PreparedStatement clientPrepareStatement(String s, int i, int i1, int i2) throws SQLException {
+        return delegate.clientPrepareStatement(s, i, i1, i2);
+    }
+
+    public PreparedStatement clientPrepareStatement(String s, String[] strings) throws SQLException {
+        return delegate.clientPrepareStatement(s, strings);
+    }
+
+    public int getActiveStatementCount() {
+        return delegate.getActiveStatementCount();
+    }
+
+    public long getIdleFor() {
+        return delegate.getIdleFor();
+    }
+
+    public String getStatementComment() {
+        return delegate.getStatementComment();
+    }
+
+    @Deprecated
+    public boolean hasTriedMaster() {
+        return delegate.hasTriedMaster();
+    }
+
+    public boolean isInGlobalTx() {
+        return delegate.isInGlobalTx();
+    }
+
+    public void setInGlobalTx(boolean b) {
+        delegate.setInGlobalTx(b);
+    }
+
+    public boolean isMasterConnection() {
+        return delegate.isMasterConnection();
+    }
+
+    public boolean isNoBackslashEscapesSet() {
+        return delegate.isNoBackslashEscapesSet();
+    }
+
+    public boolean isSameResource(JdbcConnection connection) {
+        return delegate.isSameResource(connection);
+    }
+
+    public boolean lowerCaseTableNames() {
+        return delegate.lowerCaseTableNames();
+    }
+
+    public void ping() throws SQLException {
+        delegate.ping();
+    }
+
+    public void resetServerState() throws SQLException {
+        delegate.resetServerState();
+    }
+
+    public PreparedStatement serverPrepareStatement(String s) throws SQLException {
+        return delegate.serverPrepareStatement(s);
+    }
+
+    public PreparedStatement serverPrepareStatement(String s, int i) throws SQLException {
+        return delegate.serverPrepareStatement(s, i);
+    }
+
+    public PreparedStatement serverPrepareStatement(String s, int i, int i1) throws SQLException {
+        return delegate.serverPrepareStatement(s, i, i1);
+    }
+
+    public PreparedStatement serverPrepareStatement(String s, int i, int i1, int i2) throws SQLException {
+        return delegate.serverPrepareStatement(s, i, i1, i2);
+    }
+
+    public PreparedStatement serverPrepareStatement(String s, int[] ints) throws SQLException {
+        return delegate.serverPrepareStatement(s, ints);
+    }
+
+    public PreparedStatement serverPrepareStatement(String s, String[] strings) throws SQLException {
+        return delegate.serverPrepareStatement(s, strings);
+    }
+
+    public void setFailedOver(boolean b) {
+        delegate.setFailedOver(b);
+    }
+
+    public void setStatementComment(String s) {
+        delegate.setStatementComment(s);
+    }
+
+    public void shutdownServer() throws SQLException {
+        delegate.shutdownServer();
+    }
+
+    public void reportQueryTime(long l) {
+        delegate.reportQueryTime(l);
+    }
+
+    public boolean isAbonormallyLongQuery(long l) {
+        return delegate.isAbonormallyLongQuery(l);
+    }
+
+    public int getAutoIncrementIncrement() {
+        return delegate.getAutoIncrementIncrement();
+    }
+
+    public boolean hasSameProperties(JdbcConnection connection) {
+        return delegate.hasSameProperties(connection);
+    }
+
+    public String getHost() {
+        return delegate.getHost();
+    }
+
+    public String getHostPortPair() {
+        return delegate.getHostPortPair();
+    }
+
+    public void setProxy(JdbcConnection connection) {
+        delegate.setProxy(connection);
+    }
+
+    public boolean isServerLocal() throws SQLException {
+        return delegate.isServerLocal();
+    }
+
+    public int getSessionMaxRows() {
+        return delegate.getSessionMaxRows();
+    }
+
+    public void setSessionMaxRows(int i) throws SQLException {
+        delegate.setSessionMaxRows(i);
+    }
+
+    public void setSchema(String s) throws SQLException {
+        delegate.setSchema(s);
+    }
+
+    public void abortInternal() throws SQLException {
+        delegate.abortInternal();
+    }
+
+    public void checkClosed() {
+        delegate.checkClosed();
+    }
+
+    public boolean isProxySet() {
+        return delegate.isProxySet();
+    }
+
+    /** Returns the delegate's duplicate as-is; the result is not wrapped by this class. */
+    public JdbcConnection duplicate() throws SQLException {
+        return delegate.duplicate();
+    }
+
+    public ResultSetInternalMethods execSQL(StatementImpl statement, String s, int i, PacketPayload payload,
+        boolean b, String s1, ColumnDefinition definition) throws SQLException {
+        return delegate.execSQL(statement, s, i, payload, b, s1, definition);
+    }
+
+    public ResultSetInternalMethods execSQL(StatementImpl statement, String s, int i, PacketPayload payload,
+        boolean b, String s1, ColumnDefinition definition, boolean b1) throws SQLException {
+        return delegate.execSQL(statement, s, i, payload, b, s1, definition, b1);
+    }
+
+    public StringBuilder generateConnectionCommentBlock(StringBuilder builder) {
+        return delegate.generateConnectionCommentBlock(builder);
+    }
+
+    public CachedResultSetMetaData getCachedMetaData(String s) {
+        return delegate.getCachedMetaData(s);
+    }
+
+    public Timer getCancelTimer() {
+        return delegate.getCancelTimer();
+    }
+
+    public String getCharacterSetMetadata() {
+        return delegate.getCharacterSetMetadata();
+    }
+
+    /** Returns the delegate's statement as-is; the result is not wrapped by this class. */
+    public Statement getMetadataSafeStatement() throws SQLException {
+        return delegate.getMetadataSafeStatement();
+    }
+
+    public boolean getRequiresEscapingEncoder() {
+        return delegate.getRequiresEscapingEncoder();
+    }
+
+    public ServerVersion getServerVersion() {
+        return delegate.getServerVersion();
+    }
+
+    public List<StatementInterceptor> getStatementInterceptorsInstances() {
+        return delegate.getStatementInterceptorsInstances();
+    }
+
+    public void incrementNumberOfPreparedExecutes() {
+        delegate.incrementNumberOfPreparedExecutes();
+    }
+
+    public void incrementNumberOfPrepares() {
+        delegate.incrementNumberOfPrepares();
+    }
+
+    public void incrementNumberOfResultSetsCreated() {
+        delegate.incrementNumberOfResultSetsCreated();
+    }
+
+    public void initializeResultsMetadataFromCache(String s, CachedResultSetMetaData data,
+        ResultSetInternalMethods methods) throws SQLException {
+        delegate.initializeResultsMetadataFromCache(s, data, methods);
+    }
+
+    public void initializeSafeStatementInterceptors() throws SQLException {
+        delegate.initializeSafeStatementInterceptors();
+    }
+
+    public boolean isReadInfoMsgEnabled() {
+        return delegate.isReadInfoMsgEnabled();
+    }
+
+    public boolean isReadOnly(boolean b) throws SQLException {
+        return delegate.isReadOnly(b);
+    }
+
+    public void pingInternal(boolean b, int i) throws SQLException {
+        delegate.pingInternal(b, i);
+    }
+
+    public void realClose(boolean b, boolean b1, boolean b2, Throwable throwable) throws SQLException {
+        delegate.realClose(b, b1, b2, throwable);
+    }
+
+    public void recachePreparedStatement(ServerPreparedStatement statement) throws SQLException {
+        delegate.recachePreparedStatement(statement);
+    }
+
+    public void decachePreparedStatement(ServerPreparedStatement statement) throws SQLException {
+        delegate.decachePreparedStatement(statement);
+    }
+
+    public void registerQueryExecutionTime(long l) {
+        delegate.registerQueryExecutionTime(l);
+    }
+
+    public void registerStatement(com.mysql.cj.api.jdbc.Statement statement) {
+        delegate.registerStatement(statement);
+    }
+
+    public void reportNumberOfTablesAccessed(int i) {
+        delegate.reportNumberOfTablesAccessed(i);
+    }
+
+    public void setReadInfoMsgEnabled(boolean b) {
+        delegate.setReadInfoMsgEnabled(b);
+    }
+
+    public void setReadOnlyInternal(boolean b) throws SQLException {
+        delegate.setReadOnlyInternal(b);
+    }
+
+    public boolean storesLowerCaseTableName() {
+        return delegate.storesLowerCaseTableName();
+    }
+
+    public void throwConnectionClosedException() throws SQLException {
+        delegate.throwConnectionClosedException();
+    }
+
+    public void transactionBegun() throws SQLException {
+        delegate.transactionBegun();
+    }
+
+    public void transactionCompleted() throws SQLException {
+        delegate.transactionCompleted();
+    }
+
+    public void unregisterStatement(com.mysql.cj.api.jdbc.Statement statement) {
+        delegate.unregisterStatement(statement);
+    }
+
+    public void unSafeStatementInterceptors() throws SQLException {
+        delegate.unSafeStatementInterceptors();
+    }
+
+    public boolean useAnsiQuotedIdentifiers() {
+        return delegate.useAnsiQuotedIdentifiers();
+    }
+
+    public JdbcConnection getMultiHostSafeProxy() {
+        return delegate.getMultiHostSafeProxy();
+    }
+
+    public ClientInfoProvider getClientInfoProviderImpl() throws SQLException {
+        return delegate.getClientInfoProviderImpl();
+    }
+
+    // ---------------------------------------------------------------------
+    // java.sql.Connection methods. Statement factories wrap the delegate's
+    // result; everything else is forwarded unchanged.
+    // ---------------------------------------------------------------------
+
+    /** Creates a statement on the delegate and wraps it in a {@link StatementWrapper}. */
+    public Statement createStatement() throws SQLException {
+        return new StatementWrapper(delegate.createStatement(), connectionInfo);
+    }
+
+    /** Prepares {@code sql} on the delegate and wraps the result in a {@link PreparedStatementWrapper}. */
+    public PreparedStatement prepareStatement(String sql) throws SQLException {
+        return new PreparedStatementWrapper(delegate.prepareStatement(sql), connectionInfo, sql);
+    }
+
+    /** Prepares the call on the delegate and wraps the result in a {@link CallableStatementWrapper}. */
+    public CallableStatement prepareCall(String sql) throws SQLException {
+        return new CallableStatementWrapper(delegate.prepareCall(sql), connectionInfo, sql);
+    }
+
+    public String nativeSQL(String sql) throws SQLException {
+        return delegate.nativeSQL(sql);
+    }
+
+    public void setAutoCommit(boolean autoCommit) throws SQLException {
+        delegate.setAutoCommit(autoCommit);
+    }
+
+    public boolean getAutoCommit() throws SQLException {
+        return delegate.getAutoCommit();
+    }
+
+    public void commit() throws SQLException {
+        delegate.commit();
+    }
+
+    public void rollback() throws SQLException {
+        delegate.rollback();
+    }
+
+    public void close() throws SQLException {
+        delegate.close();
+    }
+
+    public boolean isClosed() throws SQLException {
+        return delegate.isClosed();
+    }
+
+    public DatabaseMetaData getMetaData() throws SQLException {
+        return delegate.getMetaData();
+    }
+
+    public void setReadOnly(boolean readOnly) throws SQLException {
+        delegate.setReadOnly(readOnly);
+    }
+
+    public boolean isReadOnly() throws SQLException {
+        return delegate.isReadOnly();
+    }
+
+    public void setCatalog(String catalog) throws SQLException {
+        delegate.setCatalog(catalog);
+    }
+
+    public String getCatalog() throws SQLException {
+        return delegate.getCatalog();
+    }
+
+    public void setTransactionIsolation(int level) throws SQLException {
+        delegate.setTransactionIsolation(level);
+    }
+
+    public int getTransactionIsolation() throws SQLException {
+        return delegate.getTransactionIsolation();
+    }
+
+    public SQLWarning getWarnings() throws SQLException {
+        return delegate.getWarnings();
+    }
+
+    public void clearWarnings() throws SQLException {
+        delegate.clearWarnings();
+    }
+
+    /** Creates a statement on the delegate and wraps it in a {@link StatementWrapper}. */
+    public Statement createStatement(int resultSetType, int resultSetConcurrency) throws SQLException {
+        return new StatementWrapper(
+            delegate.createStatement(resultSetType, resultSetConcurrency), connectionInfo);
+    }
+
+    /** Prepares {@code sql} on the delegate and wraps the result in a {@link PreparedStatementWrapper}. */
+    public PreparedStatement prepareStatement(String sql, int resultSetType,
+        int resultSetConcurrency) throws SQLException {
+        return new PreparedStatementWrapper(
+            delegate.prepareStatement(sql, resultSetType, resultSetConcurrency), connectionInfo, sql);
+    }
+
+    /** Prepares the call on the delegate and wraps the result in a {@link CallableStatementWrapper}. */
+    public CallableStatement prepareCall(String sql, int resultSetType, int resultSetConcurrency) throws SQLException {
+        return new CallableStatementWrapper(
+            delegate.prepareCall(sql, resultSetType, resultSetConcurrency), connectionInfo, sql);
+    }
+
+    public Map<String, Class<?>> getTypeMap() throws SQLException {
+        return delegate.getTypeMap();
+    }
+
+    public void setTypeMap(Map<String, Class<?>> map) throws SQLException {
+        delegate.setTypeMap(map);
+    }
+
+    public void setHoldability(int holdability) throws SQLException {
+        delegate.setHoldability(holdability);
+    }
+
+    public int getHoldability() throws SQLException {
+        return delegate.getHoldability();
+    }
+
+    public Savepoint setSavepoint() throws SQLException {
+        return delegate.setSavepoint();
+    }
+
+    public Savepoint setSavepoint(String name) throws SQLException {
+        return delegate.setSavepoint(name);
+    }
+
+    public void rollback(Savepoint savepoint) throws SQLException {
+        delegate.rollback(savepoint);
+    }
+
+    public void releaseSavepoint(Savepoint savepoint) throws SQLException {
+        delegate.releaseSavepoint(savepoint);
+    }
+
+    /** Creates a statement on the delegate and wraps it in a {@link StatementWrapper}. */
+    public Statement createStatement(int resultSetType, int resultSetConcurrency,
+        int resultSetHoldability) throws SQLException {
+        return new StatementWrapper(
+            delegate.createStatement(resultSetType, resultSetConcurrency, resultSetHoldability), connectionInfo);
+    }
+
+    /** Prepares {@code sql} on the delegate and wraps the result in a {@link PreparedStatementWrapper}. */
+    public PreparedStatement prepareStatement(String sql, int resultSetType, int resultSetConcurrency,
+        int resultSetHoldability) throws SQLException {
+        return new PreparedStatementWrapper(
+            delegate.prepareStatement(sql, resultSetType, resultSetConcurrency, resultSetHoldability),
+            connectionInfo, sql);
+    }
+
+    /** Prepares the call on the delegate and wraps the result in a {@link CallableStatementWrapper}. */
+    public CallableStatement prepareCall(String sql, int resultSetType, int resultSetConcurrency,
+        int resultSetHoldability) throws SQLException {
+        return new CallableStatementWrapper(
+            delegate.prepareCall(sql, resultSetType, resultSetConcurrency, resultSetHoldability),
+            connectionInfo, sql);
+    }
+
+    /** Prepares {@code sql} on the delegate and wraps the result in a {@link PreparedStatementWrapper}. */
+    public PreparedStatement prepareStatement(String sql, int autoGeneratedKeys) throws SQLException {
+        return new PreparedStatementWrapper(
+            delegate.prepareStatement(sql, autoGeneratedKeys), connectionInfo, sql);
+    }
+
+    /** Prepares {@code sql} on the delegate and wraps the result in a {@link PreparedStatementWrapper}. */
+    public PreparedStatement prepareStatement(String sql, int[] columnIndexes) throws SQLException {
+        return new PreparedStatementWrapper(
+            delegate.prepareStatement(sql, columnIndexes), connectionInfo, sql);
+    }
+
+    /** Prepares {@code sql} on the delegate and wraps the result in a {@link PreparedStatementWrapper}. */
+    public PreparedStatement prepareStatement(String sql, String[] columnNames) throws SQLException {
+        return new PreparedStatementWrapper(
+            delegate.prepareStatement(sql, columnNames), connectionInfo, sql);
+    }
+
+    public Clob createClob() throws SQLException {
+        return delegate.createClob();
+    }
+
+    public Blob createBlob() throws SQLException {
+        return delegate.createBlob();
+    }
+
+    public NClob createNClob() throws SQLException {
+        return delegate.createNClob();
+    }
+
+    public SQLXML createSQLXML() throws SQLException {
+        return delegate.createSQLXML();
+    }
+
+    public boolean isValid(int timeout) throws SQLException {
+        return delegate.isValid(timeout);
+    }
+
+    public void setClientInfo(String name, String value) throws SQLClientInfoException {
+        delegate.setClientInfo(name, value);
+    }
+
+    public void setClientInfo(Properties properties) throws SQLClientInfoException {
+        delegate.setClientInfo(properties);
+    }
+
+    public String getClientInfo(String name) throws SQLException {
+        return delegate.getClientInfo(name);
+    }
+
+    public Properties getClientInfo() throws SQLException {
+        return delegate.getClientInfo();
+    }
+
+    public Array createArrayOf(String typeName, Object[] elements) throws SQLException {
+        return delegate.createArrayOf(typeName, elements);
+    }
+
+    public Struct createStruct(String typeName, Object[] attributes) throws SQLException {
+        return delegate.createStruct(typeName, attributes);
+    }
+
+    public String getSchema() throws SQLException {
+        return delegate.getSchema();
+    }
+
+    public void abort(Executor executor) throws SQLException {
+        delegate.abort(executor);
+    }
+
+    public void setNetworkTimeout(Executor executor, int milliseconds) throws SQLException {
+        delegate.setNetworkTimeout(executor, milliseconds);
+    }
+
+    public int getNetworkTimeout() throws SQLException {
+        return delegate.getNetworkTimeout();
+    }
+
+    /** Answered by the delegate; this wrapper itself is never returned. */
+    public <T> T unwrap(Class<T> iface) throws SQLException {
+        return delegate.unwrap(iface);
+    }
+
+    /** Answered by the delegate; this wrapper's own type is not considered. */
+    public boolean isWrapperFor(Class<?> iface) throws SQLException {
+        return delegate.isWrapperFor(iface);
+    }
+
+    // ---------------------------------------------------------------------
+    // Remaining driver accessors: forwarded to the delegate unchanged.
+    // ---------------------------------------------------------------------
+
+    public void createNewIO(boolean b) {
+        delegate.createNewIO(b);
+    }
+
+    public long getId() {
+        return delegate.getId();
+    }
+
+    public Properties getProperties() {
+        return delegate.getProperties();
+    }
+
+    public String getProcessHost() {
+        return delegate.getProcessHost();
+    }
+
+    public Object getConnectionMutex() {
+        return delegate.getConnectionMutex();
+    }
+
+    public String getURL() {
+        return delegate.getURL();
+    }
+
+    public String getUser() {
+        return delegate.getUser();
+    }
+
+    public ExceptionInterceptor getExceptionInterceptor() {
+        return delegate.getExceptionInterceptor();
+    }
+
+    // ---------------------------------------------------------------------
+    // EnhancedInstance
+    // ---------------------------------------------------------------------
+
+    /** Returns the value last stored with {@link #setSkyWalkingDynamicField(Object)}, or {@code null}. */
+    @Override
+    public Object getSkyWalkingDynamicField() {
+        return dynamicField;
+    }
+
+    /** Stores {@code value} on this wrapper instance; it is not passed to the delegate. */
+    @Override
+    public void setSkyWalkingDynamicField(Object value) {
+        this.dynamicField = value;
+    }
+}
diff --git a/apm-sniffer/apm-sdk-plugin/mysql-5.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/jdbc/mysql/wrapper/LoadBalancedConnectionWrapper.java b/apm-sniffer/apm-sdk-plugin/mysql-5.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/jdbc/mysql/wrapper/LoadBalancedConnectionWrapper.java
new file mode 100644
index 0000000..5263e45
--- /dev/null
+++ b/apm-sniffer/apm-sdk-plugin/mysql-5.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/jdbc/mysql/wrapper/LoadBalancedConnectionWrapper.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.skywalking.apm.plugin.jdbc.mysql.wrapper;
+
+import com.mysql.cj.api.jdbc.ha.LoadBalancedConnection;
+import java.sql.SQLException;
+import org.apache.skywalking.apm.plugin.jdbc.trace.ConnectionInfo;
+
+/**
+ * Connection wrapper for a MySQL {@link LoadBalancedConnection}.
+ *
+ * <p>Everything inherited from {@link JdbcConnectionWrapper} is handled by the superclass; this class only adds
+ * the load-balancing specific operations, which are forwarded unchanged to the wrapped connection.
+ */
+public class LoadBalancedConnectionWrapper extends JdbcConnectionWrapper implements LoadBalancedConnection {
+
+    /** Load-balanced view of the wrapped connection, used by the host management methods below. */
+    private final LoadBalancedConnection delegate;
+
+    /**
+     * @param delegate the load-balanced connection to wrap
+     * @param info     connection description handed to the superclass
+     */
+    public LoadBalancedConnectionWrapper(LoadBalancedConnection delegate, ConnectionInfo info) {
+        super(delegate, info);
+        // The superclass reference is private to it, so this class needs its own copy. Without this
+        // assignment every method below dereferenced null.
+        this.delegate = delegate;
+    }
+
+    /**
+     * @param s passed through to the delegate unchanged
+     * @return the delegate's result
+     * @throws SQLException if the delegate fails
+     */
+    @Override
+    public boolean addHost(String s) throws SQLException {
+        return delegate.addHost(s);
+    }
+
+    /**
+     * @param s passed through to the delegate unchanged
+     * @throws SQLException if the delegate fails
+     */
+    @Override public void removeHost(String s) throws SQLException {
+        delegate.removeHost(s);
+    }
+
+    /**
+     * @param s passed through to the delegate unchanged
+     * @throws SQLException if the delegate fails
+     */
+    @Override public void removeHostWhenNotInUse(String s) throws SQLException {
+        delegate.removeHostWhenNotInUse(s);
+    }
+
+    /**
+     * @param b passed through to the delegate unchanged
+     * @throws SQLException if the delegate fails
+     */
+    @Override public void ping(boolean b) throws SQLException {
+        delegate.ping(b);
+    }
+}
diff --git a/apm-sniffer/apm-sdk-plugin/mysql-5.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/jdbc/mysql/wrapper/PreparedStatementWrapper.java b/apm-sniffer/apm-sdk-plugin/mysql-5.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/jdbc/mysql/wrapper/PreparedStatementWrapper.java
new file mode 100644
index 0000000..9854c86
--- /dev/null
+++ b/apm-sniffer/apm-sdk-plugin/mysql-5.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/jdbc/mysql/wrapper/PreparedStatementWrapper.java
@@ -0,0 +1,303 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.skywalking.apm.plugin.jdbc.mysql.wrapper;
+
+import java.io.InputStream;
+import java.io.Reader;
+import java.math.BigDecimal;
+import java.net.URL;
+import java.sql.Array;
+import java.sql.Blob;
+import java.sql.Clob;
+import java.sql.Date;
+import java.sql.NClob;
+import java.sql.ParameterMetaData;
+import java.sql.PreparedStatement;
+import java.sql.Ref;
+import java.sql.ResultSet;
+import java.sql.ResultSetMetaData;
+import java.sql.RowId;
+import java.sql.SQLException;
+import java.sql.SQLType;
+import java.sql.SQLXML;
+import java.sql.Time;
+import java.sql.Timestamp;
+import java.util.Calendar;
+import org.apache.skywalking.apm.plugin.jdbc.trace.ConnectionInfo;
+
+/**
+ * {@link PreparedStatement} decorator that forwards every call to the wrapped statement.
+ *
+ * <p>The parameterless execute methods are routed through {@link TracingUtils#trace} together with the SQL text
+ * the statement was prepared with; parameter setters and metadata accessors are plain pass-throughs. The
+ * {@link java.sql.Statement} level methods are inherited from {@link StatementWrapper}.
+ */
+public class PreparedStatementWrapper extends StatementWrapper implements PreparedStatement {
+
+    /** Wrapped statement, typed as {@link PreparedStatement} for the methods declared in this class. */
+    private final PreparedStatement statement;
+    /** SQL text the statement was prepared with; reported to the tracer, never re-sent to the driver. */
+    private final String sql;
+
+    /**
+     * @param statement      the prepared statement to wrap
+     * @param connectionInfo connection description passed to the tracer
+     * @param sql            SQL text the statement was prepared with
+     * @param statementType  label passed to the tracer as the statement type
+     */
+    public PreparedStatementWrapper(PreparedStatement statement, ConnectionInfo connectionInfo, String sql,
+        String statementType) {
+        super(statement, connectionInfo, statementType);
+        this.statement = statement;
+        this.sql = sql;
+    }
+
+    /**
+     * Same as the four-argument constructor with the statement type fixed to {@code "PreparedStatement"}.
+     */
+    public PreparedStatementWrapper(PreparedStatement statement, ConnectionInfo connectionInfo, String sql) {
+        this(statement, connectionInfo, sql, "PreparedStatement");
+    }
+
+    // ---------------------------------------------------------------- traced execution
+
+    @Override public ResultSet executeQuery() throws SQLException {
+        return TracingUtils.trace(connectionInfo, "executeQuery", sql, stateType, new TracingUtils.Executable<ResultSet>() {
+            @Override public ResultSet exe(String sql) throws SQLException {
+                return statement.executeQuery();
+            }
+        });
+    }
+
+    @Override public int executeUpdate() throws SQLException {
+        return TracingUtils.trace(connectionInfo, "executeUpdate", sql, stateType, new TracingUtils.Executable<Integer>() {
+            @Override public Integer exe(String sql) throws SQLException {
+                return statement.executeUpdate();
+            }
+        });
+    }
+
+    @Override public boolean execute() throws SQLException {
+        return TracingUtils.trace(connectionInfo, "execute", sql, stateType, new TracingUtils.Executable<Boolean>() {
+            @Override public Boolean exe(String sql) throws SQLException {
+                // Must be the no-arg overload: execute(String) is the plain Statement entry point and
+                // would bypass the parameters bound on this prepared statement.
+                return statement.execute();
+            }
+        });
+    }
+
+    /**
+     * Traced counterpart of {@link #executeUpdate()} for large update counts. Without this override the
+     * default implementation declared by {@link PreparedStatement} would be used instead of the wrapped
+     * statement's.
+     */
+    @Override public long executeLargeUpdate() throws SQLException {
+        return TracingUtils.trace(connectionInfo, "executeLargeUpdate", sql, stateType, new TracingUtils.Executable<Long>() {
+            @Override public Long exe(String sql) throws SQLException {
+                return statement.executeLargeUpdate();
+            }
+        });
+    }
+
+    // ---------------------------------------------------------------- plain pass-throughs
+
+    @Override public void setNull(int parameterIndex, int sqlType) throws SQLException {
+        statement.setNull(parameterIndex, sqlType);
+    }
+
+    @Override public void setBoolean(int parameterIndex, boolean x) throws SQLException {
+        statement.setBoolean(parameterIndex, x);
+    }
+
+    @Override public void setByte(int parameterIndex, byte x) throws SQLException {
+        statement.setByte(parameterIndex, x);
+    }
+
+    @Override public void setShort(int parameterIndex, short x) throws SQLException {
+        statement.setShort(parameterIndex, x);
+    }
+
+    @Override public void setInt(int parameterIndex, int x) throws SQLException {
+        statement.setInt(parameterIndex, x);
+    }
+
+    @Override public void setLong(int parameterIndex, long x) throws SQLException {
+        statement.setLong(parameterIndex, x);
+    }
+
+    @Override public void setFloat(int parameterIndex, float x) throws SQLException {
+        statement.setFloat(parameterIndex, x);
+    }
+
+    @Override public void setDouble(int parameterIndex, double x) throws SQLException {
+        statement.setDouble(parameterIndex, x);
+    }
+
+    @Override public void setBigDecimal(int parameterIndex, BigDecimal x) throws SQLException {
+        statement.setBigDecimal(parameterIndex, x);
+    }
+
+    @Override public void setString(int parameterIndex, String x) throws SQLException {
+        statement.setString(parameterIndex, x);
+    }
+
+    @Override public void setBytes(int parameterIndex, byte[] x) throws SQLException {
+        statement.setBytes(parameterIndex, x);
+    }
+
+    @Override public void setDate(int parameterIndex, Date x) throws SQLException {
+        statement.setDate(parameterIndex, x);
+    }
+
+    @Override public void setTime(int parameterIndex, Time x) throws SQLException {
+        statement.setTime(parameterIndex, x);
+    }
+
+    @Override public void setTimestamp(int parameterIndex, Timestamp x) throws SQLException {
+        statement.setTimestamp(parameterIndex, x);
+    }
+
+    @Override public void setAsciiStream(int parameterIndex, InputStream x, int length) throws SQLException {
+        statement.setAsciiStream(parameterIndex, x, length);
+    }
+
+    @Override @Deprecated
+    public void setUnicodeStream(int parameterIndex, InputStream x, int length) throws SQLException {
+        statement.setUnicodeStream(parameterIndex, x, length);
+    }
+
+    @Override public void setBinaryStream(int parameterIndex, InputStream x, int length) throws SQLException {
+        statement.setBinaryStream(parameterIndex, x, length);
+    }
+
+    @Override public void clearParameters() throws SQLException {
+        statement.clearParameters();
+    }
+
+    @Override public void setObject(int parameterIndex, Object x, int targetSqlType) throws SQLException {
+        statement.setObject(parameterIndex, x, targetSqlType);
+    }
+
+    @Override public void setObject(int parameterIndex, Object x) throws SQLException {
+        statement.setObject(parameterIndex, x);
+    }
+
+    @Override public void addBatch() throws SQLException {
+        statement.addBatch();
+    }
+
+    @Override public void setCharacterStream(int parameterIndex, Reader reader, int length) throws SQLException {
+        statement.setCharacterStream(parameterIndex, reader, length);
+    }
+
+    @Override public void setRef(int parameterIndex, Ref x) throws SQLException {
+        statement.setRef(parameterIndex, x);
+    }
+
+    @Override public void setBlob(int parameterIndex, Blob x) throws SQLException {
+        statement.setBlob(parameterIndex, x);
+    }
+
+    @Override public void setClob(int parameterIndex, Clob x) throws SQLException {
+        statement.setClob(parameterIndex, x);
+    }
+
+    @Override public void setArray(int parameterIndex, Array x) throws SQLException {
+        statement.setArray(parameterIndex, x);
+    }
+
+    @Override public ResultSetMetaData getMetaData() throws SQLException {
+        return statement.getMetaData();
+    }
+
+    @Override public void setDate(int parameterIndex, Date x, Calendar cal) throws SQLException {
+        statement.setDate(parameterIndex, x, cal);
+    }
+
+    @Override public void setTime(int parameterIndex, Time x, Calendar cal) throws SQLException {
+        statement.setTime(parameterIndex, x, cal);
+    }
+
+    @Override public void setTimestamp(int parameterIndex, Timestamp x, Calendar cal) throws SQLException {
+        statement.setTimestamp(parameterIndex, x, cal);
+    }
+
+    @Override public void setNull(int parameterIndex, int sqlType, String typeName) throws SQLException {
+        statement.setNull(parameterIndex, sqlType, typeName);
+    }
+
+    @Override public void setURL(int parameterIndex, URL x) throws SQLException {
+        statement.setURL(parameterIndex, x);
+    }
+
+    @Override public ParameterMetaData getParameterMetaData() throws SQLException {
+        return statement.getParameterMetaData();
+    }
+
+    @Override public void setRowId(int parameterIndex, RowId x) throws SQLException {
+        statement.setRowId(parameterIndex, x);
+    }
+
+    @Override public void setNString(int parameterIndex, String value) throws SQLException {
+        statement.setNString(parameterIndex, value);
+    }
+
+    @Override public void setNCharacterStream(int parameterIndex, Reader value, long length) throws SQLException {
+        statement.setNCharacterStream(parameterIndex, value, length);
+    }
+
+    @Override public void setNClob(int parameterIndex, NClob value) throws SQLException {
+        statement.setNClob(parameterIndex, value);
+    }
+
+    @Override public void setClob(int parameterIndex, Reader reader, long length) throws SQLException {
+        statement.setClob(parameterIndex, reader, length);
+    }
+
+    @Override public void setBlob(int parameterIndex, InputStream inputStream, long length) throws SQLException {
+        statement.setBlob(parameterIndex, inputStream, length);
+    }
+
+    @Override public void setNClob(int parameterIndex, Reader reader, long length) throws SQLException {
+        statement.setNClob(parameterIndex, reader, length);
+    }
+
+    @Override public void setSQLXML(int parameterIndex, SQLXML xmlObject) throws SQLException {
+        statement.setSQLXML(parameterIndex, xmlObject);
+    }
+
+    @Override
+    public void setObject(int parameterIndex, Object x, int targetSqlType, int scaleOrLength) throws SQLException {
+        statement.setObject(parameterIndex, x, targetSqlType, scaleOrLength);
+    }
+
+    @Override public void setAsciiStream(int parameterIndex, InputStream x, long length) throws SQLException {
+        statement.setAsciiStream(parameterIndex, x, length);
+    }
+
+    @Override public void setBinaryStream(int parameterIndex, InputStream x, long length) throws SQLException {
+        statement.setBinaryStream(parameterIndex, x, length);
+    }
+
+    @Override public void setCharacterStream(int parameterIndex, Reader reader, long length) throws SQLException {
+        statement.setCharacterStream(parameterIndex, reader, length);
+    }
+
+    @Override public void setAsciiStream(int parameterIndex, InputStream x) throws SQLException {
+        statement.setAsciiStream(parameterIndex, x);
+    }
+
+    @Override public void setBinaryStream(int parameterIndex, InputStream x) throws SQLException {
+        statement.setBinaryStream(parameterIndex, x);
+    }
+
+    @Override public void setCharacterStream(int parameterIndex, Reader reader) throws SQLException {
+        statement.setCharacterStream(parameterIndex, reader);
+    }
+
+    @Override public void setNCharacterStream(int parameterIndex, Reader value) throws SQLException {
+        statement.setNCharacterStream(parameterIndex, value);
+    }
+
+    @Override public void setClob(int parameterIndex, Reader reader) throws SQLException {
+        statement.setClob(parameterIndex, reader);
+    }
+
+    @Override public void setBlob(int parameterIndex, InputStream inputStream) throws SQLException {
+        statement.setBlob(parameterIndex, inputStream);
+    }
+
+    @Override public void setNClob(int parameterIndex, Reader reader) throws SQLException {
+        statement.setNClob(parameterIndex, reader);
+    }
+
+    @Override
+    public void setObject(int parameterIndex, Object x, SQLType targetSqlType, int scaleOrLength) throws SQLException {
+        statement.setObject(parameterIndex, x, targetSqlType, scaleOrLength);
+    }
+
+    @Override
+    public void setObject(int parameterIndex, Object x, SQLType targetSqlType) throws SQLException {
+        statement.setObject(parameterIndex, x, targetSqlType);
+    }
+
+}
diff --git a/apm-sniffer/apm-sdk-plugin/mysql-5.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/jdbc/mysql/wrapper/ReplicationConnectionWrapper.java b/apm-sniffer/apm-sdk-plugin/mysql-5.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/jdbc/mysql/wrapper/ReplicationConnectionWrapper.java
new file mode 100644
index 0000000..2b8fa00
--- /dev/null
+++ b/apm-sniffer/apm-sdk-plugin/mysql-5.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/jdbc/mysql/wrapper/ReplicationConnectionWrapper.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.skywalking.apm.plugin.jdbc.mysql.wrapper;
+
+import com.mysql.cj.api.jdbc.JdbcConnection;
+import com.mysql.cj.api.jdbc.ha.ReplicationConnection;
+import java.sql.SQLException;
+import org.apache.skywalking.apm.plugin.jdbc.trace.ConnectionInfo;
+
+/**
+ * Connection wrapper for a MySQL {@link ReplicationConnection}.
+ *
+ * <p>Everything inherited from {@link JdbcConnectionWrapper} is handled by the superclass; this class only adds
+ * the replication specific operations, which are forwarded unchanged to the wrapped connection.
+ */
+public class ReplicationConnectionWrapper extends JdbcConnectionWrapper implements ReplicationConnection {
+
+    /** Replication view of the wrapped connection, used by every method declared in this class. */
+    private final ReplicationConnection replicationConnection;
+
+    /**
+     * @param delegate       the connection to wrap; must be a {@link ReplicationConnection}
+     * @param connectionInfo connection description handed to the superclass
+     * @throws IllegalArgumentException if {@code delegate} is not a {@link ReplicationConnection}
+     */
+    public ReplicationConnectionWrapper(JdbcConnection delegate, ConnectionInfo connectionInfo) {
+        super(delegate, connectionInfo);
+        // The field used to stay null, so every replication method failed with a NullPointerException.
+        // Failing here instead makes a wrongly typed delegate visible at construction time.
+        if (!(delegate instanceof ReplicationConnection)) {
+            throw new IllegalArgumentException("delegate must be a ReplicationConnection but was "
+                + (delegate == null ? "null" : delegate.getClass().getName()));
+        }
+        this.replicationConnection = (ReplicationConnection)delegate;
+    }
+
+    @Override public long getConnectionGroupId() {
+        return replicationConnection.getConnectionGroupId();
+    }
+
+    /** @return the connection returned by the delegate; it is handed back as-is, not wrapped by this class */
+    @Override public JdbcConnection getCurrentConnection() {
+        return replicationConnection.getCurrentConnection();
+    }
+
+    /** @return the connection returned by the delegate; it is handed back as-is, not wrapped by this class */
+    @Override public JdbcConnection getMasterConnection() {
+        return replicationConnection.getMasterConnection();
+    }
+
+    @Override public void promoteSlaveToMaster(String s) throws SQLException {
+        replicationConnection.promoteSlaveToMaster(s);
+    }
+
+    @Override public void removeMasterHost(String s) throws SQLException {
+        replicationConnection.removeMasterHost(s);
+    }
+
+    @Override public void removeMasterHost(String s, boolean b) throws SQLException {
+        replicationConnection.removeMasterHost(s, b);
+    }
+
+    @Override public boolean isHostMaster(String s) {
+        return replicationConnection.isHostMaster(s);
+    }
+
+    /** @return the connection returned by the delegate; it is handed back as-is, not wrapped by this class */
+    @Override public JdbcConnection getSlavesConnection() {
+        return replicationConnection.getSlavesConnection();
+    }
+
+    @Override public void addSlaveHost(String s) throws SQLException {
+        replicationConnection.addSlaveHost(s);
+    }
+
+    @Override public void removeSlave(String s) throws SQLException {
+        replicationConnection.removeSlave(s);
+    }
+
+    @Override public void removeSlave(String s, boolean b) throws SQLException {
+        replicationConnection.removeSlave(s, b);
+    }
+
+    @Override public boolean isHostSlave(String s) {
+        return replicationConnection.isHostSlave(s);
+    }
+}
diff --git a/apm-sniffer/apm-sdk-plugin/mysql-5.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/jdbc/mysql/wrapper/StatementWrapper.java b/apm-sniffer/apm-sdk-plugin/mysql-5.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/jdbc/mysql/wrapper/StatementWrapper.java
new file mode 100644
index 0000000..1226b5f
--- /dev/null
+++ b/apm-sniffer/apm-sdk-plugin/mysql-5.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/jdbc/mysql/wrapper/StatementWrapper.java
@@ -0,0 +1,307 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.skywalking.apm.plugin.jdbc.mysql.wrapper;
+
+import java.sql.Connection;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.SQLWarning;
+import java.sql.Statement;
+import org.apache.skywalking.apm.plugin.jdbc.trace.ConnectionInfo;
+
+/**
+ * {@link Statement} decorator that forwards every call to the wrapped statement.
+ *
+ * <p>All {@code execute*} variants are routed through {@link TracingUtils#trace}; the remaining methods are
+ * plain pass-throughs.
+ */
+public class StatementWrapper implements Statement {
+
+    /** Wrapped statement that performs the actual work. */
+    private final Statement statement;
+    /** Connection description passed to the tracer for every traced call. */
+    protected final ConnectionInfo connectionInfo;
+    /** Statement type label passed to the tracer for every traced call. */
+    protected final String stateType;
+
+    /**
+     * @param statement      the statement to wrap
+     * @param connectionInfo connection description passed to the tracer
+     * @param stateType      label passed to the tracer as the statement type
+     */
+    public StatementWrapper(Statement statement, ConnectionInfo connectionInfo, String stateType) {
+        this.statement = statement;
+        this.connectionInfo = connectionInfo;
+        this.stateType = stateType;
+    }
+
+    /**
+     * Same as the three-argument constructor with the statement type fixed to {@code "Statement"}.
+     */
+    public StatementWrapper(Statement statement, ConnectionInfo connectionInfo) {
+        this(statement, connectionInfo, "Statement");
+    }
+
+    @Override
+    public ResultSet executeQuery(String sql) throws SQLException {
+        return TracingUtils.trace(connectionInfo, "executeQuery", sql, stateType, new TracingUtils.Executable<ResultSet>() {
+            @Override public ResultSet exe(String sql) throws SQLException {
+                return statement.executeQuery(sql);
+            }
+        });
+    }
+
+    @Override public int executeUpdate(String sql) throws SQLException {
+        return TracingUtils.trace(connectionInfo, "executeUpdate", sql, stateType, new TracingUtils.Executable<Integer>() {
+            @Override public Integer exe(String sql) throws SQLException {
+                return statement.executeUpdate(sql);
+            }
+        });
+    }
+
+    @Override public void close() throws SQLException {
+        statement.close();
+    }
+
+    @Override public int getMaxFieldSize() throws SQLException {
+        return statement.getMaxFieldSize();
+    }
+
+    @Override public void setMaxFieldSize(int max) throws SQLException {
+        statement.setMaxFieldSize(max);
+    }
+
+    @Override public int getMaxRows() throws SQLException {
+        return statement.getMaxRows();
+    }
+
+    @Override public void setMaxRows(int max) throws SQLException {
+        statement.setMaxRows(max);
+    }
+
+    @Override public void setEscapeProcessing(boolean enable) throws SQLException {
+        statement.setEscapeProcessing(enable);
+    }
+
+    @Override public int getQueryTimeout() throws SQLException {
+        return statement.getQueryTimeout();
+    }
+
+    @Override public void setQueryTimeout(int seconds) throws SQLException {
+        statement.setQueryTimeout(seconds);
+    }
+
+    @Override public void cancel() throws SQLException {
+        statement.cancel();
+    }
+
+    @Override public SQLWarning getWarnings() throws SQLException {
+        return statement.getWarnings();
+    }
+
+    @Override public void clearWarnings() throws SQLException {
+        statement.clearWarnings();
+    }
+
+    @Override public void setCursorName(String name) throws SQLException {
+        statement.setCursorName(name);
+    }
+
+    @Override public boolean execute(String sql) throws SQLException {
+        return TracingUtils.trace(connectionInfo, "execute", sql, stateType, new TracingUtils.Executable<Boolean>() {
+            @Override public Boolean exe(String sql) throws SQLException {
+                return statement.execute(sql);
+            }
+        });
+    }
+
+    @Override public ResultSet getResultSet() throws SQLException {
+        return statement.getResultSet();
+    }
+
+    @Override public int getUpdateCount() throws SQLException {
+        return statement.getUpdateCount();
+    }
+
+    @Override public boolean getMoreResults() throws SQLException {
+        return statement.getMoreResults();
+    }
+
+    @Override public void setFetchDirection(int direction) throws SQLException {
+        statement.setFetchDirection(direction);
+    }
+
+    @Override public int getFetchDirection() throws SQLException {
+        return statement.getFetchDirection();
+    }
+
+    @Override public void setFetchSize(int rows) throws SQLException {
+        statement.setFetchSize(rows);
+    }
+
+    @Override public int getFetchSize() throws SQLException {
+        return statement.getFetchSize();
+    }
+
+    @Override public int getResultSetConcurrency() throws SQLException {
+        return statement.getResultSetConcurrency();
+    }
+
+    @Override public int getResultSetType() throws SQLException {
+        return statement.getResultSetType();
+    }
+
+    @Override public void addBatch(String sql) throws SQLException {
+        statement.addBatch(sql);
+    }
+
+    @Override public void clearBatch() throws SQLException {
+        statement.clearBatch();
+    }
+
+    @Override public int[] executeBatch() throws SQLException {
+        // A batch has no single SQL text, so null is handed to the tracer.
+        return TracingUtils.trace(connectionInfo, "executeBatch", null, stateType, new TracingUtils.Executable<int[]>() {
+            @Override public int[] exe(String sql) throws SQLException {
+                return statement.executeBatch();
+            }
+        });
+    }
+
+    /**
+     * @return the connection reported by the wrapped statement; it is handed back as-is by this method
+     */
+    @Override public Connection getConnection() throws SQLException {
+        return statement.getConnection();
+    }
+
+    @Override public boolean getMoreResults(int current) throws SQLException {
+        return statement.getMoreResults(current);
+    }
+
+    @Override public ResultSet getGeneratedKeys() throws SQLException {
+        return statement.getGeneratedKeys();
+    }
+
+    @Override public int executeUpdate(String sql, final int autoGeneratedKeys) throws SQLException {
+        return TracingUtils.trace(connectionInfo, "executeUpdate", sql, stateType, new TracingUtils.Executable<Integer>() {
+            @Override public Integer exe(String sql) throws SQLException {
+                return statement.executeUpdate(sql, autoGeneratedKeys);
+            }
+        });
+    }
+
+    @Override public int executeUpdate(String sql, final int[] columnIndexes) throws SQLException {
+        return TracingUtils.trace(connectionInfo, "executeUpdate", sql, stateType, new TracingUtils.Executable<Integer>() {
+            @Override public Integer exe(String sql) throws SQLException {
+                return statement.executeUpdate(sql, columnIndexes);
+            }
+        });
+    }
+
+    @Override public int executeUpdate(String sql, final String[] columnNames) throws SQLException {
+        return TracingUtils.trace(connectionInfo, "executeUpdate", sql, stateType, new TracingUtils.Executable<Integer>() {
+            @Override public Integer exe(String sql) throws SQLException {
+                return statement.executeUpdate(sql, columnNames);
+            }
+        });
+    }
+
+    @Override public boolean execute(String sql, final int autoGeneratedKeys) throws SQLException {
+        return TracingUtils.trace(connectionInfo, "execute", sql, stateType, new TracingUtils.Executable<Boolean>() {
+            @Override public Boolean exe(String sql) throws SQLException {
+                return statement.execute(sql, autoGeneratedKeys);
+            }
+        });
+    }
+
+    @Override public boolean execute(String sql, final int[] columnIndexes) throws SQLException {
+        return TracingUtils.trace(connectionInfo, "execute", sql, stateType, new TracingUtils.Executable<Boolean>() {
+            @Override public Boolean exe(String sql) throws SQLException {
+                return statement.execute(sql, columnIndexes);
+            }
+        });
+    }
+
+    @Override public boolean execute(String sql, final String[] columnNames) throws SQLException {
+        return TracingUtils.trace(connectionInfo, "execute", sql, stateType, new TracingUtils.Executable<Boolean>() {
+            @Override public Boolean exe(String sql) throws SQLException {
+                return statement.execute(sql, columnNames);
+            }
+        });
+    }
+
+    @Override public int getResultSetHoldability() throws SQLException {
+        return statement.getResultSetHoldability();
+    }
+
+    @Override public boolean isClosed() throws SQLException {
+        return statement.isClosed();
+    }
+
+    @Override public void setPoolable(boolean poolable) throws SQLException {
+        statement.setPoolable(poolable);
+    }
+
+    @Override public boolean isPoolable() throws SQLException {
+        return statement.isPoolable();
+    }
+
+    @Override public void closeOnCompletion() throws SQLException {
+        statement.closeOnCompletion();
+    }
+
+    @Override public boolean isCloseOnCompletion() throws SQLException {
+        return statement.isCloseOnCompletion();
+    }
+
+    @Override public long getLargeUpdateCount() throws SQLException {
+        return statement.getLargeUpdateCount();
+    }
+
+    @Override public void setLargeMaxRows(long max) throws SQLException {
+        statement.setLargeMaxRows(max);
+    }
+
+    @Override public long getLargeMaxRows() throws SQLException {
+        return statement.getLargeMaxRows();
+    }
+
+    @Override public long[] executeLargeBatch() throws SQLException {
+        // Traced the same way as executeBatch(); previously this was the only execute variant that
+        // went to the driver without a span.
+        return TracingUtils.trace(connectionInfo, "executeLargeBatch", null, stateType, new TracingUtils.Executable<long[]>() {
+            @Override public long[] exe(String sql) throws SQLException {
+                return statement.executeLargeBatch();
+            }
+        });
+    }
+
+    @Override public long executeLargeUpdate(String sql) throws SQLException {
+        return TracingUtils.trace(connectionInfo, "executeLargeUpdate", sql, stateType, new TracingUtils.Executable<Long>() {
+            @Override public Long exe(String sql) throws SQLException {
+                return statement.executeLargeUpdate(sql);
+            }
+        });
+    }
+
+    @Override public long executeLargeUpdate(String sql, final int autoGeneratedKeys) throws SQLException {
+        return TracingUtils.trace(connectionInfo, "executeLargeUpdate", sql, stateType, new TracingUtils.Executable<Long>() {
+            @Override public Long exe(String sql) throws SQLException {
+                return statement.executeLargeUpdate(sql, autoGeneratedKeys);
+            }
+        });
+    }
+
+    @Override public long executeLargeUpdate(String sql, final int[] columnIndexes) throws SQLException {
+        return TracingUtils.trace(connectionInfo, "executeLargeUpdate", sql, stateType, new TracingUtils.Executable<Long>() {
+            @Override public Long exe(String sql) throws SQLException {
+                return statement.executeLargeUpdate(sql, columnIndexes);
+            }
+        });
+    }
+
+    @Override public long executeLargeUpdate(String sql, final String[] columnNames) throws SQLException {
+        return TracingUtils.trace(connectionInfo, "executeLargeUpdate", sql, stateType, new TracingUtils.Executable<Long>() {
+            @Override public Long exe(String sql) throws SQLException {
+                return statement.executeLargeUpdate(sql, columnNames);
+            }
+        });
+    }
+
+    /** Answered entirely by the wrapped statement; this wrapper adds no handling of its own. */
+    @Override public <T> T unwrap(Class<T> iface) throws SQLException {
+        return statement.unwrap(iface);
+    }
+
+    /** Answered entirely by the wrapped statement; this wrapper adds no handling of its own. */
+    @Override public boolean isWrapperFor(Class<?> iface) throws SQLException {
+        return statement.isWrapperFor(iface);
+    }
+}
diff --git a/apm-sniffer/apm-sdk-plugin/mysql-5.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/jdbc/mysql/wrapper/TracingUtils.java b/apm-sniffer/apm-sdk-plugin/mysql-5.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/jdbc/mysql/wrapper/TracingUtils.java
new file mode 100644
index 0000000..9c6add1
--- /dev/null
+++ b/apm-sniffer/apm-sdk-plugin/mysql-5.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/jdbc/mysql/wrapper/TracingUtils.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.skywalking.apm.plugin.jdbc.mysql.wrapper;
+
+import java.sql.SQLException;
+import org.apache.skywalking.apm.agent.core.context.ContextManager;
+import org.apache.skywalking.apm.agent.core.context.tag.Tags;
+import org.apache.skywalking.apm.agent.core.context.trace.AbstractSpan;
+import org.apache.skywalking.apm.agent.core.context.trace.SpanLayer;
+import org.apache.skywalking.apm.plugin.jdbc.trace.ConnectionInfo;
+
+/**
+ * Helper used by the statement wrappers in this package to run a JDBC call inside an exit span.
+ */
+public class TracingUtils {
+
+    /**
+     * Creates an exit span, runs {@code exec} inside it and stops the span afterwards.
+     *
+     * <p>The span's operation name is {@code <dbType>/JDBI/<statementType>/<method>} and its peer is the
+     * database peer taken from {@code connectInfo}. An exception thrown by {@code exec} is recorded on the
+     * span and rethrown unchanged.
+     *
+     * @param connectInfo   source of the DB type, peer, database name and component of the span
+     * @param method        name of the JDBC method being executed; last segment of the operation name
+     * @param sql           SQL text recorded as the statement tag and handed to {@code exec}; may be
+     *                      {@code null}, in which case an empty string is recorded
+     * @param statementType statement type segment of the operation name
+     * @param exec          the actual JDBC call
+     * @param <R>           result type of the JDBC call
+     * @return whatever {@code exec} returns
+     * @throws SQLException if {@code exec} throws it
+     */
+    public static <R> R trace(ConnectionInfo connectInfo, String method, String sql, String statementType,
+        TracingUtils.Executable<R> exec)
+        throws SQLException {
+        // Created before the try block: if span creation itself fails there is nothing of ours to stop,
+        // and the finally clause must not stop a span that belongs to someone else.
+        AbstractSpan span = ContextManager.createExitSpan(connectInfo.getDBType() + "/JDBI/" + statementType + "/" + method, connectInfo.getDatabasePeer());
+        try {
+            Tags.DB_TYPE.set(span, "sql");
+            Tags.DB_INSTANCE.set(span, connectInfo.getDatabaseName());
+            // NOTE(review): batch callers in this package pass null here; an empty string is recorded
+            // instead so the tag never carries null - confirm this matches the other JDBC plugins.
+            Tags.DB_STATEMENT.set(span, sql == null ? "" : sql);
+            span.setComponent(connectInfo.getComponent());
+            SpanLayer.asDB(span);
+            return exec.exe(sql);
+        } catch (SQLException e) {
+            span.errorOccurred();
+            span.log(e);
+            throw e;
+        } catch (RuntimeException e) {
+            // Unchecked driver failures are reported on the span as well instead of passing silently.
+            span.errorOccurred();
+            span.log(e);
+            throw e;
+        } finally {
+            ContextManager.stopSpan();
+        }
+    }
+
+    /**
+     * A JDBC call to be executed inside a span.
+     *
+     * @param <R> result type of the call
+     */
+    public interface Executable<R> {
+        /**
+         * @param sql the SQL text that was handed to {@link TracingUtils#trace}; implementations may ignore it
+         * @return the result of the JDBC call
+         * @throws SQLException if the JDBC call fails
+         */
+        R exe(String sql) throws SQLException;
+    }
+}
diff --git a/apm-sniffer/apm-sdk-plugin/mysql-5.x-plugin/src/main/resources/skywalking-plugin.def b/apm-sniffer/apm-sdk-plugin/mysql-5.x-plugin/src/main/resources/skywalking-plugin.def
index 65c38ea..ee872d2 100644
--- a/apm-sniffer/apm-sdk-plugin/mysql-5.x-plugin/src/main/resources/skywalking-plugin.def
+++ b/apm-sniffer/apm-sdk-plugin/mysql-5.x-plugin/src/main/resources/skywalking-plugin.def
@@ -20,3 +20,6 @@ mysql-5.x=org.apache.skywalking.apm.plugin.jdbc.mysql.define.Mysql50ConnectionIn
 mysql-5.x=org.apache.skywalking.apm.plugin.jdbc.mysql.define.CallableInstrumentation
 mysql-5.x=org.apache.skywalking.apm.plugin.jdbc.mysql.define.PreparedStatementInstrumentation
 mysql-5.x=org.apache.skywalking.apm.plugin.jdbc.mysql.define.StatementInstrumentation
+mysql-5.x=org.apache.skywalking.apm.plugin.jdbc.mysql.define.FailoverConnectionProxyInstrumentation
+mysql-5.x=org.apache.skywalking.apm.plugin.jdbc.mysql.define.LoadBalancedConnectionProxyInstrumentation
+mysql-5.x=org.apache.skywalking.apm.plugin.jdbc.mysql.define.ReplicationConnectionProxyInstrumentation