You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@shardingsphere.apache.org by pa...@apache.org on 2022/10/25 10:02:28 UTC
[shardingsphere] branch master updated: Support heterogeneous datasource for jdbc, proxy adapter and modify agent logic (#21733)
This is an automated email from the ASF dual-hosted git repository.
panjuan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new a21698577d7 Support heterogeneous datasource for jdbc, proxy adapter and modify agent logic (#21733)
a21698577d7 is described below
commit a21698577d77733e748801c6723443acd510ed79
Author: Zhengqiang Duan <du...@apache.org>
AuthorDate: Tue Oct 25 18:02:21 2022 +0800
Support heterogeneous datasource for jdbc, proxy adapter and modify agent logic (#21733)
* Support heterogeneous datasource for merge engine and jdbc, proxy adapter
* fix agent unit test
* fix proxy unit test
---
.../jaeger/advice/JDBCExecutorCallbackAdvice.java | 8 ++-
.../advice/JDBCExecutorCallbackAdvice.java | 8 ++-
.../AbstractJDBCExecutorCallbackAdviceTest.java | 12 +++-
.../zipkin/advice/JDBCExecutorCallbackAdvice.java | 8 ++-
.../sharding/merge/ShardingResultMergerEngine.java | 4 +-
.../merge/dql/ShardingDQLResultMerger.java | 4 +-
.../infra/database/type/DatabaseTypeEngine.java | 6 +-
.../resource/ShardingSphereResourceMetaData.java | 6 +-
.../type/AlterTableStatementSchemaRefresher.java | 2 +-
.../type/AlterViewStatementSchemaRefresher.java | 2 +-
.../type/CreateTableStatementSchemaRefresher.java | 2 +-
.../type/CreateViewStatementSchemaRefresher.java | 2 +-
.../type/RenameTableStatementSchemaRefresher.java | 2 +-
.../RenameTableStatementSchemaRefresherTest.java | 2 +-
.../engine/driver/jdbc/JDBCExecutorCallback.java | 23 +++---
.../engine/jdbc/JDBCExecutorCallbackTest.java | 60 ++++++++--------
.../shardingsphere/infra/merge/MergeEngine.java | 3 +-
.../merge/engine/merger/ResultMergerEngine.java | 4 +-
.../fixture/merger/ResultMergerEngineFixture.java | 2 +-
.../infra/rewrite/SQLRewriteEntry.java | 2 +-
.../infra/rewrite/SQLRewriteEntryTest.java | 2 +-
.../infra/util/reflect/ReflectiveUtil.java | 83 ++++++++++++++++++++++
.../driver/executor/DriverExecutor.java | 4 +-
.../batch/BatchPreparedStatementExecutor.java | 7 +-
.../executor/callback/ExecuteQueryCallback.java | 12 ++--
.../PreparedStatementExecuteQueryCallback.java | 6 +-
.../impl/StatementExecuteQueryCallback.java | 6 +-
.../statement/ShardingSpherePreparedStatement.java | 24 ++++---
.../core/statement/ShardingSphereStatement.java | 25 ++++---
.../transaction/rule/TransactionRule.java | 2 +-
.../transaction/rule/TransactionRuleTest.java | 2 +-
.../mode/manager/ContextManager.java | 4 +-
.../backend/communication/ProxySQLExecutor.java | 2 +-
.../communication/ReactiveProxySQLExecutor.java | 2 +-
.../jdbc/JDBCDatabaseCommunicationEngine.java | 7 +-
.../jdbc/executor/ProxyJDBCExecutor.java | 7 +-
.../callback/ProxyJDBCExecutorCallback.java | 19 ++---
.../callback/ProxyJDBCExecutorCallbackFactory.java | 10 +--
.../ProxyPreparedStatementExecutorCallback.java | 5 +-
.../impl/ProxyStatementExecutorCallback.java | 5 +-
.../handler/distsql/rul/sql/PreviewHandler.java | 17 +++--
.../text/query/MySQLMultiStatementsHandler.java | 12 ++--
.../query/MySQLComQueryPacketExecutorTest.java | 2 +-
.../query/MySQLMultiStatementsHandlerTest.java | 10 +--
.../bind/OpenGaussComBatchBindExecutorTest.java | 6 +-
.../PostgreSQLBatchedStatementsExecutor.java | 12 ++--
...egatedBatchedStatementsCommandExecutorTest.java | 6 +-
.../PostgreSQLBatchedStatementsExecutorTest.java | 4 +-
.../PostgreSQLComDescribeExecutorTest.java | 2 +-
.../AbstractSQLRewriterParameterizedTest.java | 6 +-
50 files changed, 303 insertions(+), 170 deletions(-)
diff --git a/agent/plugins/tracing/jaeger/src/main/java/org/apache/shardingsphere/agent/plugin/tracing/jaeger/advice/JDBCExecutorCallbackAdvice.java b/agent/plugins/tracing/jaeger/src/main/java/org/apache/shardingsphere/agent/plugin/tracing/jaeger/advice/JDBCExecutorCallbackAdvice.java
index 65f75b8bb64..f23a1f39d2c 100644
--- a/agent/plugins/tracing/jaeger/src/main/java/org/apache/shardingsphere/agent/plugin/tracing/jaeger/advice/JDBCExecutorCallbackAdvice.java
+++ b/agent/plugins/tracing/jaeger/src/main/java/org/apache/shardingsphere/agent/plugin/tracing/jaeger/advice/JDBCExecutorCallbackAdvice.java
@@ -29,8 +29,10 @@ import org.apache.shardingsphere.agent.api.result.MethodInvocationResult;
import org.apache.shardingsphere.agent.plugin.tracing.jaeger.constant.JaegerConstants;
import org.apache.shardingsphere.agent.plugin.tracing.jaeger.span.JaegerErrorSpan;
import org.apache.shardingsphere.infra.database.metadata.DataSourceMetaData;
+import org.apache.shardingsphere.infra.database.type.DatabaseType;
import org.apache.shardingsphere.infra.executor.sql.execute.engine.driver.jdbc.JDBCExecutionUnit;
import org.apache.shardingsphere.infra.executor.sql.execute.engine.driver.jdbc.JDBCExecutorCallback;
+import org.apache.shardingsphere.infra.util.reflect.ReflectiveUtil;
import java.lang.reflect.Method;
import java.sql.DatabaseMetaData;
@@ -54,9 +56,11 @@ public final class JDBCExecutorCallbackAdvice implements InstanceMethodAroundAdv
builder = builder.asChildOf(root);
}
JDBCExecutionUnit executionUnit = (JDBCExecutionUnit) args[0];
- Method getMetaDataMethod = JDBCExecutorCallback.class.getDeclaredMethod("getDataSourceMetaData", DatabaseMetaData.class);
+ Map<String, DatabaseType> storageTypes = (Map<String, DatabaseType>) ReflectiveUtil.getFieldValue(target, "storageTypes");
+ Method getMetaDataMethod = JDBCExecutorCallback.class.getDeclaredMethod("getDataSourceMetaData", DatabaseMetaData.class, DatabaseType.class);
getMetaDataMethod.setAccessible(true);
- DataSourceMetaData metaData = (DataSourceMetaData) getMetaDataMethod.invoke(target, new Object[]{executionUnit.getStorageResource().getConnection().getMetaData()});
+ DataSourceMetaData metaData = (DataSourceMetaData) getMetaDataMethod.invoke(target,
+ new Object[]{executionUnit.getStorageResource().getConnection().getMetaData(), storageTypes.get(executionUnit.getExecutionUnit().getDataSourceName())});
builder.withTag(Tags.COMPONENT.getKey(), JaegerConstants.COMPONENT_NAME)
.withTag(Tags.DB_TYPE.getKey(), JaegerConstants.DB_TYPE_VALUE)
.withTag(Tags.DB_INSTANCE.getKey(), executionUnit.getExecutionUnit().getDataSourceName())
diff --git a/agent/plugins/tracing/opentelemetry/src/main/java/org/apache/shardingsphere/agent/plugin/tracing/opentelemetry/advice/JDBCExecutorCallbackAdvice.java b/agent/plugins/tracing/opentelemetry/src/main/java/org/apache/shardingsphere/agent/plugin/tracing/opentelemetry/advice/JDBCExecutorCallbackAdvice.java
index 4935ef128e2..89fdbfaa0ab 100644
--- a/agent/plugins/tracing/opentelemetry/src/main/java/org/apache/shardingsphere/agent/plugin/tracing/opentelemetry/advice/JDBCExecutorCallbackAdvice.java
+++ b/agent/plugins/tracing/opentelemetry/src/main/java/org/apache/shardingsphere/agent/plugin/tracing/opentelemetry/advice/JDBCExecutorCallbackAdvice.java
@@ -29,8 +29,10 @@ import org.apache.shardingsphere.agent.api.advice.InstanceMethodAroundAdvice;
import org.apache.shardingsphere.agent.api.result.MethodInvocationResult;
import org.apache.shardingsphere.agent.plugin.tracing.opentelemetry.constant.OpenTelemetryConstants;
import org.apache.shardingsphere.infra.database.metadata.DataSourceMetaData;
+import org.apache.shardingsphere.infra.database.type.DatabaseType;
import org.apache.shardingsphere.infra.executor.sql.execute.engine.driver.jdbc.JDBCExecutionUnit;
import org.apache.shardingsphere.infra.executor.sql.execute.engine.driver.jdbc.JDBCExecutorCallback;
+import org.apache.shardingsphere.infra.util.reflect.ReflectiveUtil;
import java.lang.reflect.Method;
import java.sql.DatabaseMetaData;
@@ -56,9 +58,11 @@ public class JDBCExecutorCallbackAdvice implements InstanceMethodAroundAdvice {
spanBuilder.setAttribute(OpenTelemetryConstants.COMPONENT, OpenTelemetryConstants.COMPONENT_NAME);
spanBuilder.setAttribute(OpenTelemetryConstants.DB_TYPE, OpenTelemetryConstants.DB_TYPE_VALUE);
JDBCExecutionUnit executionUnit = (JDBCExecutionUnit) args[0];
- Method getMetaDataMethod = JDBCExecutorCallback.class.getDeclaredMethod("getDataSourceMetaData", DatabaseMetaData.class);
+ Map<String, DatabaseType> storageTypes = (Map<String, DatabaseType>) ReflectiveUtil.getFieldValue(target, "storageTypes");
+ Method getMetaDataMethod = JDBCExecutorCallback.class.getDeclaredMethod("getDataSourceMetaData", DatabaseMetaData.class, DatabaseType.class);
getMetaDataMethod.setAccessible(true);
- DataSourceMetaData metaData = (DataSourceMetaData) getMetaDataMethod.invoke(target, new Object[]{executionUnit.getStorageResource().getConnection().getMetaData()});
+ DataSourceMetaData metaData = (DataSourceMetaData) getMetaDataMethod.invoke(target,
+ new Object[]{executionUnit.getStorageResource().getConnection().getMetaData(), storageTypes.get(executionUnit.getExecutionUnit().getDataSourceName())});
spanBuilder.setAttribute(OpenTelemetryConstants.DB_INSTANCE, executionUnit.getExecutionUnit().getDataSourceName())
.setAttribute(OpenTelemetryConstants.PEER_HOSTNAME, metaData.getHostname())
.setAttribute(OpenTelemetryConstants.PEER_PORT, String.valueOf(metaData.getPort()))
diff --git a/agent/plugins/tracing/test/src/main/java/org/apache/shardingsphere/agent/plugin/tracing/advice/AbstractJDBCExecutorCallbackAdviceTest.java b/agent/plugins/tracing/test/src/main/java/org/apache/shardingsphere/agent/plugin/tracing/advice/AbstractJDBCExecutorCallbackAdviceTest.java
index f3520289734..a6f0aac1734 100644
--- a/agent/plugins/tracing/test/src/main/java/org/apache/shardingsphere/agent/plugin/tracing/advice/AbstractJDBCExecutorCallbackAdviceTest.java
+++ b/agent/plugins/tracing/test/src/main/java/org/apache/shardingsphere/agent/plugin/tracing/advice/AbstractJDBCExecutorCallbackAdviceTest.java
@@ -23,10 +23,13 @@ import org.apache.shardingsphere.agent.api.advice.AdviceTargetObject;
import org.apache.shardingsphere.agent.plugin.tracing.AgentRunner;
import org.apache.shardingsphere.agent.plugin.tracing.MockDataSourceMetaData;
import org.apache.shardingsphere.infra.database.metadata.DataSourceMetaData;
+import org.apache.shardingsphere.infra.database.type.DatabaseType;
+import org.apache.shardingsphere.infra.database.type.dialect.MySQLDatabaseType;
import org.apache.shardingsphere.infra.executor.sql.context.ExecutionUnit;
import org.apache.shardingsphere.infra.executor.sql.context.SQLUnit;
import org.apache.shardingsphere.infra.executor.sql.execute.engine.driver.jdbc.JDBCExecutionUnit;
import org.apache.shardingsphere.infra.executor.sql.execute.engine.driver.jdbc.JDBCExecutorCallback;
+import org.apache.shardingsphere.infra.util.reflect.ReflectiveUtil;
import org.junit.runner.RunWith;
import org.mockito.internal.util.reflection.FieldReader;
@@ -36,6 +39,7 @@ import java.sql.SQLException;
import java.sql.Statement;
import java.util.Collections;
import java.util.HashMap;
+import java.util.LinkedHashMap;
import java.util.Map;
import static org.mockito.Mockito.mock;
@@ -78,8 +82,12 @@ public abstract class AbstractJDBCExecutorCallbackAdviceTest implements AdviceTe
return invocation.callRealMethod();
}
});
- Map<String, DataSourceMetaData> map = (Map<String, DataSourceMetaData>) new FieldReader(mock, JDBCExecutorCallback.class.getDeclaredField("CACHED_DATASOURCE_METADATA")).read();
- map.put("mock_url", new MockDataSourceMetaData());
+ Map<String, DataSourceMetaData> cachedDatasourceMetadata =
+ (Map<String, DataSourceMetaData>) new FieldReader(mock, JDBCExecutorCallback.class.getDeclaredField("CACHED_DATASOURCE_METADATA")).read();
+ cachedDatasourceMetadata.put("mock_url", new MockDataSourceMetaData());
+ Map<String, DatabaseType> storageTypes = new LinkedHashMap<>(1, 1);
+ storageTypes.put("mock.db", new MySQLDatabaseType());
+ ReflectiveUtil.setProperty(mock, "storageTypes", storageTypes);
targetObject = (AdviceTargetObject) mock;
}
}
diff --git a/agent/plugins/tracing/zipkin/src/main/java/org/apache/shardingsphere/agent/plugin/tracing/zipkin/advice/JDBCExecutorCallbackAdvice.java b/agent/plugins/tracing/zipkin/src/main/java/org/apache/shardingsphere/agent/plugin/tracing/zipkin/advice/JDBCExecutorCallbackAdvice.java
index 6a3d78c374f..7d0ebf320a1 100644
--- a/agent/plugins/tracing/zipkin/src/main/java/org/apache/shardingsphere/agent/plugin/tracing/zipkin/advice/JDBCExecutorCallbackAdvice.java
+++ b/agent/plugins/tracing/zipkin/src/main/java/org/apache/shardingsphere/agent/plugin/tracing/zipkin/advice/JDBCExecutorCallbackAdvice.java
@@ -25,8 +25,10 @@ import org.apache.shardingsphere.agent.api.advice.InstanceMethodAroundAdvice;
import org.apache.shardingsphere.agent.api.result.MethodInvocationResult;
import org.apache.shardingsphere.agent.plugin.tracing.zipkin.constant.ZipkinConstants;
import org.apache.shardingsphere.infra.database.metadata.DataSourceMetaData;
+import org.apache.shardingsphere.infra.database.type.DatabaseType;
import org.apache.shardingsphere.infra.executor.sql.execute.engine.driver.jdbc.JDBCExecutionUnit;
import org.apache.shardingsphere.infra.executor.sql.execute.engine.driver.jdbc.JDBCExecutorCallback;
+import org.apache.shardingsphere.infra.util.reflect.ReflectiveUtil;
import java.lang.reflect.Method;
import java.sql.DatabaseMetaData;
@@ -49,9 +51,11 @@ public final class JDBCExecutorCallbackAdvice implements InstanceMethodAroundAdv
span.tag(ZipkinConstants.Tags.COMPONENT, ZipkinConstants.COMPONENT_NAME);
span.tag(ZipkinConstants.Tags.DB_TYPE, ZipkinConstants.DB_TYPE_VALUE);
JDBCExecutionUnit executionUnit = (JDBCExecutionUnit) args[0];
- Method getMetaDataMethod = JDBCExecutorCallback.class.getDeclaredMethod("getDataSourceMetaData", DatabaseMetaData.class);
+ Map<String, DatabaseType> storageTypes = (Map<String, DatabaseType>) ReflectiveUtil.getFieldValue(target, "storageTypes");
+ Method getMetaDataMethod = JDBCExecutorCallback.class.getDeclaredMethod("getDataSourceMetaData", DatabaseMetaData.class, DatabaseType.class);
getMetaDataMethod.setAccessible(true);
- DataSourceMetaData metaData = (DataSourceMetaData) getMetaDataMethod.invoke(target, new Object[]{executionUnit.getStorageResource().getConnection().getMetaData()});
+ DataSourceMetaData metaData = (DataSourceMetaData) getMetaDataMethod.invoke(target,
+ new Object[]{executionUnit.getStorageResource().getConnection().getMetaData(), storageTypes.get(executionUnit.getExecutionUnit().getDataSourceName())});
span.tag(ZipkinConstants.Tags.DB_INSTANCE, executionUnit.getExecutionUnit().getDataSourceName());
span.tag(ZipkinConstants.Tags.PEER_HOSTNAME, metaData.getHostname());
span.tag(ZipkinConstants.Tags.PEER_PORT, String.valueOf(metaData.getPort()));
diff --git a/features/sharding/core/src/main/java/org/apache/shardingsphere/sharding/merge/ShardingResultMergerEngine.java b/features/sharding/core/src/main/java/org/apache/shardingsphere/sharding/merge/ShardingResultMergerEngine.java
index 4a311feabe0..c1ed83869eb 100644
--- a/features/sharding/core/src/main/java/org/apache/shardingsphere/sharding/merge/ShardingResultMergerEngine.java
+++ b/features/sharding/core/src/main/java/org/apache/shardingsphere/sharding/merge/ShardingResultMergerEngine.java
@@ -38,10 +38,10 @@ import org.apache.shardingsphere.sql.parser.sql.common.statement.ddl.DDLStatemen
public final class ShardingResultMergerEngine implements ResultMergerEngine<ShardingRule> {
@Override
- public ResultMerger newInstance(final String databaseName, final DatabaseType databaseType, final ShardingRule shardingRule, final ConfigurationProperties props,
+ public ResultMerger newInstance(final String databaseName, final DatabaseType protocolType, final ShardingRule shardingRule, final ConfigurationProperties props,
final SQLStatementContext<?> sqlStatementContext) {
if (sqlStatementContext instanceof SelectStatementContext) {
- return new ShardingDQLResultMerger(databaseType);
+ return new ShardingDQLResultMerger(protocolType);
}
if (sqlStatementContext.getSqlStatement() instanceof DDLStatement) {
return new ShardingDDLResultMerger();
diff --git a/features/sharding/core/src/main/java/org/apache/shardingsphere/sharding/merge/dql/ShardingDQLResultMerger.java b/features/sharding/core/src/main/java/org/apache/shardingsphere/sharding/merge/dql/ShardingDQLResultMerger.java
index 710e5095e3b..0d9fb296e00 100644
--- a/features/sharding/core/src/main/java/org/apache/shardingsphere/sharding/merge/dql/ShardingDQLResultMerger.java
+++ b/features/sharding/core/src/main/java/org/apache/shardingsphere/sharding/merge/dql/ShardingDQLResultMerger.java
@@ -52,7 +52,7 @@ import java.util.TreeMap;
@RequiredArgsConstructor
public final class ShardingDQLResultMerger implements ResultMerger {
- private final DatabaseType databaseType;
+ private final DatabaseType protocolType;
@Override
public MergedResult merge(final List<QueryResult> queryResults, final SQLStatementContext<?> sqlStatementContext,
@@ -129,7 +129,7 @@ public final class ShardingDQLResultMerger implements ResultMerger {
if (!paginationContext.isHasPagination() || 1 == queryResults.size()) {
return mergedResult;
}
- String trunkDatabaseName = DatabaseTypeEngine.getTrunkDatabaseType(databaseType.getType()).getType();
+ String trunkDatabaseName = DatabaseTypeEngine.getTrunkDatabaseType(protocolType.getType()).getType();
if ("MySQL".equals(trunkDatabaseName) || "PostgreSQL".equals(trunkDatabaseName) || "openGauss".equals(trunkDatabaseName)) {
return new LimitDecoratorMergedResult(mergedResult, paginationContext);
}
diff --git a/infra/common/src/main/java/org/apache/shardingsphere/infra/database/type/DatabaseTypeEngine.java b/infra/common/src/main/java/org/apache/shardingsphere/infra/database/type/DatabaseTypeEngine.java
index cddf7464658..fb381eade72 100644
--- a/infra/common/src/main/java/org/apache/shardingsphere/infra/database/type/DatabaseTypeEngine.java
+++ b/infra/common/src/main/java/org/apache/shardingsphere/infra/database/type/DatabaseTypeEngine.java
@@ -156,12 +156,12 @@ public final class DatabaseTypeEngine {
/**
* Get default schema name.
*
- * @param databaseType database type
+ * @param protocolType protocol type
* @param databaseName database name
* @return default schema name
*/
- public static String getDefaultSchemaName(final DatabaseType databaseType, final String databaseName) {
- return databaseType instanceof SchemaSupportedDatabaseType ? ((SchemaSupportedDatabaseType) databaseType).getDefaultSchema() : databaseName.toLowerCase();
+ public static String getDefaultSchemaName(final DatabaseType protocolType, final String databaseName) {
+ return protocolType instanceof SchemaSupportedDatabaseType ? ((SchemaSupportedDatabaseType) protocolType).getDefaultSchema() : databaseName.toLowerCase();
}
/**
diff --git a/infra/common/src/main/java/org/apache/shardingsphere/infra/metadata/database/resource/ShardingSphereResourceMetaData.java b/infra/common/src/main/java/org/apache/shardingsphere/infra/metadata/database/resource/ShardingSphereResourceMetaData.java
index da2c19c3c25..0c1cdb2b95a 100644
--- a/infra/common/src/main/java/org/apache/shardingsphere/infra/metadata/database/resource/ShardingSphereResourceMetaData.java
+++ b/infra/common/src/main/java/org/apache/shardingsphere/infra/metadata/database/resource/ShardingSphereResourceMetaData.java
@@ -43,7 +43,7 @@ public final class ShardingSphereResourceMetaData {
private final Map<String, DataSource> dataSources;
- private final Map<String, DatabaseType> databaseTypes;
+ private final Map<String, DatabaseType> storageTypes;
// TODO remove databaseType when all scenarios have been replaced
private final DatabaseType databaseType;
@@ -55,11 +55,11 @@ public final class ShardingSphereResourceMetaData {
this.dataSources = dataSources;
Map<String, DataSource> enabledDataSources = DataSourceStateManager.getInstance().getEnabledDataSourceMap(databaseName, dataSources);
databaseType = getDatabaseType(enabledDataSources);
- databaseTypes = createDatabaseTypes(enabledDataSources);
+ storageTypes = createStorageTypes(enabledDataSources);
dataSourceMetaDataMap = createDataSourceMetaDataMap(dataSources);
}
- private Map<String, DatabaseType> createDatabaseTypes(final Map<String, DataSource> dataSources) {
+ private Map<String, DatabaseType> createStorageTypes(final Map<String, DataSource> dataSources) {
Map<String, DatabaseType> result = new LinkedHashMap<>(dataSources.size(), 1);
for (Entry<String, DataSource> entry : dataSources.entrySet()) {
result.put(entry.getKey(), getDatabaseType(Collections.singletonMap(entry.getKey(), entry.getValue())));
diff --git a/infra/context/src/main/java/org/apache/shardingsphere/infra/context/refresher/type/AlterTableStatementSchemaRefresher.java b/infra/context/src/main/java/org/apache/shardingsphere/infra/context/refresher/type/AlterTableStatementSchemaRefresher.java
index f5c629ba30f..e2b370f5aa5 100644
--- a/infra/context/src/main/java/org/apache/shardingsphere/infra/context/refresher/type/AlterTableStatementSchemaRefresher.java
+++ b/infra/context/src/main/java/org/apache/shardingsphere/infra/context/refresher/type/AlterTableStatementSchemaRefresher.java
@@ -70,7 +70,7 @@ public final class AlterTableStatementSchemaRefresher implements MetaDataRefresh
database.getRuleMetaData().findRules(MutableDataNodeRule.class).forEach(each -> each.put(logicDataSourceNames.iterator().next(), schemaName, tableName));
}
GenericSchemaBuilderMaterial material = new GenericSchemaBuilderMaterial(database.getProtocolType(),
- database.getResourceMetaData().getDatabaseTypes(), database.getResourceMetaData().getDataSources(), database.getRuleMetaData().getRules(), props, schemaName);
+ database.getResourceMetaData().getStorageTypes(), database.getResourceMetaData().getDataSources(), database.getRuleMetaData().getRules(), props, schemaName);
Map<String, ShardingSphereSchema> schemaMap = GenericSchemaBuilder.build(Collections.singletonList(tableName), material);
Optional<ShardingSphereTable> actualTableMetaData = Optional.ofNullable(schemaMap.get(schemaName)).map(optional -> optional.getTable(tableName));
actualTableMetaData.ifPresent(optional -> database.getSchema(schemaName).putTable(tableName, optional));
diff --git a/infra/context/src/main/java/org/apache/shardingsphere/infra/context/refresher/type/AlterViewStatementSchemaRefresher.java b/infra/context/src/main/java/org/apache/shardingsphere/infra/context/refresher/type/AlterViewStatementSchemaRefresher.java
index 8096eb01e48..3d55e8dcc7b 100644
--- a/infra/context/src/main/java/org/apache/shardingsphere/infra/context/refresher/type/AlterViewStatementSchemaRefresher.java
+++ b/infra/context/src/main/java/org/apache/shardingsphere/infra/context/refresher/type/AlterViewStatementSchemaRefresher.java
@@ -81,7 +81,7 @@ public final class AlterViewStatementSchemaRefresher implements MetaDataRefreshe
database.getRuleMetaData().findRules(MutableDataNodeRule.class).forEach(each -> each.put(logicDataSourceNames.iterator().next(), schemaName, viewName));
}
GenericSchemaBuilderMaterial material = new GenericSchemaBuilderMaterial(database.getProtocolType(),
- database.getResourceMetaData().getDatabaseTypes(), database.getResourceMetaData().getDataSources(), database.getRuleMetaData().getRules(), props, schemaName);
+ database.getResourceMetaData().getStorageTypes(), database.getResourceMetaData().getDataSources(), database.getRuleMetaData().getRules(), props, schemaName);
Map<String, ShardingSphereSchema> schemaMap = GenericSchemaBuilder.build(Collections.singletonList(viewName), material);
Optional<ShardingSphereTable> actualViewMetaData = Optional.ofNullable(schemaMap.get(schemaName)).map(optional -> optional.getTable(viewName));
actualViewMetaData.ifPresent(optional -> database.getSchema(schemaName).putTable(viewName, optional));
diff --git a/infra/context/src/main/java/org/apache/shardingsphere/infra/context/refresher/type/CreateTableStatementSchemaRefresher.java b/infra/context/src/main/java/org/apache/shardingsphere/infra/context/refresher/type/CreateTableStatementSchemaRefresher.java
index 831b5ec2b57..c984f626771 100644
--- a/infra/context/src/main/java/org/apache/shardingsphere/infra/context/refresher/type/CreateTableStatementSchemaRefresher.java
+++ b/infra/context/src/main/java/org/apache/shardingsphere/infra/context/refresher/type/CreateTableStatementSchemaRefresher.java
@@ -49,7 +49,7 @@ public final class CreateTableStatementSchemaRefresher implements MetaDataRefres
database.getRuleMetaData().findRules(MutableDataNodeRule.class).forEach(each -> each.put(logicDataSourceNames.iterator().next(), schemaName, tableName));
}
GenericSchemaBuilderMaterial material = new GenericSchemaBuilderMaterial(database.getProtocolType(),
- database.getResourceMetaData().getDatabaseTypes(), database.getResourceMetaData().getDataSources(), database.getRuleMetaData().getRules(), props, schemaName);
+ database.getResourceMetaData().getStorageTypes(), database.getResourceMetaData().getDataSources(), database.getRuleMetaData().getRules(), props, schemaName);
Map<String, ShardingSphereSchema> schemaMap = GenericSchemaBuilder.build(Collections.singletonList(tableName), material);
Optional<ShardingSphereTable> actualTableMetaData = Optional.ofNullable(schemaMap.get(schemaName)).map(optional -> optional.getTable(tableName));
if (actualTableMetaData.isPresent()) {
diff --git a/infra/context/src/main/java/org/apache/shardingsphere/infra/context/refresher/type/CreateViewStatementSchemaRefresher.java b/infra/context/src/main/java/org/apache/shardingsphere/infra/context/refresher/type/CreateViewStatementSchemaRefresher.java
index 21b5e586ec6..66f2744e5ca 100644
--- a/infra/context/src/main/java/org/apache/shardingsphere/infra/context/refresher/type/CreateViewStatementSchemaRefresher.java
+++ b/infra/context/src/main/java/org/apache/shardingsphere/infra/context/refresher/type/CreateViewStatementSchemaRefresher.java
@@ -50,7 +50,7 @@ public final class CreateViewStatementSchemaRefresher implements MetaDataRefresh
database.getRuleMetaData().findRules(MutableDataNodeRule.class).forEach(each -> each.put(logicDataSourceNames.iterator().next(), schemaName, viewName));
}
GenericSchemaBuilderMaterial material = new GenericSchemaBuilderMaterial(database.getProtocolType(),
- database.getResourceMetaData().getDatabaseTypes(), database.getResourceMetaData().getDataSources(), database.getRuleMetaData().getRules(), props, schemaName);
+ database.getResourceMetaData().getStorageTypes(), database.getResourceMetaData().getDataSources(), database.getRuleMetaData().getRules(), props, schemaName);
Map<String, ShardingSphereSchema> schemaMap = GenericSchemaBuilder.build(Collections.singletonList(viewName), material);
Optional<ShardingSphereTable> actualTableMetaData = Optional.ofNullable(schemaMap.get(schemaName)).map(optional -> optional.getTable(viewName));
if (actualTableMetaData.isPresent()) {
diff --git a/infra/context/src/main/java/org/apache/shardingsphere/infra/context/refresher/type/RenameTableStatementSchemaRefresher.java b/infra/context/src/main/java/org/apache/shardingsphere/infra/context/refresher/type/RenameTableStatementSchemaRefresher.java
index aebc85c1923..bd30608e490 100644
--- a/infra/context/src/main/java/org/apache/shardingsphere/infra/context/refresher/type/RenameTableStatementSchemaRefresher.java
+++ b/infra/context/src/main/java/org/apache/shardingsphere/infra/context/refresher/type/RenameTableStatementSchemaRefresher.java
@@ -68,7 +68,7 @@ public final class RenameTableStatementSchemaRefresher implements MetaDataRefres
database.getRuleMetaData().findRules(MutableDataNodeRule.class).forEach(each -> each.put(logicDataSourceNames.iterator().next(), schemaName, tableName));
}
GenericSchemaBuilderMaterial material = new GenericSchemaBuilderMaterial(database.getProtocolType(),
- database.getResourceMetaData().getDatabaseTypes(), database.getResourceMetaData().getDataSources(), database.getRuleMetaData().getRules(), props, schemaName);
+ database.getResourceMetaData().getStorageTypes(), database.getResourceMetaData().getDataSources(), database.getRuleMetaData().getRules(), props, schemaName);
Map<String, ShardingSphereSchema> schemaMap = GenericSchemaBuilder.build(Collections.singletonList(tableName), material);
Optional<ShardingSphereTable> actualTableMetaData = Optional.ofNullable(schemaMap.get(schemaName)).map(optional -> optional.getTable(tableName));
actualTableMetaData.ifPresent(optional -> database.getSchema(schemaName).putTable(tableName, optional));
diff --git a/infra/context/src/test/java/org/apache/shardingsphere/infra/context/refresher/type/RenameTableStatementSchemaRefresherTest.java b/infra/context/src/test/java/org/apache/shardingsphere/infra/context/refresher/type/RenameTableStatementSchemaRefresherTest.java
index 345268c7a63..722f424fab9 100644
--- a/infra/context/src/test/java/org/apache/shardingsphere/infra/context/refresher/type/RenameTableStatementSchemaRefresherTest.java
+++ b/infra/context/src/test/java/org/apache/shardingsphere/infra/context/refresher/type/RenameTableStatementSchemaRefresherTest.java
@@ -82,7 +82,7 @@ public final class RenameTableStatementSchemaRefresherTest {
private ShardingSphereResourceMetaData mockShardingSphereResourceMetaData() {
ShardingSphereResourceMetaData result = mock(ShardingSphereResourceMetaData.class);
when(result.getDataSources()).thenReturn(Collections.singletonMap(DefaultDatabase.LOGIC_NAME, new MockedDataSource()));
- when(result.getDatabaseTypes()).thenReturn(Collections.singletonMap(DefaultDatabase.LOGIC_NAME, new SQL92DatabaseType()));
+ when(result.getStorageTypes()).thenReturn(Collections.singletonMap(DefaultDatabase.LOGIC_NAME, new SQL92DatabaseType()));
return result;
}
}
diff --git a/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/execute/engine/driver/jdbc/JDBCExecutorCallback.java b/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/execute/engine/driver/jdbc/JDBCExecutorCallback.java
index b030381c864..b74af5500a6 100644
--- a/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/execute/engine/driver/jdbc/JDBCExecutorCallback.java
+++ b/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/execute/engine/driver/jdbc/JDBCExecutorCallback.java
@@ -17,11 +17,9 @@
package org.apache.shardingsphere.infra.executor.sql.execute.engine.driver.jdbc;
-import lombok.Getter;
import lombok.RequiredArgsConstructor;
import org.apache.shardingsphere.infra.database.metadata.DataSourceMetaData;
import org.apache.shardingsphere.infra.database.type.DatabaseType;
-import org.apache.shardingsphere.infra.util.eventbus.EventBusContext;
import org.apache.shardingsphere.infra.executor.kernel.model.ExecutorCallback;
import org.apache.shardingsphere.infra.executor.sql.context.SQLUnit;
import org.apache.shardingsphere.infra.executor.sql.execute.engine.ConnectionMode;
@@ -31,6 +29,7 @@ import org.apache.shardingsphere.infra.executor.sql.hook.SPISQLExecutionHook;
import org.apache.shardingsphere.infra.executor.sql.hook.SQLExecutionHook;
import org.apache.shardingsphere.infra.executor.sql.process.ExecuteProcessEngine;
import org.apache.shardingsphere.infra.executor.sql.process.model.ExecuteProcessConstants;
+import org.apache.shardingsphere.infra.util.eventbus.EventBusContext;
import org.apache.shardingsphere.sql.parser.sql.common.statement.SQLStatement;
import java.sql.DatabaseMetaData;
@@ -54,8 +53,7 @@ public abstract class JDBCExecutorCallback<T> implements ExecutorCallback<JDBCEx
private final DatabaseType protocolType;
- @Getter
- private final DatabaseType databaseType;
+ private final Map<String, DatabaseType> storageTypes;
private final SQLStatement sqlStatement;
@@ -63,10 +61,6 @@ public abstract class JDBCExecutorCallback<T> implements ExecutorCallback<JDBCEx
private final EventBusContext eventBusContext;
- public JDBCExecutorCallback(final DatabaseType databaseType, final SQLStatement sqlStatement, final boolean isExceptionThrown, final EventBusContext eventBusContext) {
- this(databaseType, databaseType, sqlStatement, isExceptionThrown, eventBusContext);
- }
-
@Override
public final Collection<T> execute(final Collection<JDBCExecutionUnit> executionUnits, final boolean isTrunkThread, final Map<String, Object> dataMap) throws SQLException {
// TODO It is better to judge whether need sane result before execute, can avoid exception thrown
@@ -87,17 +81,18 @@ public abstract class JDBCExecutorCallback<T> implements ExecutorCallback<JDBCEx
*/
private T execute(final JDBCExecutionUnit jdbcExecutionUnit, final boolean isTrunkThread, final Map<String, Object> dataMap) throws SQLException {
SQLExecutorExceptionHandler.setExceptionThrown(isExceptionThrown);
- DataSourceMetaData dataSourceMetaData = getDataSourceMetaData(jdbcExecutionUnit.getStorageResource().getConnection().getMetaData());
+ DatabaseType storageType = storageTypes.get(jdbcExecutionUnit.getExecutionUnit().getDataSourceName());
+ DataSourceMetaData dataSourceMetaData = getDataSourceMetaData(jdbcExecutionUnit.getStorageResource().getConnection().getMetaData(), storageType);
SQLExecutionHook sqlExecutionHook = new SPISQLExecutionHook();
try {
SQLUnit sqlUnit = jdbcExecutionUnit.getExecutionUnit().getSqlUnit();
sqlExecutionHook.start(jdbcExecutionUnit.getExecutionUnit().getDataSourceName(), sqlUnit.getSql(), sqlUnit.getParameters(), dataSourceMetaData, isTrunkThread, dataMap);
- T result = executeSQL(sqlUnit.getSql(), jdbcExecutionUnit.getStorageResource(), jdbcExecutionUnit.getConnectionMode());
+ T result = executeSQL(sqlUnit.getSql(), jdbcExecutionUnit.getStorageResource(), jdbcExecutionUnit.getConnectionMode(), storageType);
sqlExecutionHook.finishSuccess();
finishReport(dataMap, jdbcExecutionUnit);
return result;
} catch (final SQLException ex) {
- if (!databaseType.equals(protocolType)) {
+ if (!storageType.equals(protocolType)) {
Optional<T> saneResult = getSaneResult(sqlStatement, ex);
if (saneResult.isPresent()) {
return isTrunkThread ? saneResult.get() : null;
@@ -109,12 +104,12 @@ public abstract class JDBCExecutorCallback<T> implements ExecutorCallback<JDBCEx
}
}
- private DataSourceMetaData getDataSourceMetaData(final DatabaseMetaData databaseMetaData) throws SQLException {
+ private DataSourceMetaData getDataSourceMetaData(final DatabaseMetaData databaseMetaData, final DatabaseType storageType) throws SQLException {
String url = databaseMetaData.getURL();
if (CACHED_DATASOURCE_METADATA.containsKey(url)) {
return CACHED_DATASOURCE_METADATA.get(url);
}
- DataSourceMetaData result = databaseType.getDataSourceMetaData(url, databaseMetaData.getUserName());
+ DataSourceMetaData result = storageType.getDataSourceMetaData(url, databaseMetaData.getUserName());
CACHED_DATASOURCE_METADATA.put(url, result);
return result;
}
@@ -125,7 +120,7 @@ public abstract class JDBCExecutorCallback<T> implements ExecutorCallback<JDBCEx
}
}
- protected abstract T executeSQL(String sql, Statement statement, ConnectionMode connectionMode) throws SQLException;
+ protected abstract T executeSQL(String sql, Statement statement, ConnectionMode connectionMode, DatabaseType storageType) throws SQLException;
protected abstract Optional<T> getSaneResult(SQLStatement sqlStatement, SQLException ex);
}
diff --git a/infra/executor/src/test/java/org/apache/shardingsphere/infra/executor/sql/execute/engine/jdbc/JDBCExecutorCallbackTest.java b/infra/executor/src/test/java/org/apache/shardingsphere/infra/executor/sql/execute/engine/jdbc/JDBCExecutorCallbackTest.java
index a67a3883e55..4f44967d0e7 100644
--- a/infra/executor/src/test/java/org/apache/shardingsphere/infra/executor/sql/execute/engine/jdbc/JDBCExecutorCallbackTest.java
+++ b/infra/executor/src/test/java/org/apache/shardingsphere/infra/executor/sql/execute/engine/jdbc/JDBCExecutorCallbackTest.java
@@ -18,6 +18,7 @@
package org.apache.shardingsphere.infra.executor.sql.execute.engine.jdbc;
import org.apache.shardingsphere.infra.database.metadata.DataSourceMetaData;
+import org.apache.shardingsphere.infra.database.type.DatabaseType;
import org.apache.shardingsphere.infra.database.type.DatabaseTypeFactory;
import org.apache.shardingsphere.infra.util.eventbus.EventBusContext;
import org.apache.shardingsphere.infra.executor.sql.context.ExecutionUnit;
@@ -77,11 +78,12 @@ public final class JDBCExecutorCallbackTest {
@SuppressWarnings("unchecked")
@Test
public void assertExecute() throws SQLException, NoSuchFieldException, IllegalAccessException {
- JDBCExecutorCallback<?> jdbcExecutorCallback = new JDBCExecutorCallback<Integer>(DatabaseTypeFactory.getInstance("MySQL"), mock(SelectStatement.class), true,
+ DatabaseType databaseType = DatabaseTypeFactory.getInstance("MySQL");
+ JDBCExecutorCallback<?> jdbcExecutorCallback = new JDBCExecutorCallback<Integer>(databaseType, Collections.singletonMap("ds", databaseType), mock(SelectStatement.class), true,
new EventBusContext()) {
@Override
- protected Integer executeSQL(final String sql, final Statement statement, final ConnectionMode connectionMode) throws SQLException {
+ protected Integer executeSQL(final String sql, final Statement statement, final ConnectionMode connectionMode, final DatabaseType storageType) throws SQLException {
return ((PreparedStatement) statement).executeUpdate();
}
@@ -102,38 +104,40 @@ public final class JDBCExecutorCallbackTest {
@Test
public void assertExecuteFailedAndProtocolTypeDifferentWithDatabaseType() throws SQLException {
Object saneResult = new Object();
- JDBCExecutorCallback<Object> callback = new JDBCExecutorCallback<Object>(DatabaseTypeFactory.getInstance("MySQL"), DatabaseTypeFactory.getInstance("PostgreSQL"),
- mock(SelectStatement.class), true, new EventBusContext()) {
-
- @Override
- protected Object executeSQL(final String sql, final Statement statement, final ConnectionMode connectionMode) throws SQLException {
- throw new SQLException();
- }
-
- @Override
- protected Optional<Object> getSaneResult(final SQLStatement sqlStatement, final SQLException ex) {
- return Optional.of(saneResult);
- }
- };
+ JDBCExecutorCallback<Object> callback =
+ new JDBCExecutorCallback<Object>(DatabaseTypeFactory.getInstance("MySQL"), Collections.singletonMap("ds", DatabaseTypeFactory.getInstance("PostgreSQL")),
+ mock(SelectStatement.class), true, new EventBusContext()) {
+
+ @Override
+ protected Object executeSQL(final String sql, final Statement statement, final ConnectionMode connectionMode, final DatabaseType storageType) throws SQLException {
+ throw new SQLException();
+ }
+
+ @Override
+ protected Optional<Object> getSaneResult(final SQLStatement sqlStatement, final SQLException ex) {
+ return Optional.of(saneResult);
+ }
+ };
assertThat(callback.execute(units, true, Collections.emptyMap()), is(Collections.singletonList(saneResult)));
assertThat(callback.execute(units, false, Collections.emptyMap()), is(Collections.emptyList()));
}
@Test(expected = SQLException.class)
public void assertExecuteSQLExceptionOccurredAndProtocolTypeSameAsDatabaseType() throws SQLException {
- JDBCExecutorCallback<Object> callback = new JDBCExecutorCallback<Object>(DatabaseTypeFactory.getInstance("MySQL"), DatabaseTypeFactory.getInstance("PostgreSQL"),
- mock(SelectStatement.class), true, new EventBusContext()) {
-
- @Override
- protected Object executeSQL(final String sql, final Statement statement, final ConnectionMode connectionMode) throws SQLException {
- throw new SQLException();
- }
-
- @Override
- protected Optional<Object> getSaneResult(final SQLStatement sqlStatement, final SQLException ex) {
- return Optional.empty();
- }
- };
+ JDBCExecutorCallback<Object> callback =
+ new JDBCExecutorCallback<Object>(DatabaseTypeFactory.getInstance("MySQL"), Collections.singletonMap("ds", DatabaseTypeFactory.getInstance("PostgreSQL")),
+ mock(SelectStatement.class), true, new EventBusContext()) {
+
+ @Override
+ protected Object executeSQL(final String sql, final Statement statement, final ConnectionMode connectionMode, final DatabaseType storageType) throws SQLException {
+ throw new SQLException();
+ }
+
+ @Override
+ protected Optional<Object> getSaneResult(final SQLStatement sqlStatement, final SQLException ex) {
+ return Optional.empty();
+ }
+ };
callback.execute(units, true, Collections.emptyMap());
}
}
diff --git a/infra/merge/src/main/java/org/apache/shardingsphere/infra/merge/MergeEngine.java b/infra/merge/src/main/java/org/apache/shardingsphere/infra/merge/MergeEngine.java
index bfe6898661d..6d188f20e24 100644
--- a/infra/merge/src/main/java/org/apache/shardingsphere/infra/merge/MergeEngine.java
+++ b/infra/merge/src/main/java/org/apache/shardingsphere/infra/merge/MergeEngine.java
@@ -77,8 +77,7 @@ public final class MergeEngine {
private Optional<MergedResult> executeMerge(final List<QueryResult> queryResults, final SQLStatementContext<?> sqlStatementContext) throws SQLException {
for (Entry<ShardingSphereRule, ResultProcessEngine> entry : engines.entrySet()) {
if (entry.getValue() instanceof ResultMergerEngine) {
- ResultMerger resultMerger = ((ResultMergerEngine) entry.getValue()).newInstance(
- database.getName(), database.getResourceMetaData().getDatabaseType(), entry.getKey(), props, sqlStatementContext);
+ ResultMerger resultMerger = ((ResultMergerEngine) entry.getValue()).newInstance(database.getName(), database.getProtocolType(), entry.getKey(), props, sqlStatementContext);
return Optional.of(resultMerger.merge(queryResults, sqlStatementContext, database, connectionContext));
}
}
diff --git a/infra/merge/src/main/java/org/apache/shardingsphere/infra/merge/engine/merger/ResultMergerEngine.java b/infra/merge/src/main/java/org/apache/shardingsphere/infra/merge/engine/merger/ResultMergerEngine.java
index b397ba71918..2ed1452ddff 100644
--- a/infra/merge/src/main/java/org/apache/shardingsphere/infra/merge/engine/merger/ResultMergerEngine.java
+++ b/infra/merge/src/main/java/org/apache/shardingsphere/infra/merge/engine/merger/ResultMergerEngine.java
@@ -34,11 +34,11 @@ public interface ResultMergerEngine<T extends ShardingSphereRule> extends Result
* Create new instance of result merger engine.
*
* @param databaseName database name
- * @param databaseType database type
+ * @param protocolType protocol type
* @param rule rule
* @param props ShardingSphere properties
* @param sqlStatementContext SQL statement context
* @return created instance
*/
- ResultMerger newInstance(String databaseName, DatabaseType databaseType, T rule, ConfigurationProperties props, SQLStatementContext<?> sqlStatementContext);
+ ResultMerger newInstance(String databaseName, DatabaseType protocolType, T rule, ConfigurationProperties props, SQLStatementContext<?> sqlStatementContext);
}
diff --git a/infra/merge/src/test/java/org/apache/shardingsphere/infra/merge/fixture/merger/ResultMergerEngineFixture.java b/infra/merge/src/test/java/org/apache/shardingsphere/infra/merge/fixture/merger/ResultMergerEngineFixture.java
index e42d55ae683..953a4f79bdf 100644
--- a/infra/merge/src/test/java/org/apache/shardingsphere/infra/merge/fixture/merger/ResultMergerEngineFixture.java
+++ b/infra/merge/src/test/java/org/apache/shardingsphere/infra/merge/fixture/merger/ResultMergerEngineFixture.java
@@ -27,7 +27,7 @@ import org.apache.shardingsphere.infra.binder.statement.SQLStatementContext;
public final class ResultMergerEngineFixture implements ResultMergerEngine<MergerRuleFixture> {
@Override
- public ResultMerger newInstance(final String databaseName, final DatabaseType databaseType, final MergerRuleFixture rule, final ConfigurationProperties props,
+ public ResultMerger newInstance(final String databaseName, final DatabaseType protocolType, final MergerRuleFixture rule, final ConfigurationProperties props,
final SQLStatementContext<?> sqlStatementContext) {
return new ResultMergerFixture();
}
diff --git a/infra/rewrite/src/main/java/org/apache/shardingsphere/infra/rewrite/SQLRewriteEntry.java b/infra/rewrite/src/main/java/org/apache/shardingsphere/infra/rewrite/SQLRewriteEntry.java
index dfbd06574ac..d0f9b49b78e 100644
--- a/infra/rewrite/src/main/java/org/apache/shardingsphere/infra/rewrite/SQLRewriteEntry.java
+++ b/infra/rewrite/src/main/java/org/apache/shardingsphere/infra/rewrite/SQLRewriteEntry.java
@@ -73,7 +73,7 @@ public final class SQLRewriteEntry {
SQLRewriteContext sqlRewriteContext = createSQLRewriteContext(sql, parameters, sqlStatementContext, routeContext, connectionContext);
SQLTranslatorRule rule = globalRuleMetaData.getSingleRule(SQLTranslatorRule.class);
DatabaseType protocolType = database.getProtocolType();
- Map<String, DatabaseType> storageTypes = database.getResourceMetaData().getDatabaseTypes();
+ Map<String, DatabaseType> storageTypes = database.getResourceMetaData().getStorageTypes();
return routeContext.getRouteUnits().isEmpty()
? new GenericSQLRewriteEngine(rule, protocolType, storageTypes).rewrite(sqlRewriteContext)
: new RouteSQLRewriteEngine(rule, protocolType, storageTypes).rewrite(sqlRewriteContext, routeContext);
diff --git a/infra/rewrite/src/test/java/org/apache/shardingsphere/infra/rewrite/SQLRewriteEntryTest.java b/infra/rewrite/src/test/java/org/apache/shardingsphere/infra/rewrite/SQLRewriteEntryTest.java
index 4d3a5a1a8ae..e23fb6028e8 100644
--- a/infra/rewrite/src/test/java/org/apache/shardingsphere/infra/rewrite/SQLRewriteEntryTest.java
+++ b/infra/rewrite/src/test/java/org/apache/shardingsphere/infra/rewrite/SQLRewriteEntryTest.java
@@ -86,7 +86,7 @@ public final class SQLRewriteEntryTest {
Map<String, DatabaseType> databaseTypes = new LinkedHashMap<>(2, 1);
databaseTypes.put("ds_0", new H2DatabaseType());
databaseTypes.put("ds_1", new MySQLDatabaseType());
- when(result.getDatabaseTypes()).thenReturn(databaseTypes);
+ when(result.getStorageTypes()).thenReturn(databaseTypes);
return result;
}
}
diff --git a/infra/util/src/main/java/org/apache/shardingsphere/infra/util/reflect/ReflectiveUtil.java b/infra/util/src/main/java/org/apache/shardingsphere/infra/util/reflect/ReflectiveUtil.java
new file mode 100644
index 00000000000..5c80aa6a46a
--- /dev/null
+++ b/infra/util/src/main/java/org/apache/shardingsphere/infra/util/reflect/ReflectiveUtil.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.infra.util.reflect;
+
+import lombok.AccessLevel;
+import lombok.NoArgsConstructor;
+import lombok.SneakyThrows;
+
+import java.lang.reflect.Field;
+import java.util.Objects;
+
+/**
+ * Reflective utility for reading and writing fields, including non-public and inherited ones.
+ */
+@NoArgsConstructor(access = AccessLevel.PRIVATE)
+public final class ReflectiveUtil {
+
+    /**
+     * Get field value of object.
+     *
+     * @param object object to read from, may be null
+     * @param fieldName field name, looked up in the class of object and its super classes
+     * @return field value, or null if object is null, field can not be found or field is not accessible
+     */
+    public static Object getFieldValue(final Object object, final String fieldName) {
+        // Guard before the lookup: getField dereferences the object, so null must never reach it.
+        return null == object ? null : getFieldValue(object, getField(object, fieldName));
+    }
+
+    private static Object getFieldValue(final Object object, final Field field) {
+        if (null == field) {
+            return null;
+        }
+        field.setAccessible(true);
+        try {
+            return field.get(object);
+        } catch (final IllegalAccessException ignored) {
+            // Best effort read: a field that can not be read is reported as null instead of failing.
+            return null;
+        }
+    }
+
+    private static Field getField(final Object target, final String fieldName) {
+        for (Class<?> clazz = target.getClass(); null != clazz; clazz = clazz.getSuperclass()) {
+            try {
+                return clazz.getDeclaredField(fieldName);
+            } catch (final NoSuchFieldException ignored) {
+                // Not declared on this class, continue with the super class.
+            }
+        }
+        return null;
+    }
+
+    /**
+     * Set value to specified field, fails with NullPointerException if field can not be found.
+     *
+     * @param target target object, must not be null
+     * @param fieldName field name, looked up in the class of target and its super classes
+     * @param value value to assign
+     */
+    @SneakyThrows(ReflectiveOperationException.class)
+    public static void setProperty(final Object target, final String fieldName, final Object value) {
+        Field field = getField(target, fieldName);
+        Objects.requireNonNull(field, () -> String.format("Can not find field `%s` in class `%s`.", fieldName, target.getClass().getName()));
+        field.setAccessible(true);
+        field.set(target, value);
+    }
+}
diff --git a/jdbc/core/src/main/java/org/apache/shardingsphere/driver/executor/DriverExecutor.java b/jdbc/core/src/main/java/org/apache/shardingsphere/driver/executor/DriverExecutor.java
index 15ea089cd52..177bd3b881e 100644
--- a/jdbc/core/src/main/java/org/apache/shardingsphere/driver/executor/DriverExecutor.java
+++ b/jdbc/core/src/main/java/org/apache/shardingsphere/driver/executor/DriverExecutor.java
@@ -53,8 +53,8 @@ public final class DriverExecutor implements AutoCloseable {
JDBCExecutor jdbcExecutor = new JDBCExecutor(executorEngine, connection.isHoldTransaction());
regularExecutor = new DriverJDBCExecutor(connection.getDatabaseName(), connection.getContextManager(), jdbcExecutor);
rawExecutor = new RawExecutor(executorEngine, connection.isHoldTransaction(), metaDataContexts.getMetaData().getProps(), eventBusContext);
- DatabaseType databaseType = metaDataContexts.getMetaData().getDatabase(connection.getDatabaseName()).getResourceMetaData().getDatabaseType();
- String schemaName = DatabaseTypeEngine.getDefaultSchemaName(databaseType, connection.getDatabaseName());
+ DatabaseType protocolType = metaDataContexts.getMetaData().getDatabase(connection.getDatabaseName()).getProtocolType();
+ String schemaName = DatabaseTypeEngine.getDefaultSchemaName(protocolType, connection.getDatabaseName());
SQLFederationRule sqlFederationRule = metaDataContexts.getMetaData().getGlobalRuleMetaData().getSingleRule(SQLFederationRule.class);
federationExecutor = sqlFederationRule.getSQLFederationExecutor(connection.getDatabaseName(), schemaName, metaDataContexts.getMetaData(),
metaDataContexts.getShardingSphereData(), jdbcExecutor, eventBusContext);
diff --git a/jdbc/core/src/main/java/org/apache/shardingsphere/driver/executor/batch/BatchPreparedStatementExecutor.java b/jdbc/core/src/main/java/org/apache/shardingsphere/driver/executor/batch/BatchPreparedStatementExecutor.java
index 2d2b2a78206..19cb5608f37 100644
--- a/jdbc/core/src/main/java/org/apache/shardingsphere/driver/executor/batch/BatchPreparedStatementExecutor.java
+++ b/jdbc/core/src/main/java/org/apache/shardingsphere/driver/executor/batch/BatchPreparedStatementExecutor.java
@@ -19,6 +19,7 @@ package org.apache.shardingsphere.driver.executor.batch;
import lombok.Getter;
import org.apache.shardingsphere.infra.binder.statement.SQLStatementContext;
+import org.apache.shardingsphere.infra.database.type.DatabaseType;
import org.apache.shardingsphere.infra.util.eventbus.EventBusContext;
import org.apache.shardingsphere.infra.executor.kernel.model.ExecutionGroup;
import org.apache.shardingsphere.infra.executor.kernel.model.ExecutionGroupContext;
@@ -137,11 +138,11 @@ public final class BatchPreparedStatementExecutor {
*/
public int[] executeBatch(final SQLStatementContext<?> sqlStatementContext) throws SQLException {
boolean isExceptionThrown = SQLExecutorExceptionHandler.isExceptionThrown();
- JDBCExecutorCallback<int[]> callback = new JDBCExecutorCallback<int[]>(
- metaDataContexts.getMetaData().getDatabase(databaseName).getResourceMetaData().getDatabaseType(), sqlStatementContext.getSqlStatement(), isExceptionThrown, eventBusContext) {
+ JDBCExecutorCallback<int[]> callback = new JDBCExecutorCallback<int[]>(metaDataContexts.getMetaData().getDatabase(databaseName).getProtocolType(),
+ metaDataContexts.getMetaData().getDatabase(databaseName).getResourceMetaData().getStorageTypes(), sqlStatementContext.getSqlStatement(), isExceptionThrown, eventBusContext) {
@Override
- protected int[] executeSQL(final String sql, final Statement statement, final ConnectionMode connectionMode) throws SQLException {
+ protected int[] executeSQL(final String sql, final Statement statement, final ConnectionMode connectionMode, final DatabaseType storageType) throws SQLException {
return statement.executeBatch();
}
diff --git a/jdbc/core/src/main/java/org/apache/shardingsphere/driver/executor/callback/ExecuteQueryCallback.java b/jdbc/core/src/main/java/org/apache/shardingsphere/driver/executor/callback/ExecuteQueryCallback.java
index 8089c9bff2d..90634148388 100644
--- a/jdbc/core/src/main/java/org/apache/shardingsphere/driver/executor/callback/ExecuteQueryCallback.java
+++ b/jdbc/core/src/main/java/org/apache/shardingsphere/driver/executor/callback/ExecuteQueryCallback.java
@@ -18,17 +18,18 @@
package org.apache.shardingsphere.driver.executor.callback;
import org.apache.shardingsphere.infra.database.type.DatabaseType;
-import org.apache.shardingsphere.infra.util.eventbus.EventBusContext;
import org.apache.shardingsphere.infra.executor.sql.execute.engine.ConnectionMode;
import org.apache.shardingsphere.infra.executor.sql.execute.engine.driver.jdbc.JDBCExecutorCallback;
import org.apache.shardingsphere.infra.executor.sql.execute.result.query.QueryResult;
import org.apache.shardingsphere.infra.executor.sql.execute.result.query.impl.driver.jdbc.type.memory.JDBCMemoryQueryResult;
import org.apache.shardingsphere.infra.executor.sql.execute.result.query.impl.driver.jdbc.type.stream.JDBCStreamQueryResult;
+import org.apache.shardingsphere.infra.util.eventbus.EventBusContext;
import org.apache.shardingsphere.sql.parser.sql.common.statement.SQLStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
+import java.util.Map;
import java.util.Optional;
/**
@@ -36,14 +37,15 @@ import java.util.Optional;
*/
public abstract class ExecuteQueryCallback extends JDBCExecutorCallback<QueryResult> {
- protected ExecuteQueryCallback(final DatabaseType databaseType, final SQLStatement sqlStatement, final boolean isExceptionThrown, final EventBusContext eventBusContext) {
- super(databaseType, sqlStatement, isExceptionThrown, eventBusContext);
+ protected ExecuteQueryCallback(final DatabaseType protocolType, final Map<String, DatabaseType> storageTypes, final SQLStatement sqlStatement, final boolean isExceptionThrown,
+ final EventBusContext eventBusContext) {
+ super(protocolType, storageTypes, sqlStatement, isExceptionThrown, eventBusContext);
}
@Override
- protected final QueryResult executeSQL(final String sql, final Statement statement, final ConnectionMode connectionMode) throws SQLException {
+ protected final QueryResult executeSQL(final String sql, final Statement statement, final ConnectionMode connectionMode, final DatabaseType storageType) throws SQLException {
ResultSet resultSet = executeQuery(sql, statement);
- return ConnectionMode.MEMORY_STRICTLY == connectionMode ? new JDBCStreamQueryResult(resultSet) : new JDBCMemoryQueryResult(resultSet, getDatabaseType());
+ return ConnectionMode.MEMORY_STRICTLY == connectionMode ? new JDBCStreamQueryResult(resultSet) : new JDBCMemoryQueryResult(resultSet, storageType);
}
@Override
diff --git a/jdbc/core/src/main/java/org/apache/shardingsphere/driver/executor/callback/impl/PreparedStatementExecuteQueryCallback.java b/jdbc/core/src/main/java/org/apache/shardingsphere/driver/executor/callback/impl/PreparedStatementExecuteQueryCallback.java
index 4b364caace8..d0237f9b333 100644
--- a/jdbc/core/src/main/java/org/apache/shardingsphere/driver/executor/callback/impl/PreparedStatementExecuteQueryCallback.java
+++ b/jdbc/core/src/main/java/org/apache/shardingsphere/driver/executor/callback/impl/PreparedStatementExecuteQueryCallback.java
@@ -26,14 +26,16 @@ import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
+import java.util.Map;
/**
* Prepared statement execute query callback.
*/
public final class PreparedStatementExecuteQueryCallback extends ExecuteQueryCallback {
- public PreparedStatementExecuteQueryCallback(final DatabaseType databaseType, final SQLStatement sqlStatement, final boolean isExceptionThrown, final EventBusContext eventBusContext) {
- super(databaseType, sqlStatement, isExceptionThrown, eventBusContext);
+ public PreparedStatementExecuteQueryCallback(final DatabaseType protocolType, final Map<String, DatabaseType> storageTypes, final SQLStatement sqlStatement, final boolean isExceptionThrown,
+ final EventBusContext eventBusContext) {
+ super(protocolType, storageTypes, sqlStatement, isExceptionThrown, eventBusContext);
}
@Override
diff --git a/jdbc/core/src/main/java/org/apache/shardingsphere/driver/executor/callback/impl/StatementExecuteQueryCallback.java b/jdbc/core/src/main/java/org/apache/shardingsphere/driver/executor/callback/impl/StatementExecuteQueryCallback.java
index 126d618b54b..8e03916fc3a 100644
--- a/jdbc/core/src/main/java/org/apache/shardingsphere/driver/executor/callback/impl/StatementExecuteQueryCallback.java
+++ b/jdbc/core/src/main/java/org/apache/shardingsphere/driver/executor/callback/impl/StatementExecuteQueryCallback.java
@@ -25,14 +25,16 @@ import org.apache.shardingsphere.sql.parser.sql.common.statement.SQLStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
+import java.util.Map;
/**
* Statement execute query callback.
*/
public final class StatementExecuteQueryCallback extends ExecuteQueryCallback {
- public StatementExecuteQueryCallback(final DatabaseType databaseType, final SQLStatement sqlStatement, final boolean isExceptionThrown, final EventBusContext eventBusContext) {
- super(databaseType, sqlStatement, isExceptionThrown, eventBusContext);
+ public StatementExecuteQueryCallback(final DatabaseType protocolType, final Map<String, DatabaseType> storageTypes, final SQLStatement sqlStatement, final boolean isExceptionThrown,
+ final EventBusContext eventBusContext) {
+ super(protocolType, storageTypes, sqlStatement, isExceptionThrown, eventBusContext);
}
@Override
diff --git a/jdbc/core/src/main/java/org/apache/shardingsphere/driver/jdbc/core/statement/ShardingSpherePreparedStatement.java b/jdbc/core/src/main/java/org/apache/shardingsphere/driver/jdbc/core/statement/ShardingSpherePreparedStatement.java
index 091f4e7e313..9eb820c4095 100644
--- a/jdbc/core/src/main/java/org/apache/shardingsphere/driver/jdbc/core/statement/ShardingSpherePreparedStatement.java
+++ b/jdbc/core/src/main/java/org/apache/shardingsphere/driver/jdbc/core/statement/ShardingSpherePreparedStatement.java
@@ -44,6 +44,7 @@ import org.apache.shardingsphere.infra.binder.statement.dml.SelectStatementConte
import org.apache.shardingsphere.infra.config.props.ConfigurationProperties;
import org.apache.shardingsphere.infra.config.props.ConfigurationPropertyKey;
import org.apache.shardingsphere.infra.context.kernel.KernelProcessor;
+import org.apache.shardingsphere.infra.database.type.DatabaseType;
import org.apache.shardingsphere.infra.database.type.DatabaseTypeEngine;
import org.apache.shardingsphere.infra.executor.check.SQLCheckEngine;
import org.apache.shardingsphere.infra.executor.kernel.model.ExecutionGroup;
@@ -294,13 +295,14 @@ public final class ShardingSpherePreparedStatement extends AbstractPreparedState
ExecutionGroupContext<JDBCExecutionUnit> executionGroupContext = createExecutionGroupContext();
cacheStatements(executionGroupContext.getInputGroups());
return executor.getRegularExecutor().executeQuery(executionGroupContext, executionContext.getQueryContext(),
- new PreparedStatementExecuteQueryCallback(metaDataContexts.getMetaData().getDatabase(connection.getDatabaseName()).getResourceMetaData().getDatabaseType(),
- sqlStatement, SQLExecutorExceptionHandler.isExceptionThrown(), eventBusContext));
+ new PreparedStatementExecuteQueryCallback(metaDataContexts.getMetaData().getDatabase(connection.getDatabaseName()).getProtocolType(),
+ metaDataContexts.getMetaData().getDatabase(connection.getDatabaseName()).getResourceMetaData().getStorageTypes(), sqlStatement, SQLExecutorExceptionHandler.isExceptionThrown(),
+ eventBusContext));
}
private ResultSet executeFederationQuery(final QueryContext queryContext) throws SQLException {
- PreparedStatementExecuteQueryCallback callback = new PreparedStatementExecuteQueryCallback(
- metaDataContexts.getMetaData().getDatabase(connection.getDatabaseName()).getResourceMetaData().getDatabaseType(), sqlStatement, SQLExecutorExceptionHandler.isExceptionThrown(),
+ PreparedStatementExecuteQueryCallback callback = new PreparedStatementExecuteQueryCallback(metaDataContexts.getMetaData().getDatabase(connection.getDatabaseName()).getProtocolType(),
+ metaDataContexts.getMetaData().getDatabase(connection.getDatabaseName()).getResourceMetaData().getStorageTypes(), sqlStatement, SQLExecutorExceptionHandler.isExceptionThrown(),
eventBusContext);
SQLFederationExecutorContext context = new SQLFederationExecutorContext(false, queryContext, metaDataContexts.getMetaData().getDatabases());
return executor.getFederationExecutor().executeQuery(createDriverExecutionPrepareEngine(), callback, context);
@@ -310,7 +312,7 @@ public final class ShardingSpherePreparedStatement extends AbstractPreparedState
int maxConnectionsSizePerQuery = metaDataContexts.getMetaData().getProps().<Integer>getValue(ConfigurationPropertyKey.MAX_CONNECTIONS_SIZE_PER_QUERY);
return new DriverExecutionPrepareEngine<>(JDBCDriverType.PREPARED_STATEMENT, maxConnectionsSizePerQuery, connection.getConnectionManager(), statementManager,
statementOption, metaDataContexts.getMetaData().getDatabase(connection.getDatabaseName()).getRuleMetaData().getRules(),
- metaDataContexts.getMetaData().getDatabase(connection.getDatabaseName()).getResourceMetaData().getDatabaseTypes());
+ metaDataContexts.getMetaData().getDatabase(connection.getDatabaseName()).getResourceMetaData().getStorageTypes());
}
@Override
@@ -357,11 +359,12 @@ public final class ShardingSpherePreparedStatement extends AbstractPreparedState
private JDBCExecutorCallback<Integer> createExecuteUpdateCallback() {
boolean isExceptionThrown = SQLExecutorExceptionHandler.isExceptionThrown();
- return new JDBCExecutorCallback<Integer>(metaDataContexts.getMetaData().getDatabase(connection.getDatabaseName()).getResourceMetaData().getDatabaseType(), sqlStatement, isExceptionThrown,
+ return new JDBCExecutorCallback<Integer>(metaDataContexts.getMetaData().getDatabase(connection.getDatabaseName()).getProtocolType(),
+ metaDataContexts.getMetaData().getDatabase(connection.getDatabaseName()).getResourceMetaData().getStorageTypes(), sqlStatement, isExceptionThrown,
eventBusContext) {
@Override
- protected Integer executeSQL(final String sql, final Statement statement, final ConnectionMode connectionMode) throws SQLException {
+ protected Integer executeSQL(final String sql, final Statement statement, final ConnectionMode connectionMode, final DatabaseType storageType) throws SQLException {
return ((PreparedStatement) statement).executeUpdate();
}
@@ -434,11 +437,12 @@ public final class ShardingSpherePreparedStatement extends AbstractPreparedState
private JDBCExecutorCallback<Boolean> createExecuteCallback() {
boolean isExceptionThrown = SQLExecutorExceptionHandler.isExceptionThrown();
- return new JDBCExecutorCallback<Boolean>(metaDataContexts.getMetaData().getDatabase(connection.getDatabaseName()).getResourceMetaData().getDatabaseType(), sqlStatement, isExceptionThrown,
+ return new JDBCExecutorCallback<Boolean>(metaDataContexts.getMetaData().getDatabase(connection.getDatabaseName()).getProtocolType(),
+ metaDataContexts.getMetaData().getDatabase(connection.getDatabaseName()).getResourceMetaData().getStorageTypes(), sqlStatement, isExceptionThrown,
eventBusContext) {
@Override
- protected Boolean executeSQL(final String sql, final Statement statement, final ConnectionMode connectionMode) throws SQLException {
+ protected Boolean executeSQL(final String sql, final Statement statement, final ConnectionMode connectionMode, final DatabaseType storageType) throws SQLException {
return ((PreparedStatement) statement).execute();
}
@@ -605,7 +609,7 @@ public final class ShardingSpherePreparedStatement extends AbstractPreparedState
.<Integer>getValue(ConfigurationPropertyKey.MAX_CONNECTIONS_SIZE_PER_QUERY), connection.getConnectionManager(), statementManager, statementOption,
metaDataContexts.getMetaData()
.getDatabase(connection.getDatabaseName()).getRuleMetaData().getRules(),
- metaDataContexts.getMetaData().getDatabase(connection.getDatabaseName()).getResourceMetaData().getDatabaseTypes());
+ metaDataContexts.getMetaData().getDatabase(connection.getDatabaseName()).getResourceMetaData().getStorageTypes());
List<ExecutionUnit> executionUnits = new ArrayList<>(batchPreparedStatementExecutor.getBatchExecutionUnits().size());
for (BatchExecutionUnit each : batchPreparedStatementExecutor.getBatchExecutionUnits()) {
ExecutionUnit executionUnit = each.getExecutionUnit();
diff --git a/jdbc/core/src/main/java/org/apache/shardingsphere/driver/jdbc/core/statement/ShardingSphereStatement.java b/jdbc/core/src/main/java/org/apache/shardingsphere/driver/jdbc/core/statement/ShardingSphereStatement.java
index 092c0509470..167dca50ab0 100644
--- a/jdbc/core/src/main/java/org/apache/shardingsphere/driver/jdbc/core/statement/ShardingSphereStatement.java
+++ b/jdbc/core/src/main/java/org/apache/shardingsphere/driver/jdbc/core/statement/ShardingSphereStatement.java
@@ -43,6 +43,7 @@ import org.apache.shardingsphere.infra.binder.type.TableAvailable;
import org.apache.shardingsphere.infra.config.props.ConfigurationProperties;
import org.apache.shardingsphere.infra.config.props.ConfigurationPropertyKey;
import org.apache.shardingsphere.infra.context.kernel.KernelProcessor;
+import org.apache.shardingsphere.infra.database.type.DatabaseType;
import org.apache.shardingsphere.infra.database.type.DatabaseTypeEngine;
import org.apache.shardingsphere.infra.executor.check.SQLCheckEngine;
import org.apache.shardingsphere.infra.executor.kernel.model.ExecutionGroup;
@@ -214,14 +215,16 @@ public final class ShardingSphereStatement extends AbstractStatementAdapter {
}
ExecutionGroupContext<JDBCExecutionUnit> executionGroupContext = createExecutionContext();
cacheStatements(executionGroupContext.getInputGroups());
- StatementExecuteQueryCallback callback = new StatementExecuteQueryCallback(metaDataContexts.getMetaData().getDatabase(connection.getDatabaseName()).getResourceMetaData().getDatabaseType(),
- executionContext.getSqlStatementContext().getSqlStatement(), SQLExecutorExceptionHandler.isExceptionThrown(), eventBusContext);
+ StatementExecuteQueryCallback callback = new StatementExecuteQueryCallback(metaDataContexts.getMetaData().getDatabase(connection.getDatabaseName()).getProtocolType(),
+ metaDataContexts.getMetaData().getDatabase(connection.getDatabaseName()).getResourceMetaData().getStorageTypes(), executionContext.getSqlStatementContext().getSqlStatement(),
+ SQLExecutorExceptionHandler.isExceptionThrown(), eventBusContext);
return executor.getRegularExecutor().executeQuery(executionGroupContext, executionContext.getQueryContext(), callback);
}
private ResultSet executeFederationQuery(final QueryContext queryContext) throws SQLException {
- StatementExecuteQueryCallback callback = new StatementExecuteQueryCallback(metaDataContexts.getMetaData().getDatabase(connection.getDatabaseName()).getResourceMetaData().getDatabaseType(),
- queryContext.getSqlStatementContext().getSqlStatement(), SQLExecutorExceptionHandler.isExceptionThrown(), eventBusContext);
+ StatementExecuteQueryCallback callback = new StatementExecuteQueryCallback(metaDataContexts.getMetaData().getDatabase(connection.getDatabaseName()).getProtocolType(),
+ metaDataContexts.getMetaData().getDatabase(connection.getDatabaseName()).getResourceMetaData().getStorageTypes(), queryContext.getSqlStatementContext().getSqlStatement(),
+ SQLExecutorExceptionHandler.isExceptionThrown(), eventBusContext);
SQLFederationExecutorContext context = new SQLFederationExecutorContext(false, queryContext, metaDataContexts.getMetaData().getDatabases());
return executor.getFederationExecutor().executeQuery(createDriverExecutionPrepareEngine(), callback, context);
}
@@ -230,7 +233,7 @@ public final class ShardingSphereStatement extends AbstractStatementAdapter {
int maxConnectionsSizePerQuery = metaDataContexts.getMetaData().getProps().<Integer>getValue(ConfigurationPropertyKey.MAX_CONNECTIONS_SIZE_PER_QUERY);
return new DriverExecutionPrepareEngine<>(JDBCDriverType.STATEMENT, maxConnectionsSizePerQuery, connection.getConnectionManager(), statementManager, statementOption,
metaDataContexts.getMetaData().getDatabase(connection.getDatabaseName()).getRuleMetaData().getRules(),
- metaDataContexts.getMetaData().getDatabase(connection.getDatabaseName()).getResourceMetaData().getDatabaseTypes());
+ metaDataContexts.getMetaData().getDatabase(connection.getDatabaseName()).getResourceMetaData().getStorageTypes());
}
@Override
@@ -353,12 +356,12 @@ public final class ShardingSphereStatement extends AbstractStatementAdapter {
private int executeUpdate(final ExecutionGroupContext<JDBCExecutionUnit> executionGroupContext, final ExecuteUpdateCallback updater,
final SQLStatementContext<?> sqlStatementContext, final Collection<RouteUnit> routeUnits) throws SQLException {
boolean isExceptionThrown = SQLExecutorExceptionHandler.isExceptionThrown();
- JDBCExecutorCallback<Integer> callback = new JDBCExecutorCallback<Integer>(
- metaDataContexts.getMetaData().getDatabase(connection.getDatabaseName()).getResourceMetaData().getDatabaseType(), sqlStatementContext.getSqlStatement(), isExceptionThrown,
+ JDBCExecutorCallback<Integer> callback = new JDBCExecutorCallback<Integer>(metaDataContexts.getMetaData().getDatabase(connection.getDatabaseName()).getProtocolType(),
+ metaDataContexts.getMetaData().getDatabase(connection.getDatabaseName()).getResourceMetaData().getStorageTypes(), sqlStatementContext.getSqlStatement(), isExceptionThrown,
eventBusContext) {
@Override
- protected Integer executeSQL(final String sql, final Statement statement, final ConnectionMode connectionMode) throws SQLException {
+ protected Integer executeSQL(final String sql, final Statement statement, final ConnectionMode connectionMode, final DatabaseType storageType) throws SQLException {
return updater.executeUpdate(sql, statement);
}
@@ -434,11 +437,11 @@ public final class ShardingSphereStatement extends AbstractStatementAdapter {
private boolean execute(final ExecutionGroupContext<JDBCExecutionUnit> executionGroupContext, final ExecuteCallback executeCallback,
final SQLStatement sqlStatement, final Collection<RouteUnit> routeUnits) throws SQLException {
boolean isExceptionThrown = SQLExecutorExceptionHandler.isExceptionThrown();
- JDBCExecutorCallback<Boolean> jdbcExecutorCallback = new JDBCExecutorCallback<Boolean>(
- metaDataContexts.getMetaData().getDatabase(connection.getDatabaseName()).getResourceMetaData().getDatabaseType(), sqlStatement, isExceptionThrown, eventBusContext) {
+ JDBCExecutorCallback<Boolean> jdbcExecutorCallback = new JDBCExecutorCallback<Boolean>(metaDataContexts.getMetaData().getDatabase(connection.getDatabaseName()).getProtocolType(),
+ metaDataContexts.getMetaData().getDatabase(connection.getDatabaseName()).getResourceMetaData().getStorageTypes(), sqlStatement, isExceptionThrown, eventBusContext) {
@Override
- protected Boolean executeSQL(final String sql, final Statement statement, final ConnectionMode connectionMode) throws SQLException {
+ protected Boolean executeSQL(final String sql, final Statement statement, final ConnectionMode connectionMode, final DatabaseType storageType) throws SQLException {
return executeCallback.execute(sql, statement);
}
diff --git a/kernel/transaction/core/src/main/java/org/apache/shardingsphere/transaction/rule/TransactionRule.java b/kernel/transaction/core/src/main/java/org/apache/shardingsphere/transaction/rule/TransactionRule.java
index 8f8102a5d00..d56f359e515 100644
--- a/kernel/transaction/core/src/main/java/org/apache/shardingsphere/transaction/rule/TransactionRule.java
+++ b/kernel/transaction/core/src/main/java/org/apache/shardingsphere/transaction/rule/TransactionRule.java
@@ -70,7 +70,7 @@ public final class TransactionRule implements GlobalRule, ResourceHeldRule<Shard
for (Entry<String, ShardingSphereDatabase> entry : databases.entrySet()) {
ShardingSphereDatabase database = entry.getValue();
database.getResourceMetaData().getDataSources().forEach((key, value) -> dataSourceMap.put(database.getName() + "." + key, value));
- database.getResourceMetaData().getDatabaseTypes().forEach((key, value) -> databaseTypes.put(database.getName() + "." + key, value));
+ database.getResourceMetaData().getStorageTypes().forEach((key, value) -> databaseTypes.put(database.getName() + "." + key, value));
}
if (dataSourceMap.isEmpty()) {
return new ShardingSphereTransactionManagerEngine();
diff --git a/kernel/transaction/core/src/test/java/org/apache/shardingsphere/transaction/rule/TransactionRuleTest.java b/kernel/transaction/core/src/test/java/org/apache/shardingsphere/transaction/rule/TransactionRuleTest.java
index 11c5c171139..78f21235c29 100644
--- a/kernel/transaction/core/src/test/java/org/apache/shardingsphere/transaction/rule/TransactionRuleTest.java
+++ b/kernel/transaction/core/src/test/java/org/apache/shardingsphere/transaction/rule/TransactionRuleTest.java
@@ -65,7 +65,7 @@ public final class TransactionRuleTest {
Map<String, DatabaseType> databaseTypes = new LinkedHashMap<>(2, 1);
databaseTypes.put("ds_0", new PostgreSQLDatabaseType());
databaseTypes.put("ds_1", new OpenGaussDatabaseType());
- when(result.getDatabaseTypes()).thenReturn(databaseTypes);
+ when(result.getStorageTypes()).thenReturn(databaseTypes);
return result;
}
diff --git a/mode/core/src/main/java/org/apache/shardingsphere/mode/manager/ContextManager.java b/mode/core/src/main/java/org/apache/shardingsphere/mode/manager/ContextManager.java
index 5fda6af76aa..6fd285ca250 100644
--- a/mode/core/src/main/java/org/apache/shardingsphere/mode/manager/ContextManager.java
+++ b/mode/core/src/main/java/org/apache/shardingsphere/mode/manager/ContextManager.java
@@ -523,7 +523,7 @@ public final class ContextManager implements AutoCloseable {
private ShardingSphereSchema loadSchema(final String databaseName, final String schemaName, final String dataSourceName) throws SQLException {
ShardingSphereDatabase database = metaDataContexts.getMetaData().getDatabase(databaseName);
database.reloadRules(MutableDataNodeRule.class);
- GenericSchemaBuilderMaterial material = new GenericSchemaBuilderMaterial(database.getProtocolType(), database.getResourceMetaData().getDatabaseTypes(),
+ GenericSchemaBuilderMaterial material = new GenericSchemaBuilderMaterial(database.getProtocolType(), database.getResourceMetaData().getStorageTypes(),
Collections.singletonMap(dataSourceName, database.getResourceMetaData().getDataSources().get(dataSourceName)),
database.getRuleMetaData().getRules(), metaDataContexts.getMetaData().getProps(), schemaName);
ShardingSphereSchema result = GenericSchemaBuilder.build(material).get(schemaName);
@@ -568,7 +568,7 @@ public final class ContextManager implements AutoCloseable {
private synchronized void reloadTable(final String databaseName, final String schemaName, final String tableName, final Map<String, DataSource> dataSourceMap) throws SQLException {
ShardingSphereDatabase database = metaDataContexts.getMetaData().getDatabase(databaseName);
GenericSchemaBuilderMaterial material = new GenericSchemaBuilderMaterial(database.getProtocolType(),
- database.getResourceMetaData().getDatabaseTypes(), dataSourceMap, database.getRuleMetaData().getRules(), metaDataContexts.getMetaData().getProps(), schemaName);
+ database.getResourceMetaData().getStorageTypes(), dataSourceMap, database.getRuleMetaData().getRules(), metaDataContexts.getMetaData().getProps(), schemaName);
ShardingSphereSchema schema = GenericSchemaBuilder.build(Collections.singletonList(tableName), material).getOrDefault(schemaName, new ShardingSphereSchema());
if (schema.containsTable(tableName)) {
alterTable(databaseName, schemaName, schema.getTable(tableName));
diff --git a/proxy/backend/src/main/java/org/apache/shardingsphere/proxy/backend/communication/ProxySQLExecutor.java b/proxy/backend/src/main/java/org/apache/shardingsphere/proxy/backend/communication/ProxySQLExecutor.java
index ad22f74dcba..ee7963516e3 100644
--- a/proxy/backend/src/main/java/org/apache/shardingsphere/proxy/backend/communication/ProxySQLExecutor.java
+++ b/proxy/backend/src/main/java/org/apache/shardingsphere/proxy/backend/communication/ProxySQLExecutor.java
@@ -187,7 +187,7 @@ public final class ProxySQLExecutor {
JDBCBackendStatement statementManager = (JDBCBackendStatement) backendConnection.getConnectionSession().getStatementManager();
DriverExecutionPrepareEngine<JDBCExecutionUnit, Connection> prepareEngine = new DriverExecutionPrepareEngine<>(
type, maxConnectionsSizePerQuery, backendConnection, statementManager, new StatementOption(isReturnGeneratedKeys), rules,
- ProxyContext.getInstance().getDatabase(backendConnection.getConnectionSession().getDatabaseName()).getResourceMetaData().getDatabaseTypes());
+ ProxyContext.getInstance().getDatabase(backendConnection.getConnectionSession().getDatabaseName()).getResourceMetaData().getStorageTypes());
ExecutionGroupContext<JDBCExecutionUnit> executionGroupContext;
try {
executionGroupContext = prepareEngine.prepare(executionContext.getRouteContext(), executionContext.getExecutionUnits());
diff --git a/proxy/backend/src/main/java/org/apache/shardingsphere/proxy/backend/communication/ReactiveProxySQLExecutor.java b/proxy/backend/src/main/java/org/apache/shardingsphere/proxy/backend/communication/ReactiveProxySQLExecutor.java
index 9d9a9f4b22f..b7af3a1268d 100644
--- a/proxy/backend/src/main/java/org/apache/shardingsphere/proxy/backend/communication/ReactiveProxySQLExecutor.java
+++ b/proxy/backend/src/main/java/org/apache/shardingsphere/proxy/backend/communication/ReactiveProxySQLExecutor.java
@@ -138,7 +138,7 @@ public final class ReactiveProxySQLExecutor {
VertxBackendStatement statementManager = (VertxBackendStatement) backendConnection.getConnectionSession().getStatementManager();
DriverExecutionPrepareEngine<VertxExecutionUnit, Future<? extends SqlClient>> prepareEngine = new DriverExecutionPrepareEngine<>(
TYPE, maxConnectionsSizePerQuery, backendConnection, statementManager, new VertxExecutionContext(), rules,
- ProxyContext.getInstance().getDatabase(backendConnection.getConnectionSession().getDatabaseName()).getResourceMetaData().getDatabaseTypes());
+ ProxyContext.getInstance().getDatabase(backendConnection.getConnectionSession().getDatabaseName()).getResourceMetaData().getStorageTypes());
ExecutionGroupContext<VertxExecutionUnit> executionGroupContext;
try {
executionGroupContext = prepareEngine.prepare(executionContext.getRouteContext(), executionContext.getExecutionUnits());
diff --git a/proxy/backend/src/main/java/org/apache/shardingsphere/proxy/backend/communication/jdbc/JDBCDatabaseCommunicationEngine.java b/proxy/backend/src/main/java/org/apache/shardingsphere/proxy/backend/communication/jdbc/JDBCDatabaseCommunicationEngine.java
index 9842334008b..8edb5996836 100644
--- a/proxy/backend/src/main/java/org/apache/shardingsphere/proxy/backend/communication/jdbc/JDBCDatabaseCommunicationEngine.java
+++ b/proxy/backend/src/main/java/org/apache/shardingsphere/proxy/backend/communication/jdbc/JDBCDatabaseCommunicationEngine.java
@@ -62,6 +62,7 @@ import java.util.Collection;
import java.util.Collections;
import java.util.LinkedList;
import java.util.List;
+import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CopyOnWriteArrayList;
@@ -155,8 +156,8 @@ public final class JDBCDatabaseCommunicationEngine extends DatabaseCommunication
boolean isReturnGeneratedKeys = queryContext.getSqlStatementContext().getSqlStatement() instanceof MySQLInsertStatement;
ShardingSphereDatabase database = metaDataContexts.getMetaData().getDatabase(backendConnection.getConnectionSession().getDatabaseName());
DatabaseType protocolType = database.getProtocolType();
- DatabaseType databaseType = database.getResourceMetaData().getDatabaseType();
- ProxyJDBCExecutorCallback callback = ProxyJDBCExecutorCallbackFactory.newInstance(getDriverType(), protocolType, databaseType,
+ Map<String, DatabaseType> storageTypes = database.getResourceMetaData().getStorageTypes();
+ ProxyJDBCExecutorCallback callback = ProxyJDBCExecutorCallbackFactory.newInstance(getDriverType(), protocolType, storageTypes,
queryContext.getSqlStatementContext().getSqlStatement(), this, isReturnGeneratedKeys, SQLExecutorExceptionHandler.isExceptionThrown(), true);
DriverExecutionPrepareEngine<JDBCExecutionUnit, Connection> prepareEngine = createDriverExecutionPrepareEngine(isReturnGeneratedKeys, metaDataContexts);
SQLFederationExecutorContext context = new SQLFederationExecutorContext(false, queryContext, metaDataContexts.getMetaData().getDatabases());
@@ -168,7 +169,7 @@ public final class JDBCDatabaseCommunicationEngine extends DatabaseCommunication
JDBCBackendStatement statementManager = (JDBCBackendStatement) backendConnection.getConnectionSession().getStatementManager();
return new DriverExecutionPrepareEngine<>(getDriverType(), maxConnectionsSizePerQuery, backendConnection, statementManager,
new StatementOption(isReturnGeneratedKeys), metaData.getMetaData().getDatabase(backendConnection.getConnectionSession().getDatabaseName()).getRuleMetaData().getRules(),
- metaData.getMetaData().getDatabase(backendConnection.getConnectionSession().getDatabaseName()).getResourceMetaData().getDatabaseTypes());
+ metaData.getMetaData().getDatabase(backendConnection.getConnectionSession().getDatabaseName()).getResourceMetaData().getStorageTypes());
}
private ResponseHeader processExecuteFederation(final ResultSet resultSet, final MetaDataContexts metaDataContexts) throws SQLException {
diff --git a/proxy/backend/src/main/java/org/apache/shardingsphere/proxy/backend/communication/jdbc/executor/ProxyJDBCExecutor.java b/proxy/backend/src/main/java/org/apache/shardingsphere/proxy/backend/communication/jdbc/executor/ProxyJDBCExecutor.java
index 03576f12ee8..989ccd542c4 100644
--- a/proxy/backend/src/main/java/org/apache/shardingsphere/proxy/backend/communication/jdbc/executor/ProxyJDBCExecutor.java
+++ b/proxy/backend/src/main/java/org/apache/shardingsphere/proxy/backend/communication/jdbc/executor/ProxyJDBCExecutor.java
@@ -36,6 +36,7 @@ import org.apache.shardingsphere.proxy.backend.session.ConnectionSession;
import java.sql.SQLException;
import java.util.List;
+import java.util.Map;
/**
* Proxy JDBC executor.
@@ -68,13 +69,13 @@ public final class ProxyJDBCExecutor {
EventBusContext eventBusContext = ProxyContext.getInstance().getContextManager().getInstanceContext().getEventBusContext();
ShardingSphereDatabase database = metaDataContexts.getMetaData().getDatabase(connectionSession.getDatabaseName());
DatabaseType protocolType = database.getProtocolType();
- DatabaseType databaseType = database.getResourceMetaData().getDatabaseType();
+ Map<String, DatabaseType> storageTypes = database.getResourceMetaData().getStorageTypes();
ExecuteProcessEngine.initializeExecution(queryContext, executionGroupContext, eventBusContext);
SQLStatementContext<?> context = queryContext.getSqlStatementContext();
List<ExecuteResult> result = jdbcExecutor.execute(executionGroupContext,
- ProxyJDBCExecutorCallbackFactory.newInstance(type, protocolType, databaseType, context.getSqlStatement(), databaseCommunicationEngine, isReturnGeneratedKeys, isExceptionThrown,
+ ProxyJDBCExecutorCallbackFactory.newInstance(type, protocolType, storageTypes, context.getSqlStatement(), databaseCommunicationEngine, isReturnGeneratedKeys, isExceptionThrown,
true),
- ProxyJDBCExecutorCallbackFactory.newInstance(type, protocolType, databaseType, context.getSqlStatement(), databaseCommunicationEngine, isReturnGeneratedKeys, isExceptionThrown,
+ ProxyJDBCExecutorCallbackFactory.newInstance(type, protocolType, storageTypes, context.getSqlStatement(), databaseCommunicationEngine, isReturnGeneratedKeys, isExceptionThrown,
false));
ExecuteProcessEngine.finishExecution(executionGroupContext.getExecutionID(), eventBusContext);
return result;
diff --git a/proxy/backend/src/main/java/org/apache/shardingsphere/proxy/backend/communication/jdbc/executor/callback/ProxyJDBCExecutorCallback.java b/proxy/backend/src/main/java/org/apache/shardingsphere/proxy/backend/communication/jdbc/executor/callback/ProxyJDBCExecutorCallback.java
index e1692825862..1a34ae79c2b 100644
--- a/proxy/backend/src/main/java/org/apache/shardingsphere/proxy/backend/communication/jdbc/executor/callback/ProxyJDBCExecutorCallback.java
+++ b/proxy/backend/src/main/java/org/apache/shardingsphere/proxy/backend/communication/jdbc/executor/callback/ProxyJDBCExecutorCallback.java
@@ -36,6 +36,7 @@ import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.sql.Types;
+import java.util.Map;
import java.util.Optional;
/**
@@ -51,38 +52,38 @@ public abstract class ProxyJDBCExecutorCallback extends JDBCExecutorCallback<Exe
private boolean hasMetaData;
- public ProxyJDBCExecutorCallback(final DatabaseType protocolType, final DatabaseType databaseType, final SQLStatement sqlStatement,
+ public ProxyJDBCExecutorCallback(final DatabaseType protocolType, final Map<String, DatabaseType> storageTypes, final SQLStatement sqlStatement,
final JDBCDatabaseCommunicationEngine databaseCommunicationEngine,
final boolean isReturnGeneratedKeys, final boolean isExceptionThrown, final boolean fetchMetaData) {
- super(protocolType, databaseType, sqlStatement, isExceptionThrown, ProxyContext.getInstance().getContextManager().getInstanceContext().getEventBusContext());
+ super(protocolType, storageTypes, sqlStatement, isExceptionThrown, ProxyContext.getInstance().getContextManager().getInstanceContext().getEventBusContext());
this.databaseCommunicationEngine = databaseCommunicationEngine;
this.isReturnGeneratedKeys = isReturnGeneratedKeys;
this.fetchMetaData = fetchMetaData;
}
@Override
- public ExecuteResult executeSQL(final String sql, final Statement statement, final ConnectionMode connectionMode) throws SQLException {
+ public ExecuteResult executeSQL(final String sql, final Statement statement, final ConnectionMode connectionMode, final DatabaseType storageType) throws SQLException {
if (fetchMetaData && !hasMetaData) {
hasMetaData = true;
- return executeSQL(sql, statement, connectionMode, true);
+ return executeSQL(sql, statement, connectionMode, true, storageType);
}
- return executeSQL(sql, statement, connectionMode, false);
+ return executeSQL(sql, statement, connectionMode, false, storageType);
}
- private ExecuteResult executeSQL(final String sql, final Statement statement, final ConnectionMode connectionMode, final boolean withMetaData) throws SQLException {
+ private ExecuteResult executeSQL(final String sql, final Statement statement, final ConnectionMode connectionMode, final boolean withMetaData, final DatabaseType storageType) throws SQLException {
databaseCommunicationEngine.add(statement);
if (execute(sql, statement, isReturnGeneratedKeys)) {
ResultSet resultSet = statement.getResultSet();
databaseCommunicationEngine.add(resultSet);
- return createQueryResult(resultSet, connectionMode);
+ return createQueryResult(resultSet, connectionMode, storageType);
}
return new UpdateResult(statement.getUpdateCount(), isReturnGeneratedKeys ? getGeneratedKey(statement) : 0L);
}
protected abstract boolean execute(String sql, Statement statement, boolean isReturnGeneratedKeys) throws SQLException;
- private QueryResult createQueryResult(final ResultSet resultSet, final ConnectionMode connectionMode) throws SQLException {
- return ConnectionMode.MEMORY_STRICTLY == connectionMode ? new JDBCStreamQueryResult(resultSet) : new JDBCMemoryQueryResult(resultSet, getDatabaseType());
+ private QueryResult createQueryResult(final ResultSet resultSet, final ConnectionMode connectionMode, final DatabaseType storageType) throws SQLException {
+ return ConnectionMode.MEMORY_STRICTLY == connectionMode ? new JDBCStreamQueryResult(resultSet) : new JDBCMemoryQueryResult(resultSet, storageType);
}
private long getGeneratedKey(final Statement statement) throws SQLException {
diff --git a/proxy/backend/src/main/java/org/apache/shardingsphere/proxy/backend/communication/jdbc/executor/callback/ProxyJDBCExecutorCallbackFactory.java b/proxy/backend/src/main/java/org/apache/shardingsphere/proxy/backend/communication/jdbc/executor/callback/ProxyJDBCExecutorCallbackFactory.java
index 75ec1cc1eb4..493b26c99e2 100644
--- a/proxy/backend/src/main/java/org/apache/shardingsphere/proxy/backend/communication/jdbc/executor/callback/ProxyJDBCExecutorCallbackFactory.java
+++ b/proxy/backend/src/main/java/org/apache/shardingsphere/proxy/backend/communication/jdbc/executor/callback/ProxyJDBCExecutorCallbackFactory.java
@@ -27,6 +27,8 @@ import org.apache.shardingsphere.proxy.backend.communication.jdbc.executor.callb
import org.apache.shardingsphere.proxy.backend.communication.jdbc.executor.callback.impl.ProxyStatementExecutorCallback;
import org.apache.shardingsphere.sql.parser.sql.common.statement.SQLStatement;
+import java.util.Map;
+
/**
* Proxy JDBC executor callback factory.
*/
@@ -38,7 +40,7 @@ public final class ProxyJDBCExecutorCallbackFactory {
*
* @param type driver type
* @param protocolType protocol type
- * @param databaseType database type
+ * @param storageTypes storage types
* @param sqlStatement SQL statement
* @param databaseCommunicationEngine database communication engine
* @param isReturnGeneratedKeys is return generated keys or not
@@ -46,14 +48,14 @@ public final class ProxyJDBCExecutorCallbackFactory {
* @param isFetchMetaData is fetch meta data or not
* @return created instance
*/
- public static ProxyJDBCExecutorCallback newInstance(final String type, final DatabaseType protocolType, final DatabaseType databaseType, final SQLStatement sqlStatement,
+ public static ProxyJDBCExecutorCallback newInstance(final String type, final DatabaseType protocolType, final Map<String, DatabaseType> storageTypes, final SQLStatement sqlStatement,
final JDBCDatabaseCommunicationEngine databaseCommunicationEngine, final boolean isReturnGeneratedKeys, final boolean isExceptionThrown,
final boolean isFetchMetaData) {
if (JDBCDriverType.STATEMENT.equals(type)) {
- return new ProxyStatementExecutorCallback(protocolType, databaseType, sqlStatement, databaseCommunicationEngine, isReturnGeneratedKeys, isExceptionThrown, isFetchMetaData);
+ return new ProxyStatementExecutorCallback(protocolType, storageTypes, sqlStatement, databaseCommunicationEngine, isReturnGeneratedKeys, isExceptionThrown, isFetchMetaData);
}
if (JDBCDriverType.PREPARED_STATEMENT.equals(type)) {
- return new ProxyPreparedStatementExecutorCallback(protocolType, databaseType, sqlStatement, databaseCommunicationEngine, isReturnGeneratedKeys, isExceptionThrown, isFetchMetaData);
+ return new ProxyPreparedStatementExecutorCallback(protocolType, storageTypes, sqlStatement, databaseCommunicationEngine, isReturnGeneratedKeys, isExceptionThrown, isFetchMetaData);
}
throw new UnsupportedSQLOperationException(String.format("Unsupported driver type: `%s`", type));
}
diff --git a/proxy/backend/src/main/java/org/apache/shardingsphere/proxy/backend/communication/jdbc/executor/callback/impl/ProxyPreparedStatementExecutorCallback.java b/proxy/backend/src/main/java/org/apache/shardingsphere/proxy/backend/communication/jdbc/executor/callback/impl/ProxyPreparedStatementExecutorCallback.java
index 9867aca81c4..3d028a060d2 100644
--- a/proxy/backend/src/main/java/org/apache/shardingsphere/proxy/backend/communication/jdbc/executor/callback/impl/ProxyPreparedStatementExecutorCallback.java
+++ b/proxy/backend/src/main/java/org/apache/shardingsphere/proxy/backend/communication/jdbc/executor/callback/impl/ProxyPreparedStatementExecutorCallback.java
@@ -25,16 +25,17 @@ import org.apache.shardingsphere.sql.parser.sql.common.statement.SQLStatement;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.sql.Statement;
+import java.util.Map;
/**
* Prepared statement executor callback for proxy.
*/
public final class ProxyPreparedStatementExecutorCallback extends ProxyJDBCExecutorCallback {
- public ProxyPreparedStatementExecutorCallback(final DatabaseType protocolType, final DatabaseType databaseType, final SQLStatement sqlStatement,
+ public ProxyPreparedStatementExecutorCallback(final DatabaseType protocolType, final Map<String, DatabaseType> storageTypes, final SQLStatement sqlStatement,
final JDBCDatabaseCommunicationEngine databaseCommunicationEngine, final boolean isReturnGeneratedKeys,
final boolean isExceptionThrown, final boolean fetchMetaData) {
- super(protocolType, databaseType, sqlStatement, databaseCommunicationEngine, isReturnGeneratedKeys, isExceptionThrown, fetchMetaData);
+ super(protocolType, storageTypes, sqlStatement, databaseCommunicationEngine, isReturnGeneratedKeys, isExceptionThrown, fetchMetaData);
}
@Override
diff --git a/proxy/backend/src/main/java/org/apache/shardingsphere/proxy/backend/communication/jdbc/executor/callback/impl/ProxyStatementExecutorCallback.java b/proxy/backend/src/main/java/org/apache/shardingsphere/proxy/backend/communication/jdbc/executor/callback/impl/ProxyStatementExecutorCallback.java
index dac1fd231ae..21e3209eaa0 100644
--- a/proxy/backend/src/main/java/org/apache/shardingsphere/proxy/backend/communication/jdbc/executor/callback/impl/ProxyStatementExecutorCallback.java
+++ b/proxy/backend/src/main/java/org/apache/shardingsphere/proxy/backend/communication/jdbc/executor/callback/impl/ProxyStatementExecutorCallback.java
@@ -24,16 +24,17 @@ import org.apache.shardingsphere.sql.parser.sql.common.statement.SQLStatement;
import java.sql.SQLException;
import java.sql.Statement;
+import java.util.Map;
/**
* Statement executor callback for proxy.
*/
public final class ProxyStatementExecutorCallback extends ProxyJDBCExecutorCallback {
- public ProxyStatementExecutorCallback(final DatabaseType protocolType, final DatabaseType databaseType, final SQLStatement sqlStatement,
+ public ProxyStatementExecutorCallback(final DatabaseType protocolType, final Map<String, DatabaseType> storageTypes, final SQLStatement sqlStatement,
final JDBCDatabaseCommunicationEngine databaseCommunicationEngine, final boolean isReturnGeneratedKeys,
final boolean isExceptionThrown, final boolean fetchMetaData) {
- super(protocolType, databaseType, sqlStatement, databaseCommunicationEngine, isReturnGeneratedKeys, isExceptionThrown, fetchMetaData);
+ super(protocolType, storageTypes, sqlStatement, databaseCommunicationEngine, isReturnGeneratedKeys, isExceptionThrown, fetchMetaData);
}
@Override
diff --git a/proxy/backend/src/main/java/org/apache/shardingsphere/proxy/backend/handler/distsql/rul/sql/PreviewHandler.java b/proxy/backend/src/main/java/org/apache/shardingsphere/proxy/backend/handler/distsql/rul/sql/PreviewHandler.java
index a1a60bcc77d..05e2db78d4b 100644
--- a/proxy/backend/src/main/java/org/apache/shardingsphere/proxy/backend/handler/distsql/rul/sql/PreviewHandler.java
+++ b/proxy/backend/src/main/java/org/apache/shardingsphere/proxy/backend/handler/distsql/rul/sql/PreviewHandler.java
@@ -71,6 +71,7 @@ import java.sql.Statement;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
+import java.util.Map;
import java.util.Optional;
import java.util.stream.Collectors;
@@ -139,21 +140,23 @@ public final class PreviewHandler extends SQLRULBackendHandler<PreviewStatement>
boolean isReturnGeneratedKeys = sqlStatement instanceof MySQLInsertStatement;
DriverExecutionPrepareEngine<JDBCExecutionUnit, Connection> prepareEngine = createDriverExecutionPrepareEngine(isReturnGeneratedKeys, metaDataContexts);
SQLFederationExecutorContext context = new SQLFederationExecutorContext(true, queryContext, metaDataContexts.getMetaData().getDatabases());
- DatabaseType databaseType = metaDataContexts.getMetaData().getDatabase(getDatabaseName()).getResourceMetaData().getDatabaseType();
- String schemaName = queryContext.getSqlStatementContext().getTablesContext().getSchemaName().orElseGet(() -> DatabaseTypeEngine.getDefaultSchemaName(databaseType, databaseName));
+ ShardingSphereDatabase database = metaDataContexts.getMetaData().getDatabase(getDatabaseName());
+ String schemaName = queryContext.getSqlStatementContext().getTablesContext().getSchemaName().orElseGet(() -> DatabaseTypeEngine.getDefaultSchemaName(database.getProtocolType(), databaseName));
EventBusContext eventBusContext = ProxyContext.getInstance().getContextManager().getInstanceContext().getEventBusContext();
SQLFederationRule sqlFederationRule = metaDataContexts.getMetaData().getGlobalRuleMetaData().getSingleRule(SQLFederationRule.class);
SQLFederationExecutor sqlFederationExecutor = sqlFederationRule.getSQLFederationExecutor(databaseName, schemaName,
metaDataContexts.getMetaData(), metaDataContexts.getShardingSphereData(), new JDBCExecutor(BackendExecutorContext.getInstance().getExecutorEngine(), false), eventBusContext);
- sqlFederationExecutor.executeQuery(prepareEngine, createPreviewFederationCallback(sqlStatement, databaseType, eventBusContext), context);
+ sqlFederationExecutor.executeQuery(prepareEngine, createPreviewFederationCallback(database.getProtocolType(), database.getResourceMetaData().getStorageTypes(), sqlStatement, eventBusContext),
+ context);
return context.getExecutionUnits();
}
- private JDBCExecutorCallback<ExecuteResult> createPreviewFederationCallback(final SQLStatement sqlStatement, final DatabaseType databaseType, final EventBusContext eventBusContext) {
- return new JDBCExecutorCallback<ExecuteResult>(databaseType, sqlStatement, SQLExecutorExceptionHandler.isExceptionThrown(), eventBusContext) {
+ private JDBCExecutorCallback<ExecuteResult> createPreviewFederationCallback(final DatabaseType protocolType, final Map<String, DatabaseType> storageTypes, final SQLStatement sqlStatement,
+ final EventBusContext eventBusContext) {
+ return new JDBCExecutorCallback<ExecuteResult>(protocolType, storageTypes, sqlStatement, SQLExecutorExceptionHandler.isExceptionThrown(), eventBusContext) {
@Override
- protected ExecuteResult executeSQL(final String sql, final Statement statement, final ConnectionMode connectionMode) throws SQLException {
+ protected ExecuteResult executeSQL(final String sql, final Statement statement, final ConnectionMode connectionMode, final DatabaseType storageType) throws SQLException {
return new JDBCStreamQueryResult(statement.executeQuery(sql));
}
@@ -169,7 +172,7 @@ public final class PreviewHandler extends SQLRULBackendHandler<PreviewStatement>
return new DriverExecutionPrepareEngine<>(JDBCDriverType.STATEMENT, maxConnectionsSizePerQuery, (JDBCBackendConnection) getConnectionSession().getBackendConnection(),
(JDBCBackendStatement) getConnectionSession().getStatementManager(), new StatementOption(isReturnGeneratedKeys),
metaDataContexts.getMetaData().getDatabase(getDatabaseName()).getRuleMetaData().getRules(),
- metaDataContexts.getMetaData().getDatabase(getDatabaseName()).getResourceMetaData().getDatabaseTypes());
+ metaDataContexts.getMetaData().getDatabase(getDatabaseName()).getResourceMetaData().getStorageTypes());
}
private String getDatabaseName() {
diff --git a/proxy/frontend/mysql/src/main/java/org/apache/shardingsphere/proxy/frontend/mysql/command/query/text/query/MySQLMultiStatementsHandler.java b/proxy/frontend/mysql/src/main/java/org/apache/shardingsphere/proxy/frontend/mysql/command/query/text/query/MySQLMultiStatementsHandler.java
index f2e83d9e3d6..d70929f4d08 100644
--- a/proxy/frontend/mysql/src/main/java/org/apache/shardingsphere/proxy/frontend/mysql/command/query/text/query/MySQLMultiStatementsHandler.java
+++ b/proxy/frontend/mysql/src/main/java/org/apache/shardingsphere/proxy/frontend/mysql/command/query/text/query/MySQLMultiStatementsHandler.java
@@ -140,7 +140,7 @@ public final class MySQLMultiStatementsHandler implements ProxyBackendHandler {
DriverExecutionPrepareEngine<JDBCExecutionUnit, Connection> prepareEngine = new DriverExecutionPrepareEngine<>(JDBCDriverType.STATEMENT, metaDataContexts.getMetaData().getProps()
.<Integer>getValue(ConfigurationPropertyKey.MAX_CONNECTIONS_SIZE_PER_QUERY), (JDBCBackendConnection) connectionSession.getBackendConnection(),
(JDBCBackendStatement) connectionSession.getStatementManager(), new StatementOption(false), rules,
- metaDataContexts.getMetaData().getDatabase(connectionSession.getDatabaseName()).getResourceMetaData().getDatabaseTypes());
+ metaDataContexts.getMetaData().getDatabase(connectionSession.getDatabaseName()).getResourceMetaData().getStorageTypes());
ExecutionGroupContext<JDBCExecutionUnit> executionGroupContext = prepareEngine.prepare(anyExecutionContext.getRouteContext(), samplingExecutionUnit());
for (ExecutionGroup<JDBCExecutionUnit> eachGroup : executionGroupContext.getInputGroups()) {
for (JDBCExecutionUnit each : eachGroup.getInputs()) {
@@ -167,8 +167,8 @@ public final class MySQLMultiStatementsHandler implements ProxyBackendHandler {
private UpdateResponseHeader executeBatchedStatements(final ExecutionGroupContext<JDBCExecutionUnit> executionGroupContext) throws SQLException {
boolean isExceptionThrown = SQLExecutorExceptionHandler.isExceptionThrown();
- DatabaseType databaseType = metaDataContexts.getMetaData().getDatabase(connectionSession.getDatabaseName()).getResourceMetaData().getDatabaseType();
- JDBCExecutorCallback<int[]> callback = new BatchedJDBCExecutorCallback(databaseType, sqlStatementSample, isExceptionThrown);
+ Map<String, DatabaseType> storageTypes = metaDataContexts.getMetaData().getDatabase(connectionSession.getDatabaseName()).getResourceMetaData().getStorageTypes();
+ JDBCExecutorCallback<int[]> callback = new BatchedJDBCExecutorCallback(storageTypes, sqlStatementSample, isExceptionThrown);
List<int[]> executeResults = jdbcExecutor.execute(executionGroupContext, callback);
int updated = 0;
for (int[] eachResult : executeResults) {
@@ -182,12 +182,12 @@ public final class MySQLMultiStatementsHandler implements ProxyBackendHandler {
private static class BatchedJDBCExecutorCallback extends JDBCExecutorCallback<int[]> {
- BatchedJDBCExecutorCallback(final DatabaseType databaseType, final SQLStatement sqlStatement, final boolean isExceptionThrown) {
- super(DatabaseTypeFactory.getInstance("MySQL"), databaseType, sqlStatement, isExceptionThrown, ProxyContext.getInstance().getContextManager().getInstanceContext().getEventBusContext());
+ BatchedJDBCExecutorCallback(final Map<String, DatabaseType> storageTypes, final SQLStatement sqlStatement, final boolean isExceptionThrown) {
+ super(DatabaseTypeFactory.getInstance("MySQL"), storageTypes, sqlStatement, isExceptionThrown, ProxyContext.getInstance().getContextManager().getInstanceContext().getEventBusContext());
}
@Override
- protected int[] executeSQL(final String sql, final Statement statement, final ConnectionMode connectionMode) throws SQLException {
+ protected int[] executeSQL(final String sql, final Statement statement, final ConnectionMode connectionMode, final DatabaseType storageType) throws SQLException {
try {
return statement.executeBatch();
} finally {
diff --git a/proxy/frontend/mysql/src/test/java/org/apache/shardingsphere/proxy/frontend/mysql/command/query/text/query/MySQLComQueryPacketExecutorTest.java b/proxy/frontend/mysql/src/test/java/org/apache/shardingsphere/proxy/frontend/mysql/command/query/text/query/MySQLComQueryPacketExecutorTest.java
index 3102d341f27..0f82a29ffd2 100644
--- a/proxy/frontend/mysql/src/test/java/org/apache/shardingsphere/proxy/frontend/mysql/command/query/text/query/MySQLComQueryPacketExecutorTest.java
+++ b/proxy/frontend/mysql/src/test/java/org/apache/shardingsphere/proxy/frontend/mysql/command/query/text/query/MySQLComQueryPacketExecutorTest.java
@@ -110,7 +110,7 @@ public final class MySQLComQueryPacketExecutorTest {
try (MockedStatic<ProxyContext> mockedStatic = mockStatic(ProxyContext.class)) {
ProxyContext mockedProxyContext = mock(ProxyContext.class, RETURNS_DEEP_STUBS);
mockedStatic.when(ProxyContext::getInstance).thenReturn(mockedProxyContext);
- when(ProxyContext.getInstance().getContextManager().getMetaDataContexts().getMetaData().getDatabase("db_name").getResourceMetaData().getDatabaseTypes())
+ when(ProxyContext.getInstance().getContextManager().getMetaDataContexts().getMetaData().getDatabase("db_name").getResourceMetaData().getStorageTypes())
.thenReturn(Collections.singletonMap("ds_0", new MySQLDatabaseType()));
when(ProxyContext.getInstance().getContextManager().getMetaDataContexts().getMetaData().getDatabase("db_name").getProtocolType()).thenReturn(new MySQLDatabaseType());
ShardingSphereRuleMetaData globalRuleMetaData = mock(ShardingSphereRuleMetaData.class);
diff --git a/proxy/frontend/mysql/src/test/java/org/apache/shardingsphere/proxy/frontend/mysql/command/query/text/query/MySQLMultiStatementsHandlerTest.java b/proxy/frontend/mysql/src/test/java/org/apache/shardingsphere/proxy/frontend/mysql/command/query/text/query/MySQLMultiStatementsHandlerTest.java
index 460842ff5ec..afc600bce4a 100644
--- a/proxy/frontend/mysql/src/test/java/org/apache/shardingsphere/proxy/frontend/mysql/command/query/text/query/MySQLMultiStatementsHandlerTest.java
+++ b/proxy/frontend/mysql/src/test/java/org/apache/shardingsphere/proxy/frontend/mysql/command/query/text/query/MySQLMultiStatementsHandlerTest.java
@@ -75,8 +75,8 @@ public final class MySQLMultiStatementsHandlerTest {
@Test
public void assertExecute() throws SQLException {
- final String sql = "update t set v=v+1 where id=1;update t set v=v+1 where id=2;update t set v=v+1 where id=3";
- when(connectionSession.getDatabaseName()).thenReturn("");
+ String sql = "update t set v=v+1 where id=1;update t set v=v+1 where id=2;update t set v=v+1 where id=3";
+ when(connectionSession.getDatabaseName()).thenReturn("db");
when(connectionSession.getBackendConnection()).thenReturn(backendConnection);
when(connectionSession.getStatementManager()).thenReturn(backendStatement);
Connection connection = mock(Connection.class, RETURNS_DEEP_STUBS);
@@ -89,9 +89,11 @@ public final class MySQLMultiStatementsHandlerTest {
MySQLUpdateStatement expectedStatement = mock(MySQLUpdateStatement.class);
try (MockedStatic<ProxyContext> mockedStatic = mockStatic(ProxyContext.class)) {
mockedStatic.when(ProxyContext::getInstance).thenReturn(proxyContext);
- when(ProxyContext.getInstance().getContextManager().getMetaDataContexts().getMetaData().getDatabase("").getResourceMetaData().getDatabaseTypes())
+ when(ProxyContext.getInstance().getContextManager().getMetaDataContexts().getMetaData().getDatabase("db").getResourceMetaData().getAllInstanceDataSourceNames())
+ .thenReturn(Collections.singletonList("ds_0"));
+ when(ProxyContext.getInstance().getContextManager().getMetaDataContexts().getMetaData().getDatabase("db").getResourceMetaData().getStorageTypes())
.thenReturn(Collections.singletonMap("ds_0", new MySQLDatabaseType()));
- when(ProxyContext.getInstance().getContextManager().getMetaDataContexts().getMetaData().getDatabase("").getProtocolType()).thenReturn(new MySQLDatabaseType());
+ when(ProxyContext.getInstance().getContextManager().getMetaDataContexts().getMetaData().getDatabase("db").getProtocolType()).thenReturn(new MySQLDatabaseType());
ShardingSphereRuleMetaData globalRuleMetaData = mock(ShardingSphereRuleMetaData.class);
when(ProxyContext.getInstance().getContextManager().getMetaDataContexts().getMetaData().getGlobalRuleMetaData()).thenReturn(globalRuleMetaData);
when(globalRuleMetaData.getSingleRule(SQLParserRule.class)).thenReturn(new SQLParserRule(new DefaultSQLParserRuleConfigurationBuilder().build()));
diff --git a/proxy/frontend/opengauss/src/test/java/org/apache/shardingsphere/proxy/frontend/opengauss/command/query/extended/bind/OpenGaussComBatchBindExecutorTest.java b/proxy/frontend/opengauss/src/test/java/org/apache/shardingsphere/proxy/frontend/opengauss/command/query/extended/bind/OpenGaussComBatchBindExecutorTest.java
index b9cd7637639..ccb72e2f337 100644
--- a/proxy/frontend/opengauss/src/test/java/org/apache/shardingsphere/proxy/frontend/opengauss/command/query/extended/bind/OpenGaussComBatchBindExecutorTest.java
+++ b/proxy/frontend/opengauss/src/test/java/org/apache/shardingsphere/proxy/frontend/opengauss/command/query/extended/bind/OpenGaussComBatchBindExecutorTest.java
@@ -23,6 +23,7 @@ import org.apache.shardingsphere.db.protocol.postgresql.packet.command.query.ext
import org.apache.shardingsphere.db.protocol.postgresql.packet.generic.PostgreSQLCommandCompletePacket;
import org.apache.shardingsphere.infra.config.props.ConfigurationPropertyKey;
import org.apache.shardingsphere.infra.database.type.DatabaseType;
+import org.apache.shardingsphere.infra.database.type.dialect.OpenGaussDatabaseType;
import org.apache.shardingsphere.infra.executor.sql.context.ExecutionUnit;
import org.apache.shardingsphere.infra.executor.sql.execute.engine.ConnectionMode;
import org.apache.shardingsphere.infra.executor.sql.prepare.driver.jdbc.StatementOption;
@@ -79,7 +80,8 @@ public final class OpenGaussComBatchBindExecutorTest extends ProxyContextRestore
ShardingSphereRuleMetaData globalRuleMetaData = mock(ShardingSphereRuleMetaData.class);
when(ProxyContext.getInstance().getContextManager().getMetaDataContexts().getMetaData().getGlobalRuleMetaData()).thenReturn(globalRuleMetaData);
ShardingSphereDatabase database = mock(ShardingSphereDatabase.class, RETURNS_DEEP_STUBS);
- when(database.getResourceMetaData().getDatabaseTypes()).thenReturn(Collections.singletonMap("ds_0", mock(DatabaseType.class)));
+ when(database.getResourceMetaData().getAllInstanceDataSourceNames()).thenReturn(Collections.singletonList("ds_0"));
+ when(database.getResourceMetaData().getStorageTypes()).thenReturn(Collections.singletonMap("ds_0", new OpenGaussDatabaseType()));
when(ProxyContext.getInstance().getContextManager().getMetaDataContexts().getMetaData().getDatabase("db")).thenReturn(database);
when(globalRuleMetaData.getSingleRule(SQLTranslatorRule.class)).thenReturn(new SQLTranslatorRule(new DefaultSQLTranslatorRuleConfigurationBuilder().build()));
String statement = "S_1";
@@ -91,7 +93,7 @@ public final class OpenGaussComBatchBindExecutorTest extends ProxyContextRestore
when(connectionSession.getConnectionId()).thenReturn(1);
JDBCBackendConnection backendConnection = mock(JDBCBackendConnection.class);
Connection connection = mock(Connection.class, RETURNS_DEEP_STUBS);
- when(connection.getMetaData().getURL()).thenReturn("");
+ when(connection.getMetaData().getURL()).thenReturn("jdbc:opengauss://127.0.0.1/db");
when(backendConnection.getConnections(nullable(String.class), anyInt(), any(ConnectionMode.class))).thenReturn(Collections.singletonList(connection));
PreparedStatement preparedStatement = mock(PreparedStatement.class);
when(preparedStatement.getConnection()).thenReturn(connection);
diff --git a/proxy/frontend/postgresql/src/main/java/org/apache/shardingsphere/proxy/frontend/postgresql/command/query/extended/PostgreSQLBatchedStatementsExecutor.java b/proxy/frontend/postgresql/src/main/java/org/apache/shardingsphere/proxy/frontend/postgresql/command/query/extended/PostgreSQLBatchedStatementsExecutor.java
index 1b0872c6da6..7cc9ab212a8 100644
--- a/proxy/frontend/postgresql/src/main/java/org/apache/shardingsphere/proxy/frontend/postgresql/command/query/extended/PostgreSQLBatchedStatementsExecutor.java
+++ b/proxy/frontend/postgresql/src/main/java/org/apache/shardingsphere/proxy/frontend/postgresql/command/query/extended/PostgreSQLBatchedStatementsExecutor.java
@@ -148,7 +148,7 @@ public final class PostgreSQLBatchedStatementsExecutor {
DriverExecutionPrepareEngine<JDBCExecutionUnit, Connection> prepareEngine = new DriverExecutionPrepareEngine<>(JDBCDriverType.PREPARED_STATEMENT,
metaDataContexts.getMetaData().getProps().<Integer>getValue(ConfigurationPropertyKey.MAX_CONNECTIONS_SIZE_PER_QUERY),
(JDBCBackendConnection) connectionSession.getBackendConnection(), (JDBCBackendStatement) connectionSession.getStatementManager(),
- new StatementOption(false), rules, metaDataContexts.getMetaData().getDatabase(connectionSession.getDatabaseName()).getResourceMetaData().getDatabaseTypes());
+ new StatementOption(false), rules, metaDataContexts.getMetaData().getDatabase(connectionSession.getDatabaseName()).getResourceMetaData().getStorageTypes());
executionGroupContext = prepareEngine.prepare(anyExecutionContext.getRouteContext(), executionUnitParameters.keySet());
for (ExecutionGroup<JDBCExecutionUnit> eachGroup : executionGroupContext.getInputGroups()) {
for (JDBCExecutionUnit each : eachGroup.getInputs()) {
@@ -176,9 +176,9 @@ public final class PostgreSQLBatchedStatementsExecutor {
private int executeBatchedPreparedStatements() throws SQLException {
boolean isExceptionThrown = SQLExecutorExceptionHandler.isExceptionThrown();
ShardingSphereDatabase database = metaDataContexts.getMetaData().getDatabase(connectionSession.getDatabaseName());
- DatabaseType databaseType = database.getResourceMetaData().getDatabaseType();
+ Map<String, DatabaseType> storageTypes = database.getResourceMetaData().getStorageTypes();
DatabaseType protocolType = database.getProtocolType();
- JDBCExecutorCallback<int[]> callback = new BatchedStatementsJDBCExecutorCallback(protocolType, databaseType, preparedStatement.getSqlStatement(), isExceptionThrown);
+ JDBCExecutorCallback<int[]> callback = new BatchedStatementsJDBCExecutorCallback(protocolType, storageTypes, preparedStatement.getSqlStatement(), isExceptionThrown);
List<int[]> executeResults = jdbcExecutor.execute(executionGroupContext, callback);
int result = 0;
for (int[] eachResult : executeResults) {
@@ -191,12 +191,12 @@ public final class PostgreSQLBatchedStatementsExecutor {
private static class BatchedStatementsJDBCExecutorCallback extends JDBCExecutorCallback<int[]> {
- BatchedStatementsJDBCExecutorCallback(final DatabaseType protocolType, final DatabaseType databaseType, final SQLStatement sqlStatement, final boolean isExceptionThrown) {
- super(protocolType, databaseType, sqlStatement, isExceptionThrown, ProxyContext.getInstance().getContextManager().getInstanceContext().getEventBusContext());
+ BatchedStatementsJDBCExecutorCallback(final DatabaseType protocolType, final Map<String, DatabaseType> storageTypes, final SQLStatement sqlStatement, final boolean isExceptionThrown) {
+ super(protocolType, storageTypes, sqlStatement, isExceptionThrown, ProxyContext.getInstance().getContextManager().getInstanceContext().getEventBusContext());
}
@Override
- protected int[] executeSQL(final String sql, final Statement statement, final ConnectionMode connectionMode) throws SQLException {
+ protected int[] executeSQL(final String sql, final Statement statement, final ConnectionMode connectionMode, final DatabaseType storageType) throws SQLException {
try {
return statement.executeBatch();
} finally {
diff --git a/proxy/frontend/postgresql/src/test/java/org/apache/shardingsphere/proxy/frontend/postgresql/command/query/extended/PostgreSQLAggregatedBatchedStatementsCommandExecutorTest.java b/proxy/frontend/postgresql/src/test/java/org/apache/shardingsphere/proxy/frontend/postgresql/command/query/extended/PostgreSQLAggregatedBatchedStatementsCommandExecutorTest.java
index 3b750721f8b..cb689b35ccd 100644
--- a/proxy/frontend/postgresql/src/test/java/org/apache/shardingsphere/proxy/frontend/postgresql/command/query/extended/PostgreSQLAggregatedBatchedStatementsCommandExecutorTest.java
+++ b/proxy/frontend/postgresql/src/test/java/org/apache/shardingsphere/proxy/frontend/postgresql/command/query/extended/PostgreSQLAggregatedBatchedStatementsCommandExecutorTest.java
@@ -28,6 +28,7 @@ import org.apache.shardingsphere.db.protocol.postgresql.packet.command.query.ext
import org.apache.shardingsphere.db.protocol.postgresql.packet.generic.PostgreSQLCommandCompletePacket;
import org.apache.shardingsphere.infra.config.props.ConfigurationPropertyKey;
import org.apache.shardingsphere.infra.database.type.DatabaseType;
+import org.apache.shardingsphere.infra.database.type.dialect.PostgreSQLDatabaseType;
import org.apache.shardingsphere.infra.executor.sql.context.ExecutionUnit;
import org.apache.shardingsphere.infra.executor.sql.execute.engine.ConnectionMode;
import org.apache.shardingsphere.infra.executor.sql.prepare.driver.jdbc.StatementOption;
@@ -93,14 +94,15 @@ public final class PostgreSQLAggregatedBatchedStatementsCommandExecutorTest exte
when(connectionSession.getDatabaseName()).thenReturn("db");
when(connectionSession.getServerPreparedStatementRegistry()).thenReturn(new ServerPreparedStatementRegistry());
ShardingSphereDatabase database = mock(ShardingSphereDatabase.class, RETURNS_DEEP_STUBS);
- when(database.getResourceMetaData().getDatabaseTypes()).thenReturn(Collections.singletonMap("ds_0", mock(DatabaseType.class)));
+ when(database.getResourceMetaData().getAllInstanceDataSourceNames()).thenReturn(Collections.singletonList("ds_0"));
+ when(database.getResourceMetaData().getStorageTypes()).thenReturn(Collections.singletonMap("ds_0", new PostgreSQLDatabaseType()));
when(ProxyContext.getInstance().getContextManager().getMetaDataContexts().getMetaData().getDatabase("db")).thenReturn(database);
connectionSession.getServerPreparedStatementRegistry().addPreparedStatement(STATEMENT_ID,
new PostgreSQLServerPreparedStatement(SQL, SQL_PARSER_ENGINE.parse(SQL, false), null, Collections.singletonList(PostgreSQLColumnType.POSTGRESQL_TYPE_INT4)));
when(connectionSession.getConnectionId()).thenReturn(CONNECTION_ID);
JDBCBackendConnection backendConnection = mock(JDBCBackendConnection.class);
Connection connection = mock(Connection.class, RETURNS_DEEP_STUBS);
- when(connection.getMetaData().getURL()).thenReturn("");
+ when(connection.getMetaData().getURL()).thenReturn("jdbc:postgresql://127.0.0.1/db");
when(backendConnection.getConnections(nullable(String.class), anyInt(), any(ConnectionMode.class))).thenReturn(Collections.singletonList(connection));
PreparedStatement preparedStatement = mock(PreparedStatement.class);
when(preparedStatement.getConnection()).thenReturn(connection);
diff --git a/proxy/frontend/postgresql/src/test/java/org/apache/shardingsphere/proxy/frontend/postgresql/command/query/extended/PostgreSQLBatchedStatementsExecutorTest.java b/proxy/frontend/postgresql/src/test/java/org/apache/shardingsphere/proxy/frontend/postgresql/command/query/extended/PostgreSQLBatchedStatementsExecutorTest.java
index 0cf134c4167..b2cfe79f7da 100644
--- a/proxy/frontend/postgresql/src/test/java/org/apache/shardingsphere/proxy/frontend/postgresql/command/query/extended/PostgreSQLBatchedStatementsExecutorTest.java
+++ b/proxy/frontend/postgresql/src/test/java/org/apache/shardingsphere/proxy/frontend/postgresql/command/query/extended/PostgreSQLBatchedStatementsExecutorTest.java
@@ -22,6 +22,7 @@ import org.apache.shardingsphere.db.protocol.postgresql.packet.command.query.ext
import org.apache.shardingsphere.db.protocol.postgresql.packet.command.query.extended.bind.PostgreSQLTypeUnspecifiedSQLParameter;
import org.apache.shardingsphere.infra.config.props.ConfigurationPropertyKey;
import org.apache.shardingsphere.infra.database.type.DatabaseType;
+import org.apache.shardingsphere.infra.database.type.dialect.PostgreSQLDatabaseType;
import org.apache.shardingsphere.infra.executor.sql.context.ExecutionUnit;
import org.apache.shardingsphere.infra.executor.sql.execute.engine.ConnectionMode;
import org.apache.shardingsphere.infra.executor.sql.prepare.driver.jdbc.StatementOption;
@@ -89,7 +90,8 @@ public final class PostgreSQLBatchedStatementsExecutorTest extends ProxyContextR
when(contextManager.getMetaDataContexts().getMetaData().getProps().getValue(ConfigurationPropertyKey.MAX_CONNECTIONS_SIZE_PER_QUERY)).thenReturn(1);
when(contextManager.getMetaDataContexts().getMetaData().getProps().getValue(ConfigurationPropertyKey.SQL_SHOW)).thenReturn(false);
ShardingSphereDatabase database = mock(ShardingSphereDatabase.class, RETURNS_DEEP_STUBS);
- when(database.getResourceMetaData().getDatabaseTypes()).thenReturn(Collections.singletonMap("ds_0", mock(DatabaseType.class)));
+ when(database.getResourceMetaData().getStorageTypes()).thenReturn(Collections.singletonMap("ds_0", new PostgreSQLDatabaseType()));
+ when(database.getResourceMetaData().getAllInstanceDataSourceNames()).thenReturn(Collections.singletonList("ds_0"));
when(contextManager.getMetaDataContexts().getMetaData().getDatabase("db")).thenReturn(database);
ShardingSphereRuleMetaData globalRuleMetaData = mock(ShardingSphereRuleMetaData.class);
when(ProxyContext.getInstance().getContextManager().getMetaDataContexts().getMetaData().getGlobalRuleMetaData()).thenReturn(globalRuleMetaData);
diff --git a/proxy/frontend/postgresql/src/test/java/org/apache/shardingsphere/proxy/frontend/postgresql/command/query/extended/describe/PostgreSQLComDescribeExecutorTest.java b/proxy/frontend/postgresql/src/test/java/org/apache/shardingsphere/proxy/frontend/postgresql/command/query/extended/describe/PostgreSQLComDescribeExecutorTest.java
index 7993dca5583..20d372efc30 100644
--- a/proxy/frontend/postgresql/src/test/java/org/apache/shardingsphere/proxy/frontend/postgresql/command/query/extended/describe/PostgreSQLComDescribeExecutorTest.java
+++ b/proxy/frontend/postgresql/src/test/java/org/apache/shardingsphere/proxy/frontend/postgresql/command/query/extended/describe/PostgreSQLComDescribeExecutorTest.java
@@ -128,7 +128,7 @@ public final class PostgreSQLComDescribeExecutorTest extends ProxyContextRestore
ShardingSphereTable table = new ShardingSphereTable(TABLE_NAME, columnMetaData, Collections.emptyList(), Collections.emptyList());
when(contextManager.getMetaDataContexts().getMetaData().getDatabase(DATABASE_NAME).getSchema("public").getTable(TABLE_NAME)).thenReturn(table);
when(contextManager.getMetaDataContexts().getMetaData().getDatabase(DATABASE_NAME).getProtocolType()).thenReturn(new PostgreSQLDatabaseType());
- when(contextManager.getMetaDataContexts().getMetaData().getDatabase(DATABASE_NAME).getResourceMetaData().getDatabaseTypes())
+ when(contextManager.getMetaDataContexts().getMetaData().getDatabase(DATABASE_NAME).getResourceMetaData().getStorageTypes())
.thenReturn(Collections.singletonMap("ds_0", new PostgreSQLDatabaseType()));
when(contextManager.getMetaDataContexts().getMetaData().containsDatabase(DATABASE_NAME)).thenReturn(true);
}
diff --git a/test/rewrite/src/test/java/org/apache/shardingsphere/sharding/rewrite/parameterized/engine/AbstractSQLRewriterParameterizedTest.java b/test/rewrite/src/test/java/org/apache/shardingsphere/sharding/rewrite/parameterized/engine/AbstractSQLRewriterParameterizedTest.java
index f8465d73e1c..eb7ff52a83e 100644
--- a/test/rewrite/src/test/java/org/apache/shardingsphere/sharding/rewrite/parameterized/engine/AbstractSQLRewriterParameterizedTest.java
+++ b/test/rewrite/src/test/java/org/apache/shardingsphere/sharding/rewrite/parameterized/engine/AbstractSQLRewriterParameterizedTest.java
@@ -109,8 +109,8 @@ public abstract class AbstractSQLRewriterParameterizedTest {
mockDataSource(databaseConfig.getDataSources());
ShardingSphereResourceMetaData resourceMetaData = mock(ShardingSphereResourceMetaData.class);
DatabaseType databaseType = DatabaseTypeFactory.getInstance(getTestParameters().getDatabaseType());
- Map<String, DatabaseType> databaseTypes = createDatabaseTypes(databaseConfig, databaseType);
- when(resourceMetaData.getDatabaseTypes()).thenReturn(databaseTypes);
+ Map<String, DatabaseType> storageTypes = createStorageTypes(databaseConfig, databaseType);
+ when(resourceMetaData.getStorageTypes()).thenReturn(storageTypes);
String schemaName = DatabaseTypeEngine.getDefaultSchemaName(databaseType, DefaultDatabase.LOGIC_NAME);
Map<String, ShardingSphereSchema> schemas = mockSchemas(schemaName);
Collection<ShardingSphereRule> databaseRules = DatabaseRulesBuilder.build(DefaultDatabase.LOGIC_NAME, databaseConfig, mock(InstanceContext.class));
@@ -141,7 +141,7 @@ public abstract class AbstractSQLRewriterParameterizedTest {
: (((RouteSQLRewriteResult) sqlRewriteResult).getSqlRewriteUnits()).values();
}
- private static Map<String, DatabaseType> createDatabaseTypes(final DatabaseConfiguration databaseConfig, final DatabaseType databaseType) {
+ private static Map<String, DatabaseType> createStorageTypes(final DatabaseConfiguration databaseConfig, final DatabaseType databaseType) {
Map<String, DatabaseType> result = new LinkedHashMap<>(databaseConfig.getDataSources().size(), 1);
for (Entry<String, DataSource> entry : databaseConfig.getDataSources().entrySet()) {
result.put(entry.getKey(), databaseType);