You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@shardingsphere.apache.org by pa...@apache.org on 2022/01/02 09:14:39 UTC

[shardingsphere] branch master updated: Merge JdbcUri and JdbcUrlParser (#14482)

This is an automated email from the ASF dual-hosted git repository.

panjuan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new e54abcb  Merge JdbcUri and JdbcUrlParser (#14482)
e54abcb is described below

commit e54abcb3ee3f184ea0e01be93268351f345af85c
Author: Liang Zhang <te...@163.com>
AuthorDate: Sun Jan 2 17:13:58 2022 +0800

    Merge JdbcUri and JdbcUrlParser (#14482)
---
 .../infra/config/datasource/JdbcUri.java           | 127 -------------------
 .../infra/config/datasource/JdbcUrlParser.java     | 138 +++++++++++++++++++++
 .../pool/creator/DataSourcePoolCreator.java        |   1 -
 .../{reflection => }/DataSourceReflection.java     |  11 +-
 .../creator/reflection/ConnectionURLParser.java    |  60 ---------
 .../infra/config/datasource/JdbcUriTest.java       |  54 --------
 .../infra/config/datasource/JdbcUrlParserTest.java | 102 +++++++++++++++
 .../reflection/ConnectionURLParserTest.java        |  71 -----------
 .../scenario/rulealtered/RuleAlteredJobWorker.java |   6 +-
 ...gSpherePipelineDataSourceConfigurationTest.java |   6 +-
 ...tandardPipelineDataSourceConfigurationTest.java |   4 +-
 .../pipeline/mysql/importer/MySQLImporter.java     |   2 +-
 .../mysql/ingest/MySQLIncrementalDumper.java       |  12 +-
 .../mysql/ingest/MySQLInventoryDumper.java         |   2 +-
 .../mysql/ingest/MySQLIncrementalDumperTest.java   |  16 +--
 .../config/PipelineDataSourceConfiguration.java    |   6 +-
 ...rdingSpherePipelineDataSourceConfiguration.java |   6 +-
 .../StandardPipelineDataSourceConfiguration.java   |   6 +-
 18 files changed, 279 insertions(+), 351 deletions(-)

diff --git a/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/config/datasource/JdbcUri.java b/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/config/datasource/JdbcUri.java
deleted file mode 100644
index 444f931..0000000
--- a/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/config/datasource/JdbcUri.java
+++ /dev/null
@@ -1,127 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.shardingsphere.infra.config.datasource;
-
-import com.google.common.base.Strings;
-
-import java.net.URI;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Map.Entry;
-
-/**
- * Jdbc uri.
- */
-public final class JdbcUri {
-    
-    private final URI jdbcUri;
-    
-    public JdbcUri(final String jdbcUrl) {
-        jdbcUri = URI.create(jdbcUrl.substring(5));
-    }
-    
-    /**
-     * Get scheme.
-     *
-     * @return scheme name
-     */
-    public String getScheme() {
-        return jdbcUri.getScheme();
-    }
-    
-    /**
-     * Get hostname.
-     *
-     * @return hostname
-     */
-    public String getHostname() {
-        return jdbcUri.getHost();
-    }
-    
-    /**
-     * Get port.
-     *
-     * @return port
-     */
-    public int getPort() {
-        return -1 == jdbcUri.getPort() ? 3306 : jdbcUri.getPort();
-    }
-    
-    /**
-     * Get host.
-     *
-     * @return host
-     */
-    public String getHost() {
-        return String.format("%s:%d", getHostname(), getPort());
-    }
-    
-    /**
-     * Get database name.
-     * @return database name
-     */
-    public String getDatabase() {
-        return null == jdbcUri.getPath() ? "" : jdbcUri.getPath().replaceFirst("/", "");
-    }
-    
-    /**
-     * Get parameters.
-     *
-     * @return parameters.
-     */
-    public Map<String, String> getParameters() {
-        Map<String, String> result = new HashMap<>();
-        if (Strings.isNullOrEmpty(jdbcUri.getQuery())) {
-            return result;
-        }
-        String[] parameters = jdbcUri.getQuery().split("&");
-        for (String each : parameters) {
-            String[] args = each.split("=");
-            result.put(args[0], 1 == args.length ? null : args[1]);
-        }
-        return result;
-    }
-    
-    /**
-     * Append parameters.
-     *
-     * @param parameters JDBC parameters
-     * @return new JDBC URL
-     */
-    public String appendParameters(final Map<String, String> parameters) {
-        return String.format("jdbc:%s://%s/%s?%s", getScheme(), getHost(), getDatabase(), mergeParameters(getParameters(), parameters));
-    }
-    
-    private String mergeParameters(final Map<String, String> parameters, final Map<String, String> appendParameters) {
-        parameters.putAll(appendParameters);
-        return formatParameters(parameters);
-    }
-    
-    private String formatParameters(final Map<String, String> parameters) {
-        StringBuilder result = new StringBuilder();
-        for (Entry<String, String> entry : parameters.entrySet()) {
-            result.append(entry.getKey());
-            if (null != entry.getValue()) {
-                result.append("=").append(entry.getValue());
-            }
-            result.append("&");
-        }
-        result.deleteCharAt(result.length() - 1);
-        return result.toString();
-    }
-}
diff --git a/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/config/datasource/JdbcUrlParser.java b/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/config/datasource/JdbcUrlParser.java
new file mode 100644
index 0000000..66c2bdb
--- /dev/null
+++ b/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/config/datasource/JdbcUrlParser.java
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.infra.config.datasource;
+
+import com.google.common.base.Splitter;
+import com.google.common.base.Strings;
+import lombok.Getter;
+
+import java.util.Collections;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+/**
+ * JDBC URL parser.
+ */
+public final class JdbcUrlParser {
+    
+    private static final String SCHEMA_PATTERN = "(?<schema>[\\w\\+:%]+)\\s*";
+    
+    private static final String AUTHORITY_PATTERN = "(?://(?<authority>[^/?#]*))?\\s*";
+    
+    private static final String PATH_PATTERN = "(?:/(?!\\s*/)(?<path>[^?#]*))?";
+    
+    private static final String QUERY_PATTERN = "(?:\\?(?!\\s*\\?)(?<query>[^#]*))?";
+    
+    private static final Pattern CONNECTION_URL_PATTERN = Pattern.compile(SCHEMA_PATTERN + AUTHORITY_PATTERN + PATH_PATTERN + QUERY_PATTERN);
+    
+    private static final String AUTHORITY_GROUP_KEY = "authority";
+    
+    private static final String PATH_GROUP_KEY = "path";
+    
+    private static final String QUERY_GROUP_KEY = "query";
+    
+    private final String jdbcURL;
+    
+    private final String authority;
+    
+    @Getter
+    private final String database;
+    
+    private final String query;
+    
+    public JdbcUrlParser(final String jdbcURL) {
+        this.jdbcURL = jdbcURL;
+        Matcher matcher = CONNECTION_URL_PATTERN.matcher(jdbcURL);
+        if (matcher.matches()) {
+            authority = matcher.group(AUTHORITY_GROUP_KEY);
+            database = matcher.group(PATH_GROUP_KEY);
+            query = matcher.group(QUERY_GROUP_KEY);
+        } else {
+            authority = "";
+            database = "";
+            query = "";
+        }
+    }
+    
+    /**
+     * Get hostname.
+     *
+     * @return hostname
+     */
+    public String getHostname() {
+        if (!authority.contains(":")) {
+            return authority;
+        }
+        String[] values = authority.split(":");
+        if (2 == values.length) {
+            return values[0];
+        }
+        // TODO process with multiple services, for example: replication, failover etc
+        return null;
+    }
+    
+    /**
+     * Get port.
+     * 
+     * @return port
+     */
+    public int getPort() {
+        if (!authority.contains(":")) {
+            // TODO adapt other databases
+            return 3306;
+        }
+        String[] values = authority.split(":");
+        if (2 == values.length) {
+            return Integer.parseInt(values[1]);
+        }
+        // TODO process with multiple services, for example: replication, failover etc
+        return -1;
+    }
+    
+    /**
+     * Get query properties from JDBC connection URL.
+     *
+     * @return query properties
+     */
+    public Map<String, String> getQueryProperties() {
+        return Strings.isNullOrEmpty(query) ? Collections.emptyMap() : Splitter.on("&").withKeyValueSeparator("=").split(query);
+    }
+    
+    /**
+     * Append query properties.
+     *
+     * @param queryProps query properties
+     * @return new JDBC URL
+     */
+    public String appendQueryProperties(final Map<String, String> queryProps) {
+        StringBuilder result = new StringBuilder(jdbcURL);
+        String delimiter = Strings.isNullOrEmpty(query) ? "?" : "&";
+        result.append(delimiter);
+        for (Entry<String, String> entry : queryProps.entrySet()) {
+            result.append(entry.getKey());
+            if (null != entry.getValue()) {
+                result.append("=").append(entry.getValue());
+            }
+            result.append("&");
+        }
+        result.deleteCharAt(result.length() - 1);
+        return result.toString();
+    }
+}
diff --git a/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/config/datasource/pool/creator/DataSourcePoolCreator.java b/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/config/datasource/pool/creator/DataSourcePoolCreator.java
index 2f45d5c..f68fead 100644
--- a/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/config/datasource/pool/creator/DataSourcePoolCreator.java
+++ b/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/config/datasource/pool/creator/DataSourcePoolCreator.java
@@ -20,7 +20,6 @@ package org.apache.shardingsphere.infra.config.datasource.pool.creator;
 import lombok.RequiredArgsConstructor;
 import lombok.SneakyThrows;
 import org.apache.shardingsphere.infra.config.datasource.DataSourceConfiguration;
-import org.apache.shardingsphere.infra.config.datasource.pool.creator.reflection.DataSourceReflection;
 import org.apache.shardingsphere.spi.ShardingSphereServiceLoader;
 import org.apache.shardingsphere.spi.required.RequiredSPIRegistry;
 import org.apache.shardingsphere.spi.typed.TypedSPIRegistry;
diff --git a/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/config/datasource/pool/creator/reflection/DataSourceReflection.java b/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/config/datasource/pool/creator/DataSourceReflection.java
similarity index 95%
rename from shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/config/datasource/pool/creator/reflection/DataSourceReflection.java
rename to shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/config/datasource/pool/creator/DataSourceReflection.java
index eaad0d0..49a4082 100644
--- a/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/config/datasource/pool/creator/reflection/DataSourceReflection.java
+++ b/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/config/datasource/pool/creator/DataSourceReflection.java
@@ -15,11 +15,12 @@
  * limitations under the License.
  */
 
-package org.apache.shardingsphere.infra.config.datasource.pool.creator.reflection;
+package org.apache.shardingsphere.infra.config.datasource.pool.creator;
 
 import com.google.common.base.CaseFormat;
 import com.google.common.collect.Sets;
 import lombok.SneakyThrows;
+import org.apache.shardingsphere.infra.config.datasource.JdbcUrlParser;
 
 import javax.sql.DataSource;
 import java.lang.reflect.Method;
@@ -149,18 +150,18 @@ public final class DataSourceReflection {
             return;
         }
         Properties targetDataSourceProps = getDataSourcePropertiesFieldName(dataSourcePropsFieldName);
-        Map<String, String> jdbcUrlProps = new ConnectionURLParser(getJdbcUrl(jdbcUrlFieldName)).getProperties();
+        Map<String, String> queryProps = new JdbcUrlParser(getJdbcUrl(jdbcUrlFieldName)).getQueryProperties();
         for (Entry<Object, Object> entry : defaultDataSourceProps.entrySet()) {
             String defaultPropertyKey = entry.getKey().toString();
             String defaultPropertyValue = entry.getValue().toString();
-            if (!containsDefaultProperty(defaultPropertyKey, targetDataSourceProps, jdbcUrlProps)) {
+            if (!containsDefaultProperty(defaultPropertyKey, targetDataSourceProps, queryProps)) {
                 targetDataSourceProps.setProperty(defaultPropertyKey, defaultPropertyValue);
             }
         }
     }
     
-    private boolean containsDefaultProperty(final String defaultPropertyKey, final Properties targetDataSourceProps, final Map<String, String> jdbcUrlProps) {
-        return targetDataSourceProps.containsKey(defaultPropertyKey) || jdbcUrlProps.containsKey(defaultPropertyKey);
+    private boolean containsDefaultProperty(final String defaultPropertyKey, final Properties targetDataSourceProps, final Map<String, String> queryProps) {
+        return targetDataSourceProps.containsKey(defaultPropertyKey) || queryProps.containsKey(defaultPropertyKey);
     }
     
     @SneakyThrows(ReflectiveOperationException.class)
diff --git a/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/config/datasource/pool/creator/reflection/ConnectionURLParser.java b/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/config/datasource/pool/creator/reflection/ConnectionURLParser.java
deleted file mode 100644
index f6be8ea..0000000
--- a/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/config/datasource/pool/creator/reflection/ConnectionURLParser.java
+++ /dev/null
@@ -1,60 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.shardingsphere.infra.config.datasource.pool.creator.reflection;
-
-import com.google.common.base.Splitter;
-import com.google.common.base.Strings;
-
-import java.util.Collections;
-import java.util.Map;
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
-
-/**
- * JDBC connection URL parser.
- */
-public final class ConnectionURLParser {
-    
-    private static final String PROPS_GROUP_KEY = "props";
-    
-    private static final String SCHEMA_PATTERN = "(?<schema>[\\w\\+:%]+)\\s*";
-    
-    private static final String AUTHORITY_PATTERN = "(?://(?<authority>[^/?#]*))?\\s*";
-    
-    private static final String PATH_PATTERN = "(?:/(?!\\s*/)(?<path>[^?#]*))?";
-    
-    private static final String PROPS_PATTERN = "(?:\\?(?!\\s*\\?)(?<props>[^#]*))?";
-    
-    private static final Pattern CONNECTION_URL_PATTERN = Pattern.compile(SCHEMA_PATTERN + AUTHORITY_PATTERN + PATH_PATTERN + PROPS_PATTERN);
-    
-    private final String props;
-    
-    public ConnectionURLParser(final String jdbcURL) {
-        Matcher matcher = CONNECTION_URL_PATTERN.matcher(jdbcURL);
-        props = !matcher.matches() ? "" : matcher.group(PROPS_GROUP_KEY);
-    }
-    
-    /**
-     * Get properties from JDBC connection URL.
-     *
-     * @return properties
-     */
-    public Map<String, String> getProperties() {
-        return Strings.isNullOrEmpty(props) ? Collections.emptyMap() : Splitter.on("&").withKeyValueSeparator("=").split(props);
-    }
-}
diff --git a/shardingsphere-infra/shardingsphere-infra-common/src/test/java/org/apache/shardingsphere/infra/config/datasource/JdbcUriTest.java b/shardingsphere-infra/shardingsphere-infra-common/src/test/java/org/apache/shardingsphere/infra/config/datasource/JdbcUriTest.java
deleted file mode 100644
index 4df9e10..0000000
--- a/shardingsphere-infra/shardingsphere-infra-common/src/test/java/org/apache/shardingsphere/infra/config/datasource/JdbcUriTest.java
+++ /dev/null
@@ -1,54 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.shardingsphere.infra.config.datasource;
-
-import com.google.common.collect.ImmutableMap;
-import org.junit.Test;
-
-import java.util.Map;
-
-import static org.hamcrest.CoreMatchers.is;
-import static org.junit.Assert.assertThat;
-
-public final class JdbcUriTest {
-    
-    @Test
-    public void assertInitJdbcUri() {
-        JdbcUri jdbcUri = new JdbcUri("jdbc:mysql://127.0.0.1:3306/test_db");
-        assertThat(jdbcUri.getHostname(), is("127.0.0.1"));
-        assertThat(jdbcUri.getPort(), is(3306));
-        assertThat(jdbcUri.getHost(), is("127.0.0.1:3306"));
-        assertThat(jdbcUri.getDatabase(), is("test_db"));
-        assertThat(jdbcUri.getScheme(), is("mysql"));
-    }
-    
-    @Test
-    public void assertGetParameters() {
-        Map<String, String> parameters = new JdbcUri("jdbc:mysql://127.0.0.1:3306/test_db?useSSL=true&maxReconnects=30").getParameters();
-        assertThat(parameters.size(), is(2));
-        assertThat(parameters.get("useSSL"), is("true"));
-        assertThat(parameters.get("maxReconnects"), is("30"));
-    }
-    
-    @Test
-    public void assertAppendJDBCParameters() {
-        JdbcUri jdbcUri = new JdbcUri("jdbc:mysql://192.168.0.1:3306/scaling?serverTimezone=UTC&useSSL=false");
-        String jdbcUrl = jdbcUri.appendParameters(ImmutableMap.<String, String>builder().put("rewriteBatchedStatements", "true").build());
-        assertThat(jdbcUrl, is("jdbc:mysql://192.168.0.1:3306/scaling?rewriteBatchedStatements=true&serverTimezone=UTC&useSSL=false"));
-    }
-}
diff --git a/shardingsphere-infra/shardingsphere-infra-common/src/test/java/org/apache/shardingsphere/infra/config/datasource/JdbcUrlParserTest.java b/shardingsphere-infra/shardingsphere-infra-common/src/test/java/org/apache/shardingsphere/infra/config/datasource/JdbcUrlParserTest.java
new file mode 100644
index 0000000..0d6a4a3
--- /dev/null
+++ b/shardingsphere-infra/shardingsphere-infra-common/src/test/java/org/apache/shardingsphere/infra/config/datasource/JdbcUrlParserTest.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.infra.config.datasource;
+
+import com.google.common.collect.ImmutableMap;
+import org.junit.Test;
+
+import static org.hamcrest.CoreMatchers.is;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertTrue;
+
+public final class JdbcUrlParserTest {
+    
+    @Test
+    public void assertParseSimpleJdbcUrl() {
+        JdbcUrlParser jdbcUrlParser = new JdbcUrlParser("mock:jdbc://127.0.0.1/");
+        assertThat(jdbcUrlParser.getHostname(), is("127.0.0.1"));
+        assertThat(jdbcUrlParser.getPort(), is(3306));
+        assertThat(jdbcUrlParser.getDatabase(), is(""));
+        assertTrue(jdbcUrlParser.getQueryProperties().isEmpty());
+    }
+    
+    @Test
+    public void assertParseMySQLJdbcUrl() {
+        JdbcUrlParser jdbcUrlParser = new JdbcUrlParser("jdbc:mysql://127.0.0.1:3306/demo_ds?serverTimezone=UTC&useSSL=false");
+        assertThat(jdbcUrlParser.getHostname(), is("127.0.0.1"));
+        assertThat(jdbcUrlParser.getPort(), is(3306));
+        assertThat(jdbcUrlParser.getDatabase(), is("demo_ds"));
+        assertThat(jdbcUrlParser.getQueryProperties().size(), is(2));
+        assertThat(jdbcUrlParser.getQueryProperties().get("serverTimezone"), is("UTC"));
+        assertThat(jdbcUrlParser.getQueryProperties().get("useSSL"), is("false"));
+    }
+    
+    @Test
+    public void assertParseMySQLJdbcUrlWithReplication() {
+        JdbcUrlParser jdbcUrlParser = new JdbcUrlParser("jdbc:mysql:replication://master-ip:3306,slave-1-ip:3306,slave-2-ip:3306/demo_ds?useUnicode=true");
+        assertNull(jdbcUrlParser.getHostname());
+        assertThat(jdbcUrlParser.getPort(), is(-1));
+        assertThat(jdbcUrlParser.getDatabase(), is("demo_ds"));
+        assertThat(jdbcUrlParser.getQueryProperties().size(), is(1));
+        assertThat(jdbcUrlParser.getQueryProperties().get("useUnicode"), is("true"));
+    }
+    
+    @Test
+    public void assertParsePostgreSQLJdbcUrl() {
+        JdbcUrlParser jdbcUrlParser = new JdbcUrlParser("jdbc:postgresql://127.0.0.1:5432/demo_ds?prepareThreshold=1&preferQueryMode=extendedForPrepared");
+        assertThat(jdbcUrlParser.getHostname(), is("127.0.0.1"));
+        assertThat(jdbcUrlParser.getPort(), is(5432));
+        assertThat(jdbcUrlParser.getDatabase(), is("demo_ds"));
+        assertThat(jdbcUrlParser.getQueryProperties().size(), is(2));
+        assertThat(jdbcUrlParser.getQueryProperties().get("prepareThreshold"), is("1"));
+        assertThat(jdbcUrlParser.getQueryProperties().get("preferQueryMode"), is("extendedForPrepared"));
+    }
+    
+    @Test
+    public void assertParseMicrosoftSQLServerJdbcUrl() {
+        JdbcUrlParser jdbcUrlParser = new JdbcUrlParser("jdbc:microsoft:sqlserver://127.0.0.1:3306/demo_ds");
+        assertThat(jdbcUrlParser.getHostname(), is("127.0.0.1"));
+        assertThat(jdbcUrlParser.getPort(), is(3306));
+        assertThat(jdbcUrlParser.getDatabase(), is("demo_ds"));
+        assertTrue(jdbcUrlParser.getQueryProperties().isEmpty());
+    }
+    
+    @Test
+    public void assertParseIncorrectURL() {
+        JdbcUrlParser jdbcUrlParser = new JdbcUrlParser("jdbc:h2:mem:test;DB_CLOSE_DELAY=-1;DATABASE_TO_UPPER=false;MODE=MySQL");
+        assertThat(jdbcUrlParser.getHostname(), is(""));
+        assertThat(jdbcUrlParser.getPort(), is(3306));
+        assertThat(jdbcUrlParser.getDatabase(), is(""));
+        assertTrue(jdbcUrlParser.getQueryProperties().isEmpty());
+    }
+    
+    @Test
+    public void assertAppendQueryPropertiesWithoutOriginalQueryProperties() {
+        JdbcUrlParser jdbcUrlParser = new JdbcUrlParser("jdbc:mysql://192.168.0.1:3306/demo_ds");
+        String actual = jdbcUrlParser.appendQueryProperties(ImmutableMap.<String, String>builder().put("rewriteBatchedStatements", "true").build());
+        assertThat(actual, is("jdbc:mysql://192.168.0.1:3306/demo_ds?rewriteBatchedStatements=true"));
+    }
+    
+    @Test
+    public void assertAppendQueryPropertiesWithOriginalQueryProperties() {
+        JdbcUrlParser jdbcUrlParser = new JdbcUrlParser("jdbc:mysql://192.168.0.1:3306/demo_ds?serverTimezone=UTC&useSSL=false");
+        String actual = jdbcUrlParser.appendQueryProperties(ImmutableMap.<String, String>builder().put("rewriteBatchedStatements", "true").build());
+        assertThat(actual, is("jdbc:mysql://192.168.0.1:3306/demo_ds?serverTimezone=UTC&useSSL=false&rewriteBatchedStatements=true"));
+    }
+}
diff --git a/shardingsphere-infra/shardingsphere-infra-common/src/test/java/org/apache/shardingsphere/infra/config/datasource/pool/creator/reflection/ConnectionURLParserTest.java b/shardingsphere-infra/shardingsphere-infra-common/src/test/java/org/apache/shardingsphere/infra/config/datasource/pool/creator/reflection/ConnectionURLParserTest.java
deleted file mode 100644
index cf89a96..0000000
--- a/shardingsphere-infra/shardingsphere-infra-common/src/test/java/org/apache/shardingsphere/infra/config/datasource/pool/creator/reflection/ConnectionURLParserTest.java
+++ /dev/null
@@ -1,71 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.shardingsphere.infra.config.datasource.pool.creator.reflection;
-
-import org.junit.Test;
-
-import static org.hamcrest.CoreMatchers.is;
-import static org.junit.Assert.assertThat;
-import static org.junit.Assert.assertTrue;
-
-public final class ConnectionURLParserTest {
-    
-    @Test
-    public void assertParseMySQLWithoutProps() {
-        ConnectionURLParser connectionUrlParser = new ConnectionURLParser("jdbc:mysql://127.0.0.1:3306/demo_ds");
-        assertTrue(connectionUrlParser.getProperties().isEmpty());
-    }
-    
-    @Test
-    public void assertParseMySQLWithProps() {
-        ConnectionURLParser connectionUrlParser = new ConnectionURLParser("jdbc:mysql://127.0.0.1:3306/demo_ds?serverTimezone=UTC&useSSL=false");
-        assertThat(connectionUrlParser.getProperties().size(), is(2));
-        assertThat(connectionUrlParser.getProperties().get("serverTimezone"), is("UTC"));
-        assertThat(connectionUrlParser.getProperties().get("useSSL"), is("false"));
-    }
-    
-    @Test
-    public void assertParseMySQLWithReplication() {
-        ConnectionURLParser connectionUrlParser = new ConnectionURLParser("jdbc:mysql:replication://master_ip:3306,slave_1_ip:3306,slave_2_ip:3306/demo_ds?useUnicode=true");
-        assertThat(connectionUrlParser.getProperties().size(), is(1));
-        assertThat(connectionUrlParser.getProperties().get("useUnicode"), is("true"));
-    }
-    
-    @Test
-    public void assertParsePostgreSQLWithProps() {
-        ConnectionURLParser connectionUrlParser = new ConnectionURLParser("jdbc:postgresql://127.0.0.1:5432/demo_ds?prepareThreshold=1&preferQueryMode=extendedForPrepared");
-        assertThat(connectionUrlParser.getProperties().size(), is(2));
-        assertThat(connectionUrlParser.getProperties().get("prepareThreshold"), is("1"));
-        assertThat(connectionUrlParser.getProperties().get("preferQueryMode"), is("extendedForPrepared"));
-    }
-    
-    @Test
-    public void assertParseMicrosoftSQLServerWithoutProps() {
-        assertTrue(new ConnectionURLParser("jdbc:microsoft:sqlserver://127.0.0.1:3306/demo_ds").getProperties().isEmpty());
-    }
-    
-    @Test
-    public void assertParseMockSQLWithoutProps() {
-        assertTrue(new ConnectionURLParser("mock:jdbc://127.0.0.1:3306/demo_ds").getProperties().isEmpty());
-    }
-    
-    @Test
-    public void assertParseIncorrectURL() {
-        assertTrue(new ConnectionURLParser("jdbc:h2:mem:test;DB_CLOSE_DELAY=-1;DATABASE_TO_UPPER=false;MODE=MySQL").getProperties().isEmpty());
-    }
-}
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobWorker.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobWorker.java
index 70b60f6..5ebd38b 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobWorker.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobWorker.java
@@ -33,7 +33,7 @@ import org.apache.shardingsphere.data.pipeline.core.exception.PipelineJobCreatio
 import org.apache.shardingsphere.data.pipeline.core.execute.FinishedCheckJobExecutor;
 import org.apache.shardingsphere.data.pipeline.core.execute.PipelineJobExecutor;
 import org.apache.shardingsphere.data.pipeline.spi.rulealtered.RuleAlteredDetector;
-import org.apache.shardingsphere.infra.config.datasource.JdbcUri;
+import org.apache.shardingsphere.infra.config.datasource.JdbcUrlParser;
 import org.apache.shardingsphere.infra.config.rulealtered.OnRuleAlteredActionConfiguration;
 import org.apache.shardingsphere.infra.database.type.DatabaseType;
 import org.apache.shardingsphere.infra.database.type.DatabaseTypeRegistry;
@@ -258,10 +258,10 @@ public final class RuleAlteredJobWorker {
         if (!(databaseType instanceof MySQLDatabaseType)) {
             return;
         }
-        Map<String, String> parameters = ImmutableMap.of("useSSL", "false");
+        Map<String, String> queryProps = ImmutableMap.of("useSSL", "false");
         for (Entry<String, Map<String, Object>> entry : yamlDataSources.entrySet()) {
             jdbcUrl = (String) entry.getValue().get("jdbcUrl");
-            entry.getValue().put("jdbcUrl", new JdbcUri(jdbcUrl).appendParameters(parameters));
+            entry.getValue().put("jdbcUrl", new JdbcUrlParser(jdbcUrl).appendQueryProperties(queryProps));
         }
     }
 }
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/test/java/org/apache/shardingsphere/infra/config/datasource/config/impl/ShardingSpherePipelineDataSourceConfigurationTest.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/test/java/org/apache/shardingsphere/infra/config/datasource/config/impl/ShardingSpherePipelineDataSourceConfigurationTest.java
index b6016af..e2dd00a 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/test/java/org/apache/shardingsphere/infra/config/datasource/config/impl/ShardingSpherePipelineDataSourceConfigurationTest.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/test/java/org/apache/shardingsphere/infra/config/datasource/config/impl/ShardingSpherePipelineDataSourceConfigurationTest.java
@@ -36,10 +36,10 @@ public final class ShardingSpherePipelineDataSourceConfigurationTest {
     @Test
     public void assertAppendJDBCParameters() {
         ShardingSpherePipelineDataSourceConfiguration dataSourceConfig = new ShardingSpherePipelineDataSourceConfiguration(getDataSourceYaml());
-        dataSourceConfig.appendJDBCParameters(ImmutableMap.<String, String>builder().put("rewriteBatchedStatements", "true").build());
+        dataSourceConfig.appendJDBCQueryProperties(ImmutableMap.<String, String>builder().put("rewriteBatchedStatements", "true").build());
         List<DataSourceConfiguration> actual = new ArrayList<>(getDataSourceConfigurations(dataSourceConfig.getRootConfig().getDataSources()).values());
-        assertThat(actual.get(0).getProps().get("url"), is("jdbc:mysql://192.168.0.2:3306/scaling?rewriteBatchedStatements=true&serverTimezone=UTC&useSSL=false"));
-        assertThat(actual.get(1).getProps().get("url"), is("jdbc:mysql://192.168.0.1:3306/scaling?rewriteBatchedStatements=true&serverTimezone=UTC&useSSL=false"));
+        assertThat(actual.get(0).getProps().get("url"), is("jdbc:mysql://192.168.0.2:3306/scaling?serverTimezone=UTC&useSSL=false&rewriteBatchedStatements=true"));
+        assertThat(actual.get(1).getProps().get("url"), is("jdbc:mysql://192.168.0.1:3306/scaling?serverTimezone=UTC&useSSL=false&rewriteBatchedStatements=true"));
     }
     
     private String getDataSourceYaml() {
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/test/java/org/apache/shardingsphere/infra/config/datasource/config/impl/StandardPipelineDataSourceConfigurationTest.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/test/java/org/apache/shardingsphere/infra/config/datasource/config/impl/StandardPipelineDataSourceConfigurationTest.java
index 1d22291..214b125 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/test/java/org/apache/shardingsphere/infra/config/datasource/config/impl/StandardPipelineDataSourceConfigurationTest.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/test/java/org/apache/shardingsphere/infra/config/datasource/config/impl/StandardPipelineDataSourceConfigurationTest.java
@@ -56,7 +56,7 @@ public final class StandardPipelineDataSourceConfigurationTest {
     public void assertAppendJDBCParameters() {
         StandardPipelineDataSourceConfiguration pipelineDataSourceConfig = new StandardPipelineDataSourceConfiguration(
                 "jdbc:mysql://192.168.0.1:3306/scaling?serverTimezone=UTC&useSSL=false", null, null);
-        pipelineDataSourceConfig.appendJDBCParameters(ImmutableMap.<String, String>builder().put("rewriteBatchedStatements", "true").build());
-        assertThat(pipelineDataSourceConfig.getHikariConfig().getJdbcUrl(), is("jdbc:mysql://192.168.0.1:3306/scaling?rewriteBatchedStatements=true&serverTimezone=UTC&useSSL=false"));
+        pipelineDataSourceConfig.appendJDBCQueryProperties(ImmutableMap.<String, String>builder().put("rewriteBatchedStatements", "true").build());
+        assertThat(pipelineDataSourceConfig.getHikariConfig().getJdbcUrl(), is("jdbc:mysql://192.168.0.1:3306/scaling?serverTimezone=UTC&useSSL=false&rewriteBatchedStatements=true"));
     }
 }
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/importer/MySQLImporter.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/importer/MySQLImporter.java
index b70c017..30c6270 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/importer/MySQLImporter.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/importer/MySQLImporter.java
@@ -34,7 +34,7 @@ public final class MySQLImporter extends AbstractImporter {
     
     public MySQLImporter(final ImporterConfiguration importerConfig, final PipelineDataSourceManager dataSourceManager) {
         super(importerConfig, dataSourceManager);
-        importerConfig.getDataSourceConfig().appendJDBCParameters(ImmutableMap.<String, String>builder().put("rewriteBatchedStatements", "true").build());
+        importerConfig.getDataSourceConfig().appendJDBCQueryProperties(ImmutableMap.<String, String>builder().put("rewriteBatchedStatements", "true").build());
     }
     
     @Override
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLIncrementalDumper.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLIncrementalDumper.java
index e4ab712..03b03de 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLIncrementalDumper.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLIncrementalDumper.java
@@ -47,7 +47,7 @@ import org.apache.shardingsphere.data.pipeline.mysql.ingest.column.metadata.MySQ
 import org.apache.shardingsphere.data.pipeline.mysql.ingest.column.metadata.MySQLColumnMetaDataLoader;
 import org.apache.shardingsphere.data.pipeline.mysql.ingest.column.value.ValueHandler;
 import org.apache.shardingsphere.data.pipeline.spi.ingest.dumper.IncrementalDumper;
-import org.apache.shardingsphere.infra.config.datasource.JdbcUri;
+import org.apache.shardingsphere.infra.config.datasource.JdbcUrlParser;
 import org.apache.shardingsphere.spi.ShardingSphereServiceLoader;
 
 import java.io.Serializable;
@@ -99,15 +99,15 @@ public final class MySQLIncrementalDumper extends AbstractLifecycleExecutor impl
     private void dump() {
         HikariConfig hikariConfig = ((StandardPipelineDataSourceConfiguration) dumperConfig.getDataSourceConfig()).getHikariConfig();
         log.info("incremental dump, jdbcUrl={}", hikariConfig.getJdbcUrl());
-        JdbcUri uri = new JdbcUri(hikariConfig.getJdbcUrl());
-        MySQLClient client = new MySQLClient(new ConnectInfo(random.nextInt(), uri.getHostname(), uri.getPort(), hikariConfig.getUsername(), hikariConfig.getPassword()));
+        JdbcUrlParser jdbcUrlParser = new JdbcUrlParser(hikariConfig.getJdbcUrl());
+        MySQLClient client = new MySQLClient(new ConnectInfo(random.nextInt(), jdbcUrlParser.getHostname(), jdbcUrlParser.getPort(), hikariConfig.getUsername(), hikariConfig.getPassword()));
         client.connect();
         client.subscribe(binlogPosition.getFilename(), binlogPosition.getPosition());
         int eventCount = 0;
         while (isRunning()) {
             AbstractBinlogEvent event = client.poll();
             if (null != event) {
-                handleEvent(uri, event);
+                handleEvent(jdbcUrlParser, event);
                 eventCount++;
             }
         }
@@ -115,8 +115,8 @@ public final class MySQLIncrementalDumper extends AbstractLifecycleExecutor impl
         pushRecord(new FinishedRecord(new PlaceholderPosition()));
     }
     
-    private void handleEvent(final JdbcUri uri, final AbstractBinlogEvent event) {
-        if (event instanceof PlaceholderEvent || filter(uri.getDatabase(), (AbstractRowsEvent) event)) {
+    private void handleEvent(final JdbcUrlParser jdbcUrlParser, final AbstractBinlogEvent event) {
+        if (event instanceof PlaceholderEvent || filter(jdbcUrlParser.getDatabase(), (AbstractRowsEvent) event)) {
             createPlaceholderRecord(event);
             return;
         }
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLInventoryDumper.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLInventoryDumper.java
index bca9771..f19039a 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLInventoryDumper.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLInventoryDumper.java
@@ -35,7 +35,7 @@ public final class MySQLInventoryDumper extends AbstractInventoryDumper {
     
     public MySQLInventoryDumper(final InventoryDumperConfiguration inventoryDumperConfig, final PipelineDataSourceManager dataSourceManager) {
         super(inventoryDumperConfig, dataSourceManager);
-        inventoryDumperConfig.getDataSourceConfig().appendJDBCParameters(ImmutableMap.<String, String>builder().put("yearIsDateType", "false").build());
+        inventoryDumperConfig.getDataSourceConfig().appendJDBCQueryProperties(ImmutableMap.<String, String>builder().put("yearIsDateType", "false").build());
     }
     
     @Override
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLIncrementalDumperTest.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLIncrementalDumperTest.java
index 970c54e..94bbf4e 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLIncrementalDumperTest.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLIncrementalDumperTest.java
@@ -34,7 +34,7 @@ import org.apache.shardingsphere.data.pipeline.mysql.ingest.binlog.event.DeleteR
 import org.apache.shardingsphere.data.pipeline.mysql.ingest.binlog.event.PlaceholderEvent;
 import org.apache.shardingsphere.data.pipeline.mysql.ingest.binlog.event.UpdateRowsEvent;
 import org.apache.shardingsphere.data.pipeline.mysql.ingest.binlog.event.WriteRowsEvent;
-import org.apache.shardingsphere.infra.config.datasource.JdbcUri;
+import org.apache.shardingsphere.infra.config.datasource.JdbcUrlParser;
 import org.junit.Before;
 import org.junit.Test;
 
@@ -97,7 +97,7 @@ public final class MySQLIncrementalDumperTest {
         List<Serializable[]> rows = new ArrayList<>(1);
         rows.add(new String[]{"1", "order"});
         rowsEvent.setAfterRows(rows);
-        invokeHandleEvent(new JdbcUri(URL), rowsEvent);
+        invokeHandleEvent(new JdbcUrlParser(URL), rowsEvent);
         List<Record> records = channel.fetchRecords(1, 0);
         assertThat(records.size(), is(1));
         assertTrue(records.get(0) instanceof DataRecord);
@@ -115,7 +115,7 @@ public final class MySQLIncrementalDumperTest {
         afterRows.add(new String[]{"1", "order_new"});
         rowsEvent.setBeforeRows(beforeRows);
         rowsEvent.setAfterRows(afterRows);
-        invokeHandleEvent(new JdbcUri(URL), rowsEvent);
+        invokeHandleEvent(new JdbcUrlParser(URL), rowsEvent);
         List<Record> records = channel.fetchRecords(1, 0);
         assertThat(records.size(), is(1));
         assertTrue(records.get(0) instanceof DataRecord);
@@ -130,7 +130,7 @@ public final class MySQLIncrementalDumperTest {
         List<Serializable[]> rows = new ArrayList<>(1);
         rows.add(new String[]{"1", "order"});
         rowsEvent.setBeforeRows(rows);
-        invokeHandleEvent(new JdbcUri(URL), rowsEvent);
+        invokeHandleEvent(new JdbcUrlParser(URL), rowsEvent);
         List<Record> records = channel.fetchRecords(1, 0);
         assertThat(records.size(), is(1));
         assertTrue(records.get(0) instanceof DataRecord);
@@ -139,7 +139,7 @@ public final class MySQLIncrementalDumperTest {
     
     @Test
     public void assertPlaceholderEvent() {
-        invokeHandleEvent(new JdbcUri("jdbc:mysql://127.0.0.1:3306/test_db"), new PlaceholderEvent());
+        invokeHandleEvent(new JdbcUrlParser("jdbc:mysql://127.0.0.1:3306/test_db"), new PlaceholderEvent());
         List<Record> records = channel.fetchRecords(1, 0);
         assertThat(records.size(), is(1));
         assertTrue(records.get(0) instanceof PlaceholderRecord);
@@ -149,14 +149,14 @@ public final class MySQLIncrementalDumperTest {
     public void assertRowsEventFiltered() {
         WriteRowsEvent rowsEvent = new WriteRowsEvent();
         rowsEvent.setSchemaName("unknown_schema");
-        invokeHandleEvent(new JdbcUri(URL), rowsEvent);
+        invokeHandleEvent(new JdbcUrlParser(URL), rowsEvent);
         List<Record> records = channel.fetchRecords(1, 0);
         assertThat(records.size(), is(1));
         assertTrue(records.get(0) instanceof PlaceholderRecord);
     }
     
     @SneakyThrows({NoSuchMethodException.class, ReflectiveOperationException.class})
-    private void invokeHandleEvent(final JdbcUri uri, final AbstractBinlogEvent event) {
-        ReflectionUtil.invokeMethod(incrementalDumper, "handleEvent", new Class[]{JdbcUri.class, AbstractBinlogEvent.class}, new Object[]{uri, event});
+    private void invokeHandleEvent(final JdbcUrlParser jdbcUrlParser, final AbstractBinlogEvent event) {
+        ReflectionUtil.invokeMethod(incrementalDumper, "handleEvent", new Class[]{JdbcUrlParser.class, AbstractBinlogEvent.class}, new Object[]{jdbcUrlParser, event});
     }
 }
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-spi/src/main/java/org/apache/shardingsphere/data/pipeline/api/datasource/config/PipelineDataSourceConfiguration.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-spi/src/main/java/org/apache/shardingsphere/data/pipeline/api/datasource/config/PipelineDataSourceConfiguration.java
index 50c740c..a592462 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-spi/src/main/java/org/apache/shardingsphere/data/pipeline/api/datasource/config/PipelineDataSourceConfiguration.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-spi/src/main/java/org/apache/shardingsphere/data/pipeline/api/datasource/config/PipelineDataSourceConfiguration.java
@@ -48,11 +48,11 @@ public interface PipelineDataSourceConfiguration {
     Object getDataSourceConfiguration();
     
     /**
-     * Append JDBC parameters.
+     * Append JDBC queryProps.
      *
-     * @param parameters JDBC parameters
+     * @param queryProps JDBC query properties
      */
-    void appendJDBCParameters(Map<String, String> parameters);
+    void appendJDBCQueryProperties(Map<String, String> queryProps);
     
     /**
      * Get database type.
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-spi/src/main/java/org/apache/shardingsphere/data/pipeline/api/datasource/config/impl/ShardingSpherePipelineDataSourceConfiguration.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-spi/src/main/java/org/apache/shardingsphere/data/pipeline/api/datasource/config/impl/ShardingSpherePipelineDataSourceConfiguration.java
index ca60e7e..f3c02bc 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-spi/src/main/java/org/apache/shardingsphere/data/pipeline/api/datasource/config/impl/ShardingSpherePipelineDataSourceConfiguration.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-spi/src/main/java/org/apache/shardingsphere/data/pipeline/api/datasource/config/impl/ShardingSpherePipelineDataSourceConfiguration.java
@@ -24,7 +24,7 @@ import lombok.Getter;
 import lombok.NoArgsConstructor;
 import lombok.Setter;
 import org.apache.shardingsphere.data.pipeline.api.datasource.config.PipelineDataSourceConfiguration;
-import org.apache.shardingsphere.infra.config.datasource.JdbcUri;
+import org.apache.shardingsphere.infra.config.datasource.JdbcUrlParser;
 import org.apache.shardingsphere.infra.database.type.DatabaseType;
 import org.apache.shardingsphere.infra.database.type.DatabaseTypeRegistry;
 import org.apache.shardingsphere.infra.yaml.config.pojo.YamlConfiguration;
@@ -84,11 +84,11 @@ public final class ShardingSpherePipelineDataSourceConfiguration implements Pipe
     }
     
     @Override
-    public void appendJDBCParameters(final Map<String, String> parameters) {
+    public void appendJDBCQueryProperties(final Map<String, String> queryProps) {
         rootConfig.getDataSources()
                 .forEach((key, value) -> {
                     String jdbcUrlKey = value.containsKey("url") ? "url" : "jdbcUrl";
-                    value.replace(jdbcUrlKey, new JdbcUri(value.get(jdbcUrlKey).toString()).appendParameters(parameters));
+                    value.replace(jdbcUrlKey, new JdbcUrlParser(value.get(jdbcUrlKey).toString()).appendQueryProperties(queryProps));
                 });
     }
     
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-spi/src/main/java/org/apache/shardingsphere/data/pipeline/api/datasource/config/impl/StandardPipelineDataSourceConfiguration.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-spi/src/main/java/org/apache/shardingsphere/data/pipeline/api/datasource/config/impl/StandardPipelineDataSourceConfiguration.java
index 6438fcb..1225281 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-spi/src/main/java/org/apache/shardingsphere/data/pipeline/api/datasource/config/impl/StandardPipelineDataSourceConfiguration.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-spi/src/main/java/org/apache/shardingsphere/data/pipeline/api/datasource/config/impl/StandardPipelineDataSourceConfiguration.java
@@ -23,7 +23,7 @@ import lombok.EqualsAndHashCode;
 import lombok.Getter;
 import org.apache.shardingsphere.data.pipeline.api.datasource.config.PipelineDataSourceConfiguration;
 import org.apache.shardingsphere.infra.config.datasource.DataSourceConfiguration;
-import org.apache.shardingsphere.infra.config.datasource.JdbcUri;
+import org.apache.shardingsphere.infra.config.datasource.JdbcUrlParser;
 import org.apache.shardingsphere.infra.database.type.DatabaseType;
 import org.apache.shardingsphere.infra.database.type.DatabaseTypeRegistry;
 import org.apache.shardingsphere.infra.yaml.config.swapper.YamlDataSourceConfigurationSwapper;
@@ -96,8 +96,8 @@ public final class StandardPipelineDataSourceConfiguration implements PipelineDa
     }
     
     @Override
-    public void appendJDBCParameters(final Map<String, String> parameters) {
-        hikariConfig.setJdbcUrl(new JdbcUri(hikariConfig.getJdbcUrl()).appendParameters(parameters));
+    public void appendJDBCQueryProperties(final Map<String, String> queryProps) {
+        hikariConfig.setJdbcUrl(new JdbcUrlParser(hikariConfig.getJdbcUrl()).appendQueryProperties(queryProps));
     }
     
     // TODO toShardingSphereJDBCDataSource(final String actualDataSourceName, final String logicTableName, final String actualTableName)