You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@shardingsphere.apache.org by pa...@apache.org on 2022/01/02 09:14:39 UTC
[shardingsphere] branch master updated: Merge JdbcUri and JdbcUrlParser (#14482)
This is an automated email from the ASF dual-hosted git repository.
panjuan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new e54abcb Merge JdbcUri and JdbcUrlParser (#14482)
e54abcb is described below
commit e54abcb3ee3f184ea0e01be93268351f345af85c
Author: Liang Zhang <te...@163.com>
AuthorDate: Sun Jan 2 17:13:58 2022 +0800
Merge JdbcUri and JdbcUrlParser (#14482)
---
.../infra/config/datasource/JdbcUri.java | 127 -------------------
.../infra/config/datasource/JdbcUrlParser.java | 138 +++++++++++++++++++++
.../pool/creator/DataSourcePoolCreator.java | 1 -
.../{reflection => }/DataSourceReflection.java | 11 +-
.../creator/reflection/ConnectionURLParser.java | 60 ---------
.../infra/config/datasource/JdbcUriTest.java | 54 --------
.../infra/config/datasource/JdbcUrlParserTest.java | 102 +++++++++++++++
.../reflection/ConnectionURLParserTest.java | 71 -----------
.../scenario/rulealtered/RuleAlteredJobWorker.java | 6 +-
...gSpherePipelineDataSourceConfigurationTest.java | 6 +-
...tandardPipelineDataSourceConfigurationTest.java | 4 +-
.../pipeline/mysql/importer/MySQLImporter.java | 2 +-
.../mysql/ingest/MySQLIncrementalDumper.java | 12 +-
.../mysql/ingest/MySQLInventoryDumper.java | 2 +-
.../mysql/ingest/MySQLIncrementalDumperTest.java | 16 +--
.../config/PipelineDataSourceConfiguration.java | 6 +-
...rdingSpherePipelineDataSourceConfiguration.java | 6 +-
.../StandardPipelineDataSourceConfiguration.java | 6 +-
18 files changed, 279 insertions(+), 351 deletions(-)
diff --git a/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/config/datasource/JdbcUri.java b/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/config/datasource/JdbcUri.java
deleted file mode 100644
index 444f931..0000000
--- a/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/config/datasource/JdbcUri.java
+++ /dev/null
@@ -1,127 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.shardingsphere.infra.config.datasource;
-
-import com.google.common.base.Strings;
-
-import java.net.URI;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Map.Entry;
-
-/**
- * Jdbc uri.
- */
-public final class JdbcUri {
-
- private final URI jdbcUri;
-
- public JdbcUri(final String jdbcUrl) {
- jdbcUri = URI.create(jdbcUrl.substring(5));
- }
-
- /**
- * Get scheme.
- *
- * @return scheme name
- */
- public String getScheme() {
- return jdbcUri.getScheme();
- }
-
- /**
- * Get hostname.
- *
- * @return hostname
- */
- public String getHostname() {
- return jdbcUri.getHost();
- }
-
- /**
- * Get port.
- *
- * @return port
- */
- public int getPort() {
- return -1 == jdbcUri.getPort() ? 3306 : jdbcUri.getPort();
- }
-
- /**
- * Get host.
- *
- * @return host
- */
- public String getHost() {
- return String.format("%s:%d", getHostname(), getPort());
- }
-
- /**
- * Get database name.
- * @return database name
- */
- public String getDatabase() {
- return null == jdbcUri.getPath() ? "" : jdbcUri.getPath().replaceFirst("/", "");
- }
-
- /**
- * Get parameters.
- *
- * @return parameters.
- */
- public Map<String, String> getParameters() {
- Map<String, String> result = new HashMap<>();
- if (Strings.isNullOrEmpty(jdbcUri.getQuery())) {
- return result;
- }
- String[] parameters = jdbcUri.getQuery().split("&");
- for (String each : parameters) {
- String[] args = each.split("=");
- result.put(args[0], 1 == args.length ? null : args[1]);
- }
- return result;
- }
-
- /**
- * Append parameters.
- *
- * @param parameters JDBC parameters
- * @return new JDBC URL
- */
- public String appendParameters(final Map<String, String> parameters) {
- return String.format("jdbc:%s://%s/%s?%s", getScheme(), getHost(), getDatabase(), mergeParameters(getParameters(), parameters));
- }
-
- private String mergeParameters(final Map<String, String> parameters, final Map<String, String> appendParameters) {
- parameters.putAll(appendParameters);
- return formatParameters(parameters);
- }
-
- private String formatParameters(final Map<String, String> parameters) {
- StringBuilder result = new StringBuilder();
- for (Entry<String, String> entry : parameters.entrySet()) {
- result.append(entry.getKey());
- if (null != entry.getValue()) {
- result.append("=").append(entry.getValue());
- }
- result.append("&");
- }
- result.deleteCharAt(result.length() - 1);
- return result.toString();
- }
-}
diff --git a/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/config/datasource/JdbcUrlParser.java b/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/config/datasource/JdbcUrlParser.java
new file mode 100644
index 0000000..66c2bdb
--- /dev/null
+++ b/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/config/datasource/JdbcUrlParser.java
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.infra.config.datasource;
+
+import com.google.common.base.Splitter;
+import com.google.common.base.Strings;
+import lombok.Getter;
+
+import java.util.Collections;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+/**
+ * JDBC URL parser.
+ */
+public final class JdbcUrlParser {
+
+ private static final String SCHEMA_PATTERN = "(?<schema>[\\w\\+:%]+)\\s*";
+
+ private static final String AUTHORITY_PATTERN = "(?://(?<authority>[^/?#]*))?\\s*";
+
+ private static final String PATH_PATTERN = "(?:/(?!\\s*/)(?<path>[^?#]*))?";
+
+ private static final String QUERY_PATTERN = "(?:\\?(?!\\s*\\?)(?<query>[^#]*))?";
+
+ private static final Pattern CONNECTION_URL_PATTERN = Pattern.compile(SCHEMA_PATTERN + AUTHORITY_PATTERN + PATH_PATTERN + QUERY_PATTERN);
+
+ private static final String AUTHORITY_GROUP_KEY = "authority";
+
+ private static final String PATH_GROUP_KEY = "path";
+
+ private static final String QUERY_GROUP_KEY = "query";
+
+ private final String jdbcURL;
+
+ private final String authority;
+
+ @Getter
+ private final String database;
+
+ private final String query;
+
+ public JdbcUrlParser(final String jdbcURL) {
+ this.jdbcURL = jdbcURL;
+ Matcher matcher = CONNECTION_URL_PATTERN.matcher(jdbcURL);
+ if (matcher.matches()) {
+ authority = matcher.group(AUTHORITY_GROUP_KEY);
+ database = matcher.group(PATH_GROUP_KEY);
+ query = matcher.group(QUERY_GROUP_KEY);
+ } else {
+ authority = "";
+ database = "";
+ query = "";
+ }
+ }
+
+ /**
+ * Get hostname.
+ *
+ * @return hostname
+ */
+ public String getHostname() {
+ if (!authority.contains(":")) {
+ return authority;
+ }
+ String[] values = authority.split(":");
+ if (2 == values.length) {
+ return values[0];
+ }
+ // TODO process with multiple services, for example: replication, failover etc
+ return null;
+ }
+
+ /**
+ * Get port.
+ *
+ * @return port
+ */
+ public int getPort() {
+ if (!authority.contains(":")) {
+ // TODO adapt other databases
+ return 3306;
+ }
+ String[] values = authority.split(":");
+ if (2 == values.length) {
+ return Integer.parseInt(values[1]);
+ }
+ // TODO process with multiple services, for example: replication, failover etc
+ return -1;
+ }
+
+ /**
+ * Get query properties from JDBC connection URL.
+ *
+ * @return query properties
+ */
+ public Map<String, String> getQueryProperties() {
+ return Strings.isNullOrEmpty(query) ? Collections.emptyMap() : Splitter.on("&").withKeyValueSeparator("=").split(query);
+ }
+
+ /**
+ * Append query properties.
+ *
+ * @param queryProps query properties
+ * @return new JDBC URL
+ */
+ public String appendQueryProperties(final Map<String, String> queryProps) {
+ StringBuilder result = new StringBuilder(jdbcURL);
+ String delimiter = Strings.isNullOrEmpty(query) ? "?" : "&";
+ result.append(delimiter);
+ for (Entry<String, String> entry : queryProps.entrySet()) {
+ result.append(entry.getKey());
+ if (null != entry.getValue()) {
+ result.append("=").append(entry.getValue());
+ }
+ result.append("&");
+ }
+ result.deleteCharAt(result.length() - 1);
+ return result.toString();
+ }
+}
diff --git a/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/config/datasource/pool/creator/DataSourcePoolCreator.java b/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/config/datasource/pool/creator/DataSourcePoolCreator.java
index 2f45d5c..f68fead 100644
--- a/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/config/datasource/pool/creator/DataSourcePoolCreator.java
+++ b/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/config/datasource/pool/creator/DataSourcePoolCreator.java
@@ -20,7 +20,6 @@ package org.apache.shardingsphere.infra.config.datasource.pool.creator;
import lombok.RequiredArgsConstructor;
import lombok.SneakyThrows;
import org.apache.shardingsphere.infra.config.datasource.DataSourceConfiguration;
-import org.apache.shardingsphere.infra.config.datasource.pool.creator.reflection.DataSourceReflection;
import org.apache.shardingsphere.spi.ShardingSphereServiceLoader;
import org.apache.shardingsphere.spi.required.RequiredSPIRegistry;
import org.apache.shardingsphere.spi.typed.TypedSPIRegistry;
diff --git a/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/config/datasource/pool/creator/reflection/DataSourceReflection.java b/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/config/datasource/pool/creator/DataSourceReflection.java
similarity index 95%
rename from shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/config/datasource/pool/creator/reflection/DataSourceReflection.java
rename to shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/config/datasource/pool/creator/DataSourceReflection.java
index eaad0d0..49a4082 100644
--- a/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/config/datasource/pool/creator/reflection/DataSourceReflection.java
+++ b/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/config/datasource/pool/creator/DataSourceReflection.java
@@ -15,11 +15,12 @@
* limitations under the License.
*/
-package org.apache.shardingsphere.infra.config.datasource.pool.creator.reflection;
+package org.apache.shardingsphere.infra.config.datasource.pool.creator;
import com.google.common.base.CaseFormat;
import com.google.common.collect.Sets;
import lombok.SneakyThrows;
+import org.apache.shardingsphere.infra.config.datasource.JdbcUrlParser;
import javax.sql.DataSource;
import java.lang.reflect.Method;
@@ -149,18 +150,18 @@ public final class DataSourceReflection {
return;
}
Properties targetDataSourceProps = getDataSourcePropertiesFieldName(dataSourcePropsFieldName);
- Map<String, String> jdbcUrlProps = new ConnectionURLParser(getJdbcUrl(jdbcUrlFieldName)).getProperties();
+ Map<String, String> queryProps = new JdbcUrlParser(getJdbcUrl(jdbcUrlFieldName)).getQueryProperties();
for (Entry<Object, Object> entry : defaultDataSourceProps.entrySet()) {
String defaultPropertyKey = entry.getKey().toString();
String defaultPropertyValue = entry.getValue().toString();
- if (!containsDefaultProperty(defaultPropertyKey, targetDataSourceProps, jdbcUrlProps)) {
+ if (!containsDefaultProperty(defaultPropertyKey, targetDataSourceProps, queryProps)) {
targetDataSourceProps.setProperty(defaultPropertyKey, defaultPropertyValue);
}
}
}
- private boolean containsDefaultProperty(final String defaultPropertyKey, final Properties targetDataSourceProps, final Map<String, String> jdbcUrlProps) {
- return targetDataSourceProps.containsKey(defaultPropertyKey) || jdbcUrlProps.containsKey(defaultPropertyKey);
+ private boolean containsDefaultProperty(final String defaultPropertyKey, final Properties targetDataSourceProps, final Map<String, String> queryProps) {
+ return targetDataSourceProps.containsKey(defaultPropertyKey) || queryProps.containsKey(defaultPropertyKey);
}
@SneakyThrows(ReflectiveOperationException.class)
diff --git a/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/config/datasource/pool/creator/reflection/ConnectionURLParser.java b/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/config/datasource/pool/creator/reflection/ConnectionURLParser.java
deleted file mode 100644
index f6be8ea..0000000
--- a/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/config/datasource/pool/creator/reflection/ConnectionURLParser.java
+++ /dev/null
@@ -1,60 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.shardingsphere.infra.config.datasource.pool.creator.reflection;
-
-import com.google.common.base.Splitter;
-import com.google.common.base.Strings;
-
-import java.util.Collections;
-import java.util.Map;
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
-
-/**
- * JDBC connection URL parser.
- */
-public final class ConnectionURLParser {
-
- private static final String PROPS_GROUP_KEY = "props";
-
- private static final String SCHEMA_PATTERN = "(?<schema>[\\w\\+:%]+)\\s*";
-
- private static final String AUTHORITY_PATTERN = "(?://(?<authority>[^/?#]*))?\\s*";
-
- private static final String PATH_PATTERN = "(?:/(?!\\s*/)(?<path>[^?#]*))?";
-
- private static final String PROPS_PATTERN = "(?:\\?(?!\\s*\\?)(?<props>[^#]*))?";
-
- private static final Pattern CONNECTION_URL_PATTERN = Pattern.compile(SCHEMA_PATTERN + AUTHORITY_PATTERN + PATH_PATTERN + PROPS_PATTERN);
-
- private final String props;
-
- public ConnectionURLParser(final String jdbcURL) {
- Matcher matcher = CONNECTION_URL_PATTERN.matcher(jdbcURL);
- props = !matcher.matches() ? "" : matcher.group(PROPS_GROUP_KEY);
- }
-
- /**
- * Get properties from JDBC connection URL.
- *
- * @return properties
- */
- public Map<String, String> getProperties() {
- return Strings.isNullOrEmpty(props) ? Collections.emptyMap() : Splitter.on("&").withKeyValueSeparator("=").split(props);
- }
-}
diff --git a/shardingsphere-infra/shardingsphere-infra-common/src/test/java/org/apache/shardingsphere/infra/config/datasource/JdbcUriTest.java b/shardingsphere-infra/shardingsphere-infra-common/src/test/java/org/apache/shardingsphere/infra/config/datasource/JdbcUriTest.java
deleted file mode 100644
index 4df9e10..0000000
--- a/shardingsphere-infra/shardingsphere-infra-common/src/test/java/org/apache/shardingsphere/infra/config/datasource/JdbcUriTest.java
+++ /dev/null
@@ -1,54 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.shardingsphere.infra.config.datasource;
-
-import com.google.common.collect.ImmutableMap;
-import org.junit.Test;
-
-import java.util.Map;
-
-import static org.hamcrest.CoreMatchers.is;
-import static org.junit.Assert.assertThat;
-
-public final class JdbcUriTest {
-
- @Test
- public void assertInitJdbcUri() {
- JdbcUri jdbcUri = new JdbcUri("jdbc:mysql://127.0.0.1:3306/test_db");
- assertThat(jdbcUri.getHostname(), is("127.0.0.1"));
- assertThat(jdbcUri.getPort(), is(3306));
- assertThat(jdbcUri.getHost(), is("127.0.0.1:3306"));
- assertThat(jdbcUri.getDatabase(), is("test_db"));
- assertThat(jdbcUri.getScheme(), is("mysql"));
- }
-
- @Test
- public void assertGetParameters() {
- Map<String, String> parameters = new JdbcUri("jdbc:mysql://127.0.0.1:3306/test_db?useSSL=true&maxReconnects=30").getParameters();
- assertThat(parameters.size(), is(2));
- assertThat(parameters.get("useSSL"), is("true"));
- assertThat(parameters.get("maxReconnects"), is("30"));
- }
-
- @Test
- public void assertAppendJDBCParameters() {
- JdbcUri jdbcUri = new JdbcUri("jdbc:mysql://192.168.0.1:3306/scaling?serverTimezone=UTC&useSSL=false");
- String jdbcUrl = jdbcUri.appendParameters(ImmutableMap.<String, String>builder().put("rewriteBatchedStatements", "true").build());
- assertThat(jdbcUrl, is("jdbc:mysql://192.168.0.1:3306/scaling?rewriteBatchedStatements=true&serverTimezone=UTC&useSSL=false"));
- }
-}
diff --git a/shardingsphere-infra/shardingsphere-infra-common/src/test/java/org/apache/shardingsphere/infra/config/datasource/JdbcUrlParserTest.java b/shardingsphere-infra/shardingsphere-infra-common/src/test/java/org/apache/shardingsphere/infra/config/datasource/JdbcUrlParserTest.java
new file mode 100644
index 0000000..0d6a4a3
--- /dev/null
+++ b/shardingsphere-infra/shardingsphere-infra-common/src/test/java/org/apache/shardingsphere/infra/config/datasource/JdbcUrlParserTest.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.infra.config.datasource;
+
+import com.google.common.collect.ImmutableMap;
+import org.junit.Test;
+
+import static org.hamcrest.CoreMatchers.is;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertTrue;
+
+public final class JdbcUrlParserTest {
+
+ @Test
+ public void assertParseSimpleJdbcUrl() {
+ JdbcUrlParser jdbcUrlParser = new JdbcUrlParser("mock:jdbc://127.0.0.1/");
+ assertThat(jdbcUrlParser.getHostname(), is("127.0.0.1"));
+ assertThat(jdbcUrlParser.getPort(), is(3306));
+ assertThat(jdbcUrlParser.getDatabase(), is(""));
+ assertTrue(jdbcUrlParser.getQueryProperties().isEmpty());
+ }
+
+ @Test
+ public void assertParseMySQLJdbcUrl() {
+ JdbcUrlParser jdbcUrlParser = new JdbcUrlParser("jdbc:mysql://127.0.0.1:3306/demo_ds?serverTimezone=UTC&useSSL=false");
+ assertThat(jdbcUrlParser.getHostname(), is("127.0.0.1"));
+ assertThat(jdbcUrlParser.getPort(), is(3306));
+ assertThat(jdbcUrlParser.getDatabase(), is("demo_ds"));
+ assertThat(jdbcUrlParser.getQueryProperties().size(), is(2));
+ assertThat(jdbcUrlParser.getQueryProperties().get("serverTimezone"), is("UTC"));
+ assertThat(jdbcUrlParser.getQueryProperties().get("useSSL"), is("false"));
+ }
+
+ @Test
+ public void assertParseMySQLJdbcUrlWithReplication() {
+ JdbcUrlParser jdbcUrlParser = new JdbcUrlParser("jdbc:mysql:replication://master-ip:3306,slave-1-ip:3306,slave-2-ip:3306/demo_ds?useUnicode=true");
+ assertNull(jdbcUrlParser.getHostname());
+ assertThat(jdbcUrlParser.getPort(), is(-1));
+ assertThat(jdbcUrlParser.getDatabase(), is("demo_ds"));
+ assertThat(jdbcUrlParser.getQueryProperties().size(), is(1));
+ assertThat(jdbcUrlParser.getQueryProperties().get("useUnicode"), is("true"));
+ }
+
+ @Test
+ public void assertParsePostgreSQLJdbcUrl() {
+ JdbcUrlParser jdbcUrlParser = new JdbcUrlParser("jdbc:postgresql://127.0.0.1:5432/demo_ds?prepareThreshold=1&preferQueryMode=extendedForPrepared");
+ assertThat(jdbcUrlParser.getHostname(), is("127.0.0.1"));
+ assertThat(jdbcUrlParser.getPort(), is(5432));
+ assertThat(jdbcUrlParser.getDatabase(), is("demo_ds"));
+ assertThat(jdbcUrlParser.getQueryProperties().size(), is(2));
+ assertThat(jdbcUrlParser.getQueryProperties().get("prepareThreshold"), is("1"));
+ assertThat(jdbcUrlParser.getQueryProperties().get("preferQueryMode"), is("extendedForPrepared"));
+ }
+
+ @Test
+ public void assertParseMicrosoftSQLServerJdbcUrl() {
+ JdbcUrlParser jdbcUrlParser = new JdbcUrlParser("jdbc:microsoft:sqlserver://127.0.0.1:3306/demo_ds");
+ assertThat(jdbcUrlParser.getHostname(), is("127.0.0.1"));
+ assertThat(jdbcUrlParser.getPort(), is(3306));
+ assertThat(jdbcUrlParser.getDatabase(), is("demo_ds"));
+ assertTrue(jdbcUrlParser.getQueryProperties().isEmpty());
+ }
+
+ @Test
+ public void assertParseIncorrectURL() {
+ JdbcUrlParser jdbcUrlParser = new JdbcUrlParser("jdbc:h2:mem:test;DB_CLOSE_DELAY=-1;DATABASE_TO_UPPER=false;MODE=MySQL");
+ assertThat(jdbcUrlParser.getHostname(), is(""));
+ assertThat(jdbcUrlParser.getPort(), is(3306));
+ assertThat(jdbcUrlParser.getDatabase(), is(""));
+ assertTrue(jdbcUrlParser.getQueryProperties().isEmpty());
+ }
+
+ @Test
+ public void assertAppendQueryPropertiesWithoutOriginalQueryProperties() {
+ JdbcUrlParser jdbcUrlParser = new JdbcUrlParser("jdbc:mysql://192.168.0.1:3306/demo_ds");
+ String actual = jdbcUrlParser.appendQueryProperties(ImmutableMap.<String, String>builder().put("rewriteBatchedStatements", "true").build());
+ assertThat(actual, is("jdbc:mysql://192.168.0.1:3306/demo_ds?rewriteBatchedStatements=true"));
+ }
+
+ @Test
+ public void assertAppendQueryPropertiesWithOriginalQueryProperties() {
+ JdbcUrlParser jdbcUrlParser = new JdbcUrlParser("jdbc:mysql://192.168.0.1:3306/demo_ds?serverTimezone=UTC&useSSL=false");
+ String actual = jdbcUrlParser.appendQueryProperties(ImmutableMap.<String, String>builder().put("rewriteBatchedStatements", "true").build());
+ assertThat(actual, is("jdbc:mysql://192.168.0.1:3306/demo_ds?serverTimezone=UTC&useSSL=false&rewriteBatchedStatements=true"));
+ }
+}
diff --git a/shardingsphere-infra/shardingsphere-infra-common/src/test/java/org/apache/shardingsphere/infra/config/datasource/pool/creator/reflection/ConnectionURLParserTest.java b/shardingsphere-infra/shardingsphere-infra-common/src/test/java/org/apache/shardingsphere/infra/config/datasource/pool/creator/reflection/ConnectionURLParserTest.java
deleted file mode 100644
index cf89a96..0000000
--- a/shardingsphere-infra/shardingsphere-infra-common/src/test/java/org/apache/shardingsphere/infra/config/datasource/pool/creator/reflection/ConnectionURLParserTest.java
+++ /dev/null
@@ -1,71 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.shardingsphere.infra.config.datasource.pool.creator.reflection;
-
-import org.junit.Test;
-
-import static org.hamcrest.CoreMatchers.is;
-import static org.junit.Assert.assertThat;
-import static org.junit.Assert.assertTrue;
-
-public final class ConnectionURLParserTest {
-
- @Test
- public void assertParseMySQLWithoutProps() {
- ConnectionURLParser connectionUrlParser = new ConnectionURLParser("jdbc:mysql://127.0.0.1:3306/demo_ds");
- assertTrue(connectionUrlParser.getProperties().isEmpty());
- }
-
- @Test
- public void assertParseMySQLWithProps() {
- ConnectionURLParser connectionUrlParser = new ConnectionURLParser("jdbc:mysql://127.0.0.1:3306/demo_ds?serverTimezone=UTC&useSSL=false");
- assertThat(connectionUrlParser.getProperties().size(), is(2));
- assertThat(connectionUrlParser.getProperties().get("serverTimezone"), is("UTC"));
- assertThat(connectionUrlParser.getProperties().get("useSSL"), is("false"));
- }
-
- @Test
- public void assertParseMySQLWithReplication() {
- ConnectionURLParser connectionUrlParser = new ConnectionURLParser("jdbc:mysql:replication://master_ip:3306,slave_1_ip:3306,slave_2_ip:3306/demo_ds?useUnicode=true");
- assertThat(connectionUrlParser.getProperties().size(), is(1));
- assertThat(connectionUrlParser.getProperties().get("useUnicode"), is("true"));
- }
-
- @Test
- public void assertParsePostgreSQLWithProps() {
- ConnectionURLParser connectionUrlParser = new ConnectionURLParser("jdbc:postgresql://127.0.0.1:5432/demo_ds?prepareThreshold=1&preferQueryMode=extendedForPrepared");
- assertThat(connectionUrlParser.getProperties().size(), is(2));
- assertThat(connectionUrlParser.getProperties().get("prepareThreshold"), is("1"));
- assertThat(connectionUrlParser.getProperties().get("preferQueryMode"), is("extendedForPrepared"));
- }
-
- @Test
- public void assertParseMicrosoftSQLServerWithoutProps() {
- assertTrue(new ConnectionURLParser("jdbc:microsoft:sqlserver://127.0.0.1:3306/demo_ds").getProperties().isEmpty());
- }
-
- @Test
- public void assertParseMockSQLWithoutProps() {
- assertTrue(new ConnectionURLParser("mock:jdbc://127.0.0.1:3306/demo_ds").getProperties().isEmpty());
- }
-
- @Test
- public void assertParseIncorrectURL() {
- assertTrue(new ConnectionURLParser("jdbc:h2:mem:test;DB_CLOSE_DELAY=-1;DATABASE_TO_UPPER=false;MODE=MySQL").getProperties().isEmpty());
- }
-}
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobWorker.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobWorker.java
index 70b60f6..5ebd38b 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobWorker.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobWorker.java
@@ -33,7 +33,7 @@ import org.apache.shardingsphere.data.pipeline.core.exception.PipelineJobCreatio
import org.apache.shardingsphere.data.pipeline.core.execute.FinishedCheckJobExecutor;
import org.apache.shardingsphere.data.pipeline.core.execute.PipelineJobExecutor;
import org.apache.shardingsphere.data.pipeline.spi.rulealtered.RuleAlteredDetector;
-import org.apache.shardingsphere.infra.config.datasource.JdbcUri;
+import org.apache.shardingsphere.infra.config.datasource.JdbcUrlParser;
import org.apache.shardingsphere.infra.config.rulealtered.OnRuleAlteredActionConfiguration;
import org.apache.shardingsphere.infra.database.type.DatabaseType;
import org.apache.shardingsphere.infra.database.type.DatabaseTypeRegistry;
@@ -258,10 +258,10 @@ public final class RuleAlteredJobWorker {
if (!(databaseType instanceof MySQLDatabaseType)) {
return;
}
- Map<String, String> parameters = ImmutableMap.of("useSSL", "false");
+ Map<String, String> queryProps = ImmutableMap.of("useSSL", "false");
for (Entry<String, Map<String, Object>> entry : yamlDataSources.entrySet()) {
jdbcUrl = (String) entry.getValue().get("jdbcUrl");
- entry.getValue().put("jdbcUrl", new JdbcUri(jdbcUrl).appendParameters(parameters));
+ entry.getValue().put("jdbcUrl", new JdbcUrlParser(jdbcUrl).appendQueryProperties(queryProps));
}
}
}
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/test/java/org/apache/shardingsphere/infra/config/datasource/config/impl/ShardingSpherePipelineDataSourceConfigurationTest.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/test/java/org/apache/shardingsphere/infra/config/datasource/config/impl/ShardingSpherePipelineDataSourceConfigurationTest.java
index b6016af..e2dd00a 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/test/java/org/apache/shardingsphere/infra/config/datasource/config/impl/ShardingSpherePipelineDataSourceConfigurationTest.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/test/java/org/apache/shardingsphere/infra/config/datasource/config/impl/ShardingSpherePipelineDataSourceConfigurationTest.java
@@ -36,10 +36,10 @@ public final class ShardingSpherePipelineDataSourceConfigurationTest {
@Test
public void assertAppendJDBCParameters() {
ShardingSpherePipelineDataSourceConfiguration dataSourceConfig = new ShardingSpherePipelineDataSourceConfiguration(getDataSourceYaml());
- dataSourceConfig.appendJDBCParameters(ImmutableMap.<String, String>builder().put("rewriteBatchedStatements", "true").build());
+ dataSourceConfig.appendJDBCQueryProperties(ImmutableMap.<String, String>builder().put("rewriteBatchedStatements", "true").build());
List<DataSourceConfiguration> actual = new ArrayList<>(getDataSourceConfigurations(dataSourceConfig.getRootConfig().getDataSources()).values());
- assertThat(actual.get(0).getProps().get("url"), is("jdbc:mysql://192.168.0.2:3306/scaling?rewriteBatchedStatements=true&serverTimezone=UTC&useSSL=false"));
- assertThat(actual.get(1).getProps().get("url"), is("jdbc:mysql://192.168.0.1:3306/scaling?rewriteBatchedStatements=true&serverTimezone=UTC&useSSL=false"));
+ assertThat(actual.get(0).getProps().get("url"), is("jdbc:mysql://192.168.0.2:3306/scaling?serverTimezone=UTC&useSSL=false&rewriteBatchedStatements=true"));
+ assertThat(actual.get(1).getProps().get("url"), is("jdbc:mysql://192.168.0.1:3306/scaling?serverTimezone=UTC&useSSL=false&rewriteBatchedStatements=true"));
}
private String getDataSourceYaml() {
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/test/java/org/apache/shardingsphere/infra/config/datasource/config/impl/StandardPipelineDataSourceConfigurationTest.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/test/java/org/apache/shardingsphere/infra/config/datasource/config/impl/StandardPipelineDataSourceConfigurationTest.java
index 1d22291..214b125 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/test/java/org/apache/shardingsphere/infra/config/datasource/config/impl/StandardPipelineDataSourceConfigurationTest.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/test/java/org/apache/shardingsphere/infra/config/datasource/config/impl/StandardPipelineDataSourceConfigurationTest.java
@@ -56,7 +56,7 @@ public final class StandardPipelineDataSourceConfigurationTest {
public void assertAppendJDBCParameters() {
StandardPipelineDataSourceConfiguration pipelineDataSourceConfig = new StandardPipelineDataSourceConfiguration(
"jdbc:mysql://192.168.0.1:3306/scaling?serverTimezone=UTC&useSSL=false", null, null);
- pipelineDataSourceConfig.appendJDBCParameters(ImmutableMap.<String, String>builder().put("rewriteBatchedStatements", "true").build());
- assertThat(pipelineDataSourceConfig.getHikariConfig().getJdbcUrl(), is("jdbc:mysql://192.168.0.1:3306/scaling?rewriteBatchedStatements=true&serverTimezone=UTC&useSSL=false"));
+ pipelineDataSourceConfig.appendJDBCQueryProperties(ImmutableMap.<String, String>builder().put("rewriteBatchedStatements", "true").build());
+ assertThat(pipelineDataSourceConfig.getHikariConfig().getJdbcUrl(), is("jdbc:mysql://192.168.0.1:3306/scaling?serverTimezone=UTC&useSSL=false&rewriteBatchedStatements=true"));
}
}
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/importer/MySQLImporter.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/importer/MySQLImporter.java
index b70c017..30c6270 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/importer/MySQLImporter.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/importer/MySQLImporter.java
@@ -34,7 +34,7 @@ public final class MySQLImporter extends AbstractImporter {
public MySQLImporter(final ImporterConfiguration importerConfig, final PipelineDataSourceManager dataSourceManager) {
super(importerConfig, dataSourceManager);
- importerConfig.getDataSourceConfig().appendJDBCParameters(ImmutableMap.<String, String>builder().put("rewriteBatchedStatements", "true").build());
+ importerConfig.getDataSourceConfig().appendJDBCQueryProperties(ImmutableMap.<String, String>builder().put("rewriteBatchedStatements", "true").build());
}
@Override
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLIncrementalDumper.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLIncrementalDumper.java
index e4ab712..03b03de 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLIncrementalDumper.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLIncrementalDumper.java
@@ -47,7 +47,7 @@ import org.apache.shardingsphere.data.pipeline.mysql.ingest.column.metadata.MySQ
import org.apache.shardingsphere.data.pipeline.mysql.ingest.column.metadata.MySQLColumnMetaDataLoader;
import org.apache.shardingsphere.data.pipeline.mysql.ingest.column.value.ValueHandler;
import org.apache.shardingsphere.data.pipeline.spi.ingest.dumper.IncrementalDumper;
-import org.apache.shardingsphere.infra.config.datasource.JdbcUri;
+import org.apache.shardingsphere.infra.config.datasource.JdbcUrlParser;
import org.apache.shardingsphere.spi.ShardingSphereServiceLoader;
import java.io.Serializable;
@@ -99,15 +99,15 @@ public final class MySQLIncrementalDumper extends AbstractLifecycleExecutor impl
private void dump() {
HikariConfig hikariConfig = ((StandardPipelineDataSourceConfiguration) dumperConfig.getDataSourceConfig()).getHikariConfig();
log.info("incremental dump, jdbcUrl={}", hikariConfig.getJdbcUrl());
- JdbcUri uri = new JdbcUri(hikariConfig.getJdbcUrl());
- MySQLClient client = new MySQLClient(new ConnectInfo(random.nextInt(), uri.getHostname(), uri.getPort(), hikariConfig.getUsername(), hikariConfig.getPassword()));
+ JdbcUrlParser jdbcUrlParser = new JdbcUrlParser(hikariConfig.getJdbcUrl());
+ MySQLClient client = new MySQLClient(new ConnectInfo(random.nextInt(), jdbcUrlParser.getHostname(), jdbcUrlParser.getPort(), hikariConfig.getUsername(), hikariConfig.getPassword()));
client.connect();
client.subscribe(binlogPosition.getFilename(), binlogPosition.getPosition());
int eventCount = 0;
while (isRunning()) {
AbstractBinlogEvent event = client.poll();
if (null != event) {
- handleEvent(uri, event);
+ handleEvent(jdbcUrlParser, event);
eventCount++;
}
}
@@ -115,8 +115,8 @@ public final class MySQLIncrementalDumper extends AbstractLifecycleExecutor impl
pushRecord(new FinishedRecord(new PlaceholderPosition()));
}
- private void handleEvent(final JdbcUri uri, final AbstractBinlogEvent event) {
- if (event instanceof PlaceholderEvent || filter(uri.getDatabase(), (AbstractRowsEvent) event)) {
+ private void handleEvent(final JdbcUrlParser jdbcUrlParser, final AbstractBinlogEvent event) {
+ if (event instanceof PlaceholderEvent || filter(jdbcUrlParser.getDatabase(), (AbstractRowsEvent) event)) {
createPlaceholderRecord(event);
return;
}
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLInventoryDumper.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLInventoryDumper.java
index bca9771..f19039a 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLInventoryDumper.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLInventoryDumper.java
@@ -35,7 +35,7 @@ public final class MySQLInventoryDumper extends AbstractInventoryDumper {
public MySQLInventoryDumper(final InventoryDumperConfiguration inventoryDumperConfig, final PipelineDataSourceManager dataSourceManager) {
super(inventoryDumperConfig, dataSourceManager);
- inventoryDumperConfig.getDataSourceConfig().appendJDBCParameters(ImmutableMap.<String, String>builder().put("yearIsDateType", "false").build());
+ inventoryDumperConfig.getDataSourceConfig().appendJDBCQueryProperties(ImmutableMap.<String, String>builder().put("yearIsDateType", "false").build());
}
@Override
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLIncrementalDumperTest.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLIncrementalDumperTest.java
index 970c54e..94bbf4e 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLIncrementalDumperTest.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLIncrementalDumperTest.java
@@ -34,7 +34,7 @@ import org.apache.shardingsphere.data.pipeline.mysql.ingest.binlog.event.DeleteR
import org.apache.shardingsphere.data.pipeline.mysql.ingest.binlog.event.PlaceholderEvent;
import org.apache.shardingsphere.data.pipeline.mysql.ingest.binlog.event.UpdateRowsEvent;
import org.apache.shardingsphere.data.pipeline.mysql.ingest.binlog.event.WriteRowsEvent;
-import org.apache.shardingsphere.infra.config.datasource.JdbcUri;
+import org.apache.shardingsphere.infra.config.datasource.JdbcUrlParser;
import org.junit.Before;
import org.junit.Test;
@@ -97,7 +97,7 @@ public final class MySQLIncrementalDumperTest {
List<Serializable[]> rows = new ArrayList<>(1);
rows.add(new String[]{"1", "order"});
rowsEvent.setAfterRows(rows);
- invokeHandleEvent(new JdbcUri(URL), rowsEvent);
+ invokeHandleEvent(new JdbcUrlParser(URL), rowsEvent);
List<Record> records = channel.fetchRecords(1, 0);
assertThat(records.size(), is(1));
assertTrue(records.get(0) instanceof DataRecord);
@@ -115,7 +115,7 @@ public final class MySQLIncrementalDumperTest {
afterRows.add(new String[]{"1", "order_new"});
rowsEvent.setBeforeRows(beforeRows);
rowsEvent.setAfterRows(afterRows);
- invokeHandleEvent(new JdbcUri(URL), rowsEvent);
+ invokeHandleEvent(new JdbcUrlParser(URL), rowsEvent);
List<Record> records = channel.fetchRecords(1, 0);
assertThat(records.size(), is(1));
assertTrue(records.get(0) instanceof DataRecord);
@@ -130,7 +130,7 @@ public final class MySQLIncrementalDumperTest {
List<Serializable[]> rows = new ArrayList<>(1);
rows.add(new String[]{"1", "order"});
rowsEvent.setBeforeRows(rows);
- invokeHandleEvent(new JdbcUri(URL), rowsEvent);
+ invokeHandleEvent(new JdbcUrlParser(URL), rowsEvent);
List<Record> records = channel.fetchRecords(1, 0);
assertThat(records.size(), is(1));
assertTrue(records.get(0) instanceof DataRecord);
@@ -139,7 +139,7 @@ public final class MySQLIncrementalDumperTest {
@Test
public void assertPlaceholderEvent() {
- invokeHandleEvent(new JdbcUri("jdbc:mysql://127.0.0.1:3306/test_db"), new PlaceholderEvent());
+ invokeHandleEvent(new JdbcUrlParser("jdbc:mysql://127.0.0.1:3306/test_db"), new PlaceholderEvent());
List<Record> records = channel.fetchRecords(1, 0);
assertThat(records.size(), is(1));
assertTrue(records.get(0) instanceof PlaceholderRecord);
@@ -149,14 +149,14 @@ public final class MySQLIncrementalDumperTest {
public void assertRowsEventFiltered() {
WriteRowsEvent rowsEvent = new WriteRowsEvent();
rowsEvent.setSchemaName("unknown_schema");
- invokeHandleEvent(new JdbcUri(URL), rowsEvent);
+ invokeHandleEvent(new JdbcUrlParser(URL), rowsEvent);
List<Record> records = channel.fetchRecords(1, 0);
assertThat(records.size(), is(1));
assertTrue(records.get(0) instanceof PlaceholderRecord);
}
@SneakyThrows({NoSuchMethodException.class, ReflectiveOperationException.class})
- private void invokeHandleEvent(final JdbcUri uri, final AbstractBinlogEvent event) {
- ReflectionUtil.invokeMethod(incrementalDumper, "handleEvent", new Class[]{JdbcUri.class, AbstractBinlogEvent.class}, new Object[]{uri, event});
+ private void invokeHandleEvent(final JdbcUrlParser jdbcUrlParser, final AbstractBinlogEvent event) {
+ ReflectionUtil.invokeMethod(incrementalDumper, "handleEvent", new Class[]{JdbcUrlParser.class, AbstractBinlogEvent.class}, new Object[]{jdbcUrlParser, event});
}
}
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-spi/src/main/java/org/apache/shardingsphere/data/pipeline/api/datasource/config/PipelineDataSourceConfiguration.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-spi/src/main/java/org/apache/shardingsphere/data/pipeline/api/datasource/config/PipelineDataSourceConfiguration.java
index 50c740c..a592462 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-spi/src/main/java/org/apache/shardingsphere/data/pipeline/api/datasource/config/PipelineDataSourceConfiguration.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-spi/src/main/java/org/apache/shardingsphere/data/pipeline/api/datasource/config/PipelineDataSourceConfiguration.java
@@ -48,11 +48,11 @@ public interface PipelineDataSourceConfiguration {
Object getDataSourceConfiguration();
/**
- * Append JDBC parameters.
+ * Append JDBC queryProps.
*
- * @param parameters JDBC parameters
+ * @param queryProps JDBC query properties
*/
- void appendJDBCParameters(Map<String, String> parameters);
+ void appendJDBCQueryProperties(Map<String, String> queryProps);
/**
* Get database type.
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-spi/src/main/java/org/apache/shardingsphere/data/pipeline/api/datasource/config/impl/ShardingSpherePipelineDataSourceConfiguration.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-spi/src/main/java/org/apache/shardingsphere/data/pipeline/api/datasource/config/impl/ShardingSpherePipelineDataSourceConfiguration.java
index ca60e7e..f3c02bc 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-spi/src/main/java/org/apache/shardingsphere/data/pipeline/api/datasource/config/impl/ShardingSpherePipelineDataSourceConfiguration.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-spi/src/main/java/org/apache/shardingsphere/data/pipeline/api/datasource/config/impl/ShardingSpherePipelineDataSourceConfiguration.java
@@ -24,7 +24,7 @@ import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.Setter;
import org.apache.shardingsphere.data.pipeline.api.datasource.config.PipelineDataSourceConfiguration;
-import org.apache.shardingsphere.infra.config.datasource.JdbcUri;
+import org.apache.shardingsphere.infra.config.datasource.JdbcUrlParser;
import org.apache.shardingsphere.infra.database.type.DatabaseType;
import org.apache.shardingsphere.infra.database.type.DatabaseTypeRegistry;
import org.apache.shardingsphere.infra.yaml.config.pojo.YamlConfiguration;
@@ -84,11 +84,11 @@ public final class ShardingSpherePipelineDataSourceConfiguration implements Pipe
}
@Override
- public void appendJDBCParameters(final Map<String, String> parameters) {
+ public void appendJDBCQueryProperties(final Map<String, String> queryProps) {
rootConfig.getDataSources()
.forEach((key, value) -> {
String jdbcUrlKey = value.containsKey("url") ? "url" : "jdbcUrl";
- value.replace(jdbcUrlKey, new JdbcUri(value.get(jdbcUrlKey).toString()).appendParameters(parameters));
+ value.replace(jdbcUrlKey, new JdbcUrlParser(value.get(jdbcUrlKey).toString()).appendQueryProperties(queryProps));
});
}
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-spi/src/main/java/org/apache/shardingsphere/data/pipeline/api/datasource/config/impl/StandardPipelineDataSourceConfiguration.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-spi/src/main/java/org/apache/shardingsphere/data/pipeline/api/datasource/config/impl/StandardPipelineDataSourceConfiguration.java
index 6438fcb..1225281 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-spi/src/main/java/org/apache/shardingsphere/data/pipeline/api/datasource/config/impl/StandardPipelineDataSourceConfiguration.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-spi/src/main/java/org/apache/shardingsphere/data/pipeline/api/datasource/config/impl/StandardPipelineDataSourceConfiguration.java
@@ -23,7 +23,7 @@ import lombok.EqualsAndHashCode;
import lombok.Getter;
import org.apache.shardingsphere.data.pipeline.api.datasource.config.PipelineDataSourceConfiguration;
import org.apache.shardingsphere.infra.config.datasource.DataSourceConfiguration;
-import org.apache.shardingsphere.infra.config.datasource.JdbcUri;
+import org.apache.shardingsphere.infra.config.datasource.JdbcUrlParser;
import org.apache.shardingsphere.infra.database.type.DatabaseType;
import org.apache.shardingsphere.infra.database.type.DatabaseTypeRegistry;
import org.apache.shardingsphere.infra.yaml.config.swapper.YamlDataSourceConfigurationSwapper;
@@ -96,8 +96,8 @@ public final class StandardPipelineDataSourceConfiguration implements PipelineDa
}
@Override
- public void appendJDBCParameters(final Map<String, String> parameters) {
- hikariConfig.setJdbcUrl(new JdbcUri(hikariConfig.getJdbcUrl()).appendParameters(parameters));
+ public void appendJDBCQueryProperties(final Map<String, String> queryProps) {
+ hikariConfig.setJdbcUrl(new JdbcUrlParser(hikariConfig.getJdbcUrl()).appendQueryProperties(queryProps));
}
// TODO toShardingSphereJDBCDataSource(final String actualDataSourceName, final String logicTableName, final String actualTableName)