You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@shardingsphere.apache.org by pa...@apache.org on 2022/10/25 10:02:28 UTC

[shardingsphere] branch master updated: Support heterogeneous datasource for jdbc, proxy adapter and modify agent logic (#21733)

This is an automated email from the ASF dual-hosted git repository.

panjuan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new a21698577d7 Support heterogeneous datasource for jdbc, proxy adapter and modify agent logic (#21733)
a21698577d7 is described below

commit a21698577d77733e748801c6723443acd510ed79
Author: Zhengqiang Duan <du...@apache.org>
AuthorDate: Tue Oct 25 18:02:21 2022 +0800

    Support heterogeneous datasource for jdbc, proxy adapter and modify agent logic (#21733)
    
    * Support heterogeneous datasource for merge engine and jdbc, proxy adapter
    
    * fix agent unit test
    
    * fix proxy unit test
---
 .../jaeger/advice/JDBCExecutorCallbackAdvice.java  |  8 ++-
 .../advice/JDBCExecutorCallbackAdvice.java         |  8 ++-
 .../AbstractJDBCExecutorCallbackAdviceTest.java    | 12 +++-
 .../zipkin/advice/JDBCExecutorCallbackAdvice.java  |  8 ++-
 .../sharding/merge/ShardingResultMergerEngine.java |  4 +-
 .../merge/dql/ShardingDQLResultMerger.java         |  4 +-
 .../infra/database/type/DatabaseTypeEngine.java    |  6 +-
 .../resource/ShardingSphereResourceMetaData.java   |  6 +-
 .../type/AlterTableStatementSchemaRefresher.java   |  2 +-
 .../type/AlterViewStatementSchemaRefresher.java    |  2 +-
 .../type/CreateTableStatementSchemaRefresher.java  |  2 +-
 .../type/CreateViewStatementSchemaRefresher.java   |  2 +-
 .../type/RenameTableStatementSchemaRefresher.java  |  2 +-
 .../RenameTableStatementSchemaRefresherTest.java   |  2 +-
 .../engine/driver/jdbc/JDBCExecutorCallback.java   | 23 +++---
 .../engine/jdbc/JDBCExecutorCallbackTest.java      | 60 ++++++++--------
 .../shardingsphere/infra/merge/MergeEngine.java    |  3 +-
 .../merge/engine/merger/ResultMergerEngine.java    |  4 +-
 .../fixture/merger/ResultMergerEngineFixture.java  |  2 +-
 .../infra/rewrite/SQLRewriteEntry.java             |  2 +-
 .../infra/rewrite/SQLRewriteEntryTest.java         |  2 +-
 .../infra/util/reflect/ReflectiveUtil.java         | 83 ++++++++++++++++++++++
 .../driver/executor/DriverExecutor.java            |  4 +-
 .../batch/BatchPreparedStatementExecutor.java      |  7 +-
 .../executor/callback/ExecuteQueryCallback.java    | 12 ++--
 .../PreparedStatementExecuteQueryCallback.java     |  6 +-
 .../impl/StatementExecuteQueryCallback.java        |  6 +-
 .../statement/ShardingSpherePreparedStatement.java | 24 ++++---
 .../core/statement/ShardingSphereStatement.java    | 25 ++++---
 .../transaction/rule/TransactionRule.java          |  2 +-
 .../transaction/rule/TransactionRuleTest.java      |  2 +-
 .../mode/manager/ContextManager.java               |  4 +-
 .../backend/communication/ProxySQLExecutor.java    |  2 +-
 .../communication/ReactiveProxySQLExecutor.java    |  2 +-
 .../jdbc/JDBCDatabaseCommunicationEngine.java      |  7 +-
 .../jdbc/executor/ProxyJDBCExecutor.java           |  7 +-
 .../callback/ProxyJDBCExecutorCallback.java        | 19 ++---
 .../callback/ProxyJDBCExecutorCallbackFactory.java | 10 +--
 .../ProxyPreparedStatementExecutorCallback.java    |  5 +-
 .../impl/ProxyStatementExecutorCallback.java       |  5 +-
 .../handler/distsql/rul/sql/PreviewHandler.java    | 17 +++--
 .../text/query/MySQLMultiStatementsHandler.java    | 12 ++--
 .../query/MySQLComQueryPacketExecutorTest.java     |  2 +-
 .../query/MySQLMultiStatementsHandlerTest.java     | 10 +--
 .../bind/OpenGaussComBatchBindExecutorTest.java    |  6 +-
 .../PostgreSQLBatchedStatementsExecutor.java       | 12 ++--
 ...egatedBatchedStatementsCommandExecutorTest.java |  6 +-
 .../PostgreSQLBatchedStatementsExecutorTest.java   |  4 +-
 .../PostgreSQLComDescribeExecutorTest.java         |  2 +-
 .../AbstractSQLRewriterParameterizedTest.java      |  6 +-
 50 files changed, 303 insertions(+), 170 deletions(-)

diff --git a/agent/plugins/tracing/jaeger/src/main/java/org/apache/shardingsphere/agent/plugin/tracing/jaeger/advice/JDBCExecutorCallbackAdvice.java b/agent/plugins/tracing/jaeger/src/main/java/org/apache/shardingsphere/agent/plugin/tracing/jaeger/advice/JDBCExecutorCallbackAdvice.java
index 65f75b8bb64..f23a1f39d2c 100644
--- a/agent/plugins/tracing/jaeger/src/main/java/org/apache/shardingsphere/agent/plugin/tracing/jaeger/advice/JDBCExecutorCallbackAdvice.java
+++ b/agent/plugins/tracing/jaeger/src/main/java/org/apache/shardingsphere/agent/plugin/tracing/jaeger/advice/JDBCExecutorCallbackAdvice.java
@@ -29,8 +29,10 @@ import org.apache.shardingsphere.agent.api.result.MethodInvocationResult;
 import org.apache.shardingsphere.agent.plugin.tracing.jaeger.constant.JaegerConstants;
 import org.apache.shardingsphere.agent.plugin.tracing.jaeger.span.JaegerErrorSpan;
 import org.apache.shardingsphere.infra.database.metadata.DataSourceMetaData;
+import org.apache.shardingsphere.infra.database.type.DatabaseType;
 import org.apache.shardingsphere.infra.executor.sql.execute.engine.driver.jdbc.JDBCExecutionUnit;
 import org.apache.shardingsphere.infra.executor.sql.execute.engine.driver.jdbc.JDBCExecutorCallback;
+import org.apache.shardingsphere.infra.util.reflect.ReflectiveUtil;
 
 import java.lang.reflect.Method;
 import java.sql.DatabaseMetaData;
@@ -54,9 +56,11 @@ public final class JDBCExecutorCallbackAdvice implements InstanceMethodAroundAdv
             builder = builder.asChildOf(root);
         }
         JDBCExecutionUnit executionUnit = (JDBCExecutionUnit) args[0];
-        Method getMetaDataMethod = JDBCExecutorCallback.class.getDeclaredMethod("getDataSourceMetaData", DatabaseMetaData.class);
+        Map<String, DatabaseType> storageTypes = (Map<String, DatabaseType>) ReflectiveUtil.getFieldValue(target, "storageTypes");
+        Method getMetaDataMethod = JDBCExecutorCallback.class.getDeclaredMethod("getDataSourceMetaData", DatabaseMetaData.class, DatabaseType.class);
         getMetaDataMethod.setAccessible(true);
-        DataSourceMetaData metaData = (DataSourceMetaData) getMetaDataMethod.invoke(target, new Object[]{executionUnit.getStorageResource().getConnection().getMetaData()});
+        DataSourceMetaData metaData = (DataSourceMetaData) getMetaDataMethod.invoke(target,
+                new Object[]{executionUnit.getStorageResource().getConnection().getMetaData(), storageTypes.get(executionUnit.getExecutionUnit().getDataSourceName())});
         builder.withTag(Tags.COMPONENT.getKey(), JaegerConstants.COMPONENT_NAME)
                 .withTag(Tags.DB_TYPE.getKey(), JaegerConstants.DB_TYPE_VALUE)
                 .withTag(Tags.DB_INSTANCE.getKey(), executionUnit.getExecutionUnit().getDataSourceName())
diff --git a/agent/plugins/tracing/opentelemetry/src/main/java/org/apache/shardingsphere/agent/plugin/tracing/opentelemetry/advice/JDBCExecutorCallbackAdvice.java b/agent/plugins/tracing/opentelemetry/src/main/java/org/apache/shardingsphere/agent/plugin/tracing/opentelemetry/advice/JDBCExecutorCallbackAdvice.java
index 4935ef128e2..89fdbfaa0ab 100644
--- a/agent/plugins/tracing/opentelemetry/src/main/java/org/apache/shardingsphere/agent/plugin/tracing/opentelemetry/advice/JDBCExecutorCallbackAdvice.java
+++ b/agent/plugins/tracing/opentelemetry/src/main/java/org/apache/shardingsphere/agent/plugin/tracing/opentelemetry/advice/JDBCExecutorCallbackAdvice.java
@@ -29,8 +29,10 @@ import org.apache.shardingsphere.agent.api.advice.InstanceMethodAroundAdvice;
 import org.apache.shardingsphere.agent.api.result.MethodInvocationResult;
 import org.apache.shardingsphere.agent.plugin.tracing.opentelemetry.constant.OpenTelemetryConstants;
 import org.apache.shardingsphere.infra.database.metadata.DataSourceMetaData;
+import org.apache.shardingsphere.infra.database.type.DatabaseType;
 import org.apache.shardingsphere.infra.executor.sql.execute.engine.driver.jdbc.JDBCExecutionUnit;
 import org.apache.shardingsphere.infra.executor.sql.execute.engine.driver.jdbc.JDBCExecutorCallback;
+import org.apache.shardingsphere.infra.util.reflect.ReflectiveUtil;
 
 import java.lang.reflect.Method;
 import java.sql.DatabaseMetaData;
@@ -56,9 +58,11 @@ public class JDBCExecutorCallbackAdvice implements InstanceMethodAroundAdvice {
         spanBuilder.setAttribute(OpenTelemetryConstants.COMPONENT, OpenTelemetryConstants.COMPONENT_NAME);
         spanBuilder.setAttribute(OpenTelemetryConstants.DB_TYPE, OpenTelemetryConstants.DB_TYPE_VALUE);
         JDBCExecutionUnit executionUnit = (JDBCExecutionUnit) args[0];
-        Method getMetaDataMethod = JDBCExecutorCallback.class.getDeclaredMethod("getDataSourceMetaData", DatabaseMetaData.class);
+        Map<String, DatabaseType> storageTypes = (Map<String, DatabaseType>) ReflectiveUtil.getFieldValue(target, "storageTypes");
+        Method getMetaDataMethod = JDBCExecutorCallback.class.getDeclaredMethod("getDataSourceMetaData", DatabaseMetaData.class, DatabaseType.class);
         getMetaDataMethod.setAccessible(true);
-        DataSourceMetaData metaData = (DataSourceMetaData) getMetaDataMethod.invoke(target, new Object[]{executionUnit.getStorageResource().getConnection().getMetaData()});
+        DataSourceMetaData metaData = (DataSourceMetaData) getMetaDataMethod.invoke(target,
+                new Object[]{executionUnit.getStorageResource().getConnection().getMetaData(), storageTypes.get(executionUnit.getExecutionUnit().getDataSourceName())});
         spanBuilder.setAttribute(OpenTelemetryConstants.DB_INSTANCE, executionUnit.getExecutionUnit().getDataSourceName())
                 .setAttribute(OpenTelemetryConstants.PEER_HOSTNAME, metaData.getHostname())
                 .setAttribute(OpenTelemetryConstants.PEER_PORT, String.valueOf(metaData.getPort()))
diff --git a/agent/plugins/tracing/test/src/main/java/org/apache/shardingsphere/agent/plugin/tracing/advice/AbstractJDBCExecutorCallbackAdviceTest.java b/agent/plugins/tracing/test/src/main/java/org/apache/shardingsphere/agent/plugin/tracing/advice/AbstractJDBCExecutorCallbackAdviceTest.java
index f3520289734..a6f0aac1734 100644
--- a/agent/plugins/tracing/test/src/main/java/org/apache/shardingsphere/agent/plugin/tracing/advice/AbstractJDBCExecutorCallbackAdviceTest.java
+++ b/agent/plugins/tracing/test/src/main/java/org/apache/shardingsphere/agent/plugin/tracing/advice/AbstractJDBCExecutorCallbackAdviceTest.java
@@ -23,10 +23,13 @@ import org.apache.shardingsphere.agent.api.advice.AdviceTargetObject;
 import org.apache.shardingsphere.agent.plugin.tracing.AgentRunner;
 import org.apache.shardingsphere.agent.plugin.tracing.MockDataSourceMetaData;
 import org.apache.shardingsphere.infra.database.metadata.DataSourceMetaData;
+import org.apache.shardingsphere.infra.database.type.DatabaseType;
+import org.apache.shardingsphere.infra.database.type.dialect.MySQLDatabaseType;
 import org.apache.shardingsphere.infra.executor.sql.context.ExecutionUnit;
 import org.apache.shardingsphere.infra.executor.sql.context.SQLUnit;
 import org.apache.shardingsphere.infra.executor.sql.execute.engine.driver.jdbc.JDBCExecutionUnit;
 import org.apache.shardingsphere.infra.executor.sql.execute.engine.driver.jdbc.JDBCExecutorCallback;
+import org.apache.shardingsphere.infra.util.reflect.ReflectiveUtil;
 import org.junit.runner.RunWith;
 import org.mockito.internal.util.reflection.FieldReader;
 
@@ -36,6 +39,7 @@ import java.sql.SQLException;
 import java.sql.Statement;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.LinkedHashMap;
 import java.util.Map;
 
 import static org.mockito.Mockito.mock;
@@ -78,8 +82,12 @@ public abstract class AbstractJDBCExecutorCallbackAdviceTest implements AdviceTe
                     return invocation.callRealMethod();
             }
         });
-        Map<String, DataSourceMetaData> map = (Map<String, DataSourceMetaData>) new FieldReader(mock, JDBCExecutorCallback.class.getDeclaredField("CACHED_DATASOURCE_METADATA")).read();
-        map.put("mock_url", new MockDataSourceMetaData());
+        Map<String, DataSourceMetaData> cachedDatasourceMetadata =
+                (Map<String, DataSourceMetaData>) new FieldReader(mock, JDBCExecutorCallback.class.getDeclaredField("CACHED_DATASOURCE_METADATA")).read();
+        cachedDatasourceMetadata.put("mock_url", new MockDataSourceMetaData());
+        Map<String, DatabaseType> storageTypes = new LinkedHashMap<>(1, 1);
+        storageTypes.put("mock.db", new MySQLDatabaseType());
+        ReflectiveUtil.setProperty(mock, "storageTypes", storageTypes);
         targetObject = (AdviceTargetObject) mock;
     }
 }
diff --git a/agent/plugins/tracing/zipkin/src/main/java/org/apache/shardingsphere/agent/plugin/tracing/zipkin/advice/JDBCExecutorCallbackAdvice.java b/agent/plugins/tracing/zipkin/src/main/java/org/apache/shardingsphere/agent/plugin/tracing/zipkin/advice/JDBCExecutorCallbackAdvice.java
index 6a3d78c374f..7d0ebf320a1 100644
--- a/agent/plugins/tracing/zipkin/src/main/java/org/apache/shardingsphere/agent/plugin/tracing/zipkin/advice/JDBCExecutorCallbackAdvice.java
+++ b/agent/plugins/tracing/zipkin/src/main/java/org/apache/shardingsphere/agent/plugin/tracing/zipkin/advice/JDBCExecutorCallbackAdvice.java
@@ -25,8 +25,10 @@ import org.apache.shardingsphere.agent.api.advice.InstanceMethodAroundAdvice;
 import org.apache.shardingsphere.agent.api.result.MethodInvocationResult;
 import org.apache.shardingsphere.agent.plugin.tracing.zipkin.constant.ZipkinConstants;
 import org.apache.shardingsphere.infra.database.metadata.DataSourceMetaData;
+import org.apache.shardingsphere.infra.database.type.DatabaseType;
 import org.apache.shardingsphere.infra.executor.sql.execute.engine.driver.jdbc.JDBCExecutionUnit;
 import org.apache.shardingsphere.infra.executor.sql.execute.engine.driver.jdbc.JDBCExecutorCallback;
+import org.apache.shardingsphere.infra.util.reflect.ReflectiveUtil;
 
 import java.lang.reflect.Method;
 import java.sql.DatabaseMetaData;
@@ -49,9 +51,11 @@ public final class JDBCExecutorCallbackAdvice implements InstanceMethodAroundAdv
         span.tag(ZipkinConstants.Tags.COMPONENT, ZipkinConstants.COMPONENT_NAME);
         span.tag(ZipkinConstants.Tags.DB_TYPE, ZipkinConstants.DB_TYPE_VALUE);
         JDBCExecutionUnit executionUnit = (JDBCExecutionUnit) args[0];
-        Method getMetaDataMethod = JDBCExecutorCallback.class.getDeclaredMethod("getDataSourceMetaData", DatabaseMetaData.class);
+        Map<String, DatabaseType> storageTypes = (Map<String, DatabaseType>) ReflectiveUtil.getFieldValue(target, "storageTypes");
+        Method getMetaDataMethod = JDBCExecutorCallback.class.getDeclaredMethod("getDataSourceMetaData", DatabaseMetaData.class, DatabaseType.class);
         getMetaDataMethod.setAccessible(true);
-        DataSourceMetaData metaData = (DataSourceMetaData) getMetaDataMethod.invoke(target, new Object[]{executionUnit.getStorageResource().getConnection().getMetaData()});
+        DataSourceMetaData metaData = (DataSourceMetaData) getMetaDataMethod.invoke(target,
+                new Object[]{executionUnit.getStorageResource().getConnection().getMetaData(), storageTypes.get(executionUnit.getExecutionUnit().getDataSourceName())});
         span.tag(ZipkinConstants.Tags.DB_INSTANCE, executionUnit.getExecutionUnit().getDataSourceName());
         span.tag(ZipkinConstants.Tags.PEER_HOSTNAME, metaData.getHostname());
         span.tag(ZipkinConstants.Tags.PEER_PORT, String.valueOf(metaData.getPort()));
diff --git a/features/sharding/core/src/main/java/org/apache/shardingsphere/sharding/merge/ShardingResultMergerEngine.java b/features/sharding/core/src/main/java/org/apache/shardingsphere/sharding/merge/ShardingResultMergerEngine.java
index 4a311feabe0..c1ed83869eb 100644
--- a/features/sharding/core/src/main/java/org/apache/shardingsphere/sharding/merge/ShardingResultMergerEngine.java
+++ b/features/sharding/core/src/main/java/org/apache/shardingsphere/sharding/merge/ShardingResultMergerEngine.java
@@ -38,10 +38,10 @@ import org.apache.shardingsphere.sql.parser.sql.common.statement.ddl.DDLStatemen
 public final class ShardingResultMergerEngine implements ResultMergerEngine<ShardingRule> {
     
     @Override
-    public ResultMerger newInstance(final String databaseName, final DatabaseType databaseType, final ShardingRule shardingRule, final ConfigurationProperties props,
+    public ResultMerger newInstance(final String databaseName, final DatabaseType protocolType, final ShardingRule shardingRule, final ConfigurationProperties props,
                                     final SQLStatementContext<?> sqlStatementContext) {
         if (sqlStatementContext instanceof SelectStatementContext) {
-            return new ShardingDQLResultMerger(databaseType);
+            return new ShardingDQLResultMerger(protocolType);
         }
         if (sqlStatementContext.getSqlStatement() instanceof DDLStatement) {
             return new ShardingDDLResultMerger();
diff --git a/features/sharding/core/src/main/java/org/apache/shardingsphere/sharding/merge/dql/ShardingDQLResultMerger.java b/features/sharding/core/src/main/java/org/apache/shardingsphere/sharding/merge/dql/ShardingDQLResultMerger.java
index 710e5095e3b..0d9fb296e00 100644
--- a/features/sharding/core/src/main/java/org/apache/shardingsphere/sharding/merge/dql/ShardingDQLResultMerger.java
+++ b/features/sharding/core/src/main/java/org/apache/shardingsphere/sharding/merge/dql/ShardingDQLResultMerger.java
@@ -52,7 +52,7 @@ import java.util.TreeMap;
 @RequiredArgsConstructor
 public final class ShardingDQLResultMerger implements ResultMerger {
     
-    private final DatabaseType databaseType;
+    private final DatabaseType protocolType;
     
     @Override
     public MergedResult merge(final List<QueryResult> queryResults, final SQLStatementContext<?> sqlStatementContext,
@@ -129,7 +129,7 @@ public final class ShardingDQLResultMerger implements ResultMerger {
         if (!paginationContext.isHasPagination() || 1 == queryResults.size()) {
             return mergedResult;
         }
-        String trunkDatabaseName = DatabaseTypeEngine.getTrunkDatabaseType(databaseType.getType()).getType();
+        String trunkDatabaseName = DatabaseTypeEngine.getTrunkDatabaseType(protocolType.getType()).getType();
         if ("MySQL".equals(trunkDatabaseName) || "PostgreSQL".equals(trunkDatabaseName) || "openGauss".equals(trunkDatabaseName)) {
             return new LimitDecoratorMergedResult(mergedResult, paginationContext);
         }
diff --git a/infra/common/src/main/java/org/apache/shardingsphere/infra/database/type/DatabaseTypeEngine.java b/infra/common/src/main/java/org/apache/shardingsphere/infra/database/type/DatabaseTypeEngine.java
index cddf7464658..fb381eade72 100644
--- a/infra/common/src/main/java/org/apache/shardingsphere/infra/database/type/DatabaseTypeEngine.java
+++ b/infra/common/src/main/java/org/apache/shardingsphere/infra/database/type/DatabaseTypeEngine.java
@@ -156,12 +156,12 @@ public final class DatabaseTypeEngine {
     /**
      * Get default schema name.
      * 
-     * @param databaseType database type
+     * @param protocolType protocol type
      * @param databaseName database name
      * @return default schema name
      */
-    public static String getDefaultSchemaName(final DatabaseType databaseType, final String databaseName) {
-        return databaseType instanceof SchemaSupportedDatabaseType ? ((SchemaSupportedDatabaseType) databaseType).getDefaultSchema() : databaseName.toLowerCase();
+    public static String getDefaultSchemaName(final DatabaseType protocolType, final String databaseName) {
+        return protocolType instanceof SchemaSupportedDatabaseType ? ((SchemaSupportedDatabaseType) protocolType).getDefaultSchema() : databaseName.toLowerCase();
     }
     
     /**
diff --git a/infra/common/src/main/java/org/apache/shardingsphere/infra/metadata/database/resource/ShardingSphereResourceMetaData.java b/infra/common/src/main/java/org/apache/shardingsphere/infra/metadata/database/resource/ShardingSphereResourceMetaData.java
index da2c19c3c25..0c1cdb2b95a 100644
--- a/infra/common/src/main/java/org/apache/shardingsphere/infra/metadata/database/resource/ShardingSphereResourceMetaData.java
+++ b/infra/common/src/main/java/org/apache/shardingsphere/infra/metadata/database/resource/ShardingSphereResourceMetaData.java
@@ -43,7 +43,7 @@ public final class ShardingSphereResourceMetaData {
     
     private final Map<String, DataSource> dataSources;
     
-    private final Map<String, DatabaseType> databaseTypes;
+    private final Map<String, DatabaseType> storageTypes;
     
     // TODO remove databaseType when all scenarios have been replaced
     private final DatabaseType databaseType;
@@ -55,11 +55,11 @@ public final class ShardingSphereResourceMetaData {
         this.dataSources = dataSources;
         Map<String, DataSource> enabledDataSources = DataSourceStateManager.getInstance().getEnabledDataSourceMap(databaseName, dataSources);
         databaseType = getDatabaseType(enabledDataSources);
-        databaseTypes = createDatabaseTypes(enabledDataSources);
+        storageTypes = createStorageTypes(enabledDataSources);
         dataSourceMetaDataMap = createDataSourceMetaDataMap(dataSources);
     }
     
-    private Map<String, DatabaseType> createDatabaseTypes(final Map<String, DataSource> dataSources) {
+    private Map<String, DatabaseType> createStorageTypes(final Map<String, DataSource> dataSources) {
         Map<String, DatabaseType> result = new LinkedHashMap<>(dataSources.size(), 1);
         for (Entry<String, DataSource> entry : dataSources.entrySet()) {
             result.put(entry.getKey(), getDatabaseType(Collections.singletonMap(entry.getKey(), entry.getValue())));
diff --git a/infra/context/src/main/java/org/apache/shardingsphere/infra/context/refresher/type/AlterTableStatementSchemaRefresher.java b/infra/context/src/main/java/org/apache/shardingsphere/infra/context/refresher/type/AlterTableStatementSchemaRefresher.java
index f5c629ba30f..e2b370f5aa5 100644
--- a/infra/context/src/main/java/org/apache/shardingsphere/infra/context/refresher/type/AlterTableStatementSchemaRefresher.java
+++ b/infra/context/src/main/java/org/apache/shardingsphere/infra/context/refresher/type/AlterTableStatementSchemaRefresher.java
@@ -70,7 +70,7 @@ public final class AlterTableStatementSchemaRefresher implements MetaDataRefresh
             database.getRuleMetaData().findRules(MutableDataNodeRule.class).forEach(each -> each.put(logicDataSourceNames.iterator().next(), schemaName, tableName));
         }
         GenericSchemaBuilderMaterial material = new GenericSchemaBuilderMaterial(database.getProtocolType(),
-                database.getResourceMetaData().getDatabaseTypes(), database.getResourceMetaData().getDataSources(), database.getRuleMetaData().getRules(), props, schemaName);
+                database.getResourceMetaData().getStorageTypes(), database.getResourceMetaData().getDataSources(), database.getRuleMetaData().getRules(), props, schemaName);
         Map<String, ShardingSphereSchema> schemaMap = GenericSchemaBuilder.build(Collections.singletonList(tableName), material);
         Optional<ShardingSphereTable> actualTableMetaData = Optional.ofNullable(schemaMap.get(schemaName)).map(optional -> optional.getTable(tableName));
         actualTableMetaData.ifPresent(optional -> database.getSchema(schemaName).putTable(tableName, optional));
diff --git a/infra/context/src/main/java/org/apache/shardingsphere/infra/context/refresher/type/AlterViewStatementSchemaRefresher.java b/infra/context/src/main/java/org/apache/shardingsphere/infra/context/refresher/type/AlterViewStatementSchemaRefresher.java
index 8096eb01e48..3d55e8dcc7b 100644
--- a/infra/context/src/main/java/org/apache/shardingsphere/infra/context/refresher/type/AlterViewStatementSchemaRefresher.java
+++ b/infra/context/src/main/java/org/apache/shardingsphere/infra/context/refresher/type/AlterViewStatementSchemaRefresher.java
@@ -81,7 +81,7 @@ public final class AlterViewStatementSchemaRefresher implements MetaDataRefreshe
             database.getRuleMetaData().findRules(MutableDataNodeRule.class).forEach(each -> each.put(logicDataSourceNames.iterator().next(), schemaName, viewName));
         }
         GenericSchemaBuilderMaterial material = new GenericSchemaBuilderMaterial(database.getProtocolType(),
-                database.getResourceMetaData().getDatabaseTypes(), database.getResourceMetaData().getDataSources(), database.getRuleMetaData().getRules(), props, schemaName);
+                database.getResourceMetaData().getStorageTypes(), database.getResourceMetaData().getDataSources(), database.getRuleMetaData().getRules(), props, schemaName);
         Map<String, ShardingSphereSchema> schemaMap = GenericSchemaBuilder.build(Collections.singletonList(viewName), material);
         Optional<ShardingSphereTable> actualViewMetaData = Optional.ofNullable(schemaMap.get(schemaName)).map(optional -> optional.getTable(viewName));
         actualViewMetaData.ifPresent(optional -> database.getSchema(schemaName).putTable(viewName, optional));
diff --git a/infra/context/src/main/java/org/apache/shardingsphere/infra/context/refresher/type/CreateTableStatementSchemaRefresher.java b/infra/context/src/main/java/org/apache/shardingsphere/infra/context/refresher/type/CreateTableStatementSchemaRefresher.java
index 831b5ec2b57..c984f626771 100644
--- a/infra/context/src/main/java/org/apache/shardingsphere/infra/context/refresher/type/CreateTableStatementSchemaRefresher.java
+++ b/infra/context/src/main/java/org/apache/shardingsphere/infra/context/refresher/type/CreateTableStatementSchemaRefresher.java
@@ -49,7 +49,7 @@ public final class CreateTableStatementSchemaRefresher implements MetaDataRefres
             database.getRuleMetaData().findRules(MutableDataNodeRule.class).forEach(each -> each.put(logicDataSourceNames.iterator().next(), schemaName, tableName));
         }
         GenericSchemaBuilderMaterial material = new GenericSchemaBuilderMaterial(database.getProtocolType(),
-                database.getResourceMetaData().getDatabaseTypes(), database.getResourceMetaData().getDataSources(), database.getRuleMetaData().getRules(), props, schemaName);
+                database.getResourceMetaData().getStorageTypes(), database.getResourceMetaData().getDataSources(), database.getRuleMetaData().getRules(), props, schemaName);
         Map<String, ShardingSphereSchema> schemaMap = GenericSchemaBuilder.build(Collections.singletonList(tableName), material);
         Optional<ShardingSphereTable> actualTableMetaData = Optional.ofNullable(schemaMap.get(schemaName)).map(optional -> optional.getTable(tableName));
         if (actualTableMetaData.isPresent()) {
diff --git a/infra/context/src/main/java/org/apache/shardingsphere/infra/context/refresher/type/CreateViewStatementSchemaRefresher.java b/infra/context/src/main/java/org/apache/shardingsphere/infra/context/refresher/type/CreateViewStatementSchemaRefresher.java
index 21b5e586ec6..66f2744e5ca 100644
--- a/infra/context/src/main/java/org/apache/shardingsphere/infra/context/refresher/type/CreateViewStatementSchemaRefresher.java
+++ b/infra/context/src/main/java/org/apache/shardingsphere/infra/context/refresher/type/CreateViewStatementSchemaRefresher.java
@@ -50,7 +50,7 @@ public final class CreateViewStatementSchemaRefresher implements MetaDataRefresh
             database.getRuleMetaData().findRules(MutableDataNodeRule.class).forEach(each -> each.put(logicDataSourceNames.iterator().next(), schemaName, viewName));
         }
         GenericSchemaBuilderMaterial material = new GenericSchemaBuilderMaterial(database.getProtocolType(),
-                database.getResourceMetaData().getDatabaseTypes(), database.getResourceMetaData().getDataSources(), database.getRuleMetaData().getRules(), props, schemaName);
+                database.getResourceMetaData().getStorageTypes(), database.getResourceMetaData().getDataSources(), database.getRuleMetaData().getRules(), props, schemaName);
         Map<String, ShardingSphereSchema> schemaMap = GenericSchemaBuilder.build(Collections.singletonList(viewName), material);
         Optional<ShardingSphereTable> actualTableMetaData = Optional.ofNullable(schemaMap.get(schemaName)).map(optional -> optional.getTable(viewName));
         if (actualTableMetaData.isPresent()) {
diff --git a/infra/context/src/main/java/org/apache/shardingsphere/infra/context/refresher/type/RenameTableStatementSchemaRefresher.java b/infra/context/src/main/java/org/apache/shardingsphere/infra/context/refresher/type/RenameTableStatementSchemaRefresher.java
index aebc85c1923..bd30608e490 100644
--- a/infra/context/src/main/java/org/apache/shardingsphere/infra/context/refresher/type/RenameTableStatementSchemaRefresher.java
+++ b/infra/context/src/main/java/org/apache/shardingsphere/infra/context/refresher/type/RenameTableStatementSchemaRefresher.java
@@ -68,7 +68,7 @@ public final class RenameTableStatementSchemaRefresher implements MetaDataRefres
             database.getRuleMetaData().findRules(MutableDataNodeRule.class).forEach(each -> each.put(logicDataSourceNames.iterator().next(), schemaName, tableName));
         }
         GenericSchemaBuilderMaterial material = new GenericSchemaBuilderMaterial(database.getProtocolType(),
-                database.getResourceMetaData().getDatabaseTypes(), database.getResourceMetaData().getDataSources(), database.getRuleMetaData().getRules(), props, schemaName);
+                database.getResourceMetaData().getStorageTypes(), database.getResourceMetaData().getDataSources(), database.getRuleMetaData().getRules(), props, schemaName);
         Map<String, ShardingSphereSchema> schemaMap = GenericSchemaBuilder.build(Collections.singletonList(tableName), material);
         Optional<ShardingSphereTable> actualTableMetaData = Optional.ofNullable(schemaMap.get(schemaName)).map(optional -> optional.getTable(tableName));
         actualTableMetaData.ifPresent(optional -> database.getSchema(schemaName).putTable(tableName, optional));
diff --git a/infra/context/src/test/java/org/apache/shardingsphere/infra/context/refresher/type/RenameTableStatementSchemaRefresherTest.java b/infra/context/src/test/java/org/apache/shardingsphere/infra/context/refresher/type/RenameTableStatementSchemaRefresherTest.java
index 345268c7a63..722f424fab9 100644
--- a/infra/context/src/test/java/org/apache/shardingsphere/infra/context/refresher/type/RenameTableStatementSchemaRefresherTest.java
+++ b/infra/context/src/test/java/org/apache/shardingsphere/infra/context/refresher/type/RenameTableStatementSchemaRefresherTest.java
@@ -82,7 +82,7 @@ public final class RenameTableStatementSchemaRefresherTest {
     private ShardingSphereResourceMetaData mockShardingSphereResourceMetaData() {
         ShardingSphereResourceMetaData result = mock(ShardingSphereResourceMetaData.class);
         when(result.getDataSources()).thenReturn(Collections.singletonMap(DefaultDatabase.LOGIC_NAME, new MockedDataSource()));
-        when(result.getDatabaseTypes()).thenReturn(Collections.singletonMap(DefaultDatabase.LOGIC_NAME, new SQL92DatabaseType()));
+        when(result.getStorageTypes()).thenReturn(Collections.singletonMap(DefaultDatabase.LOGIC_NAME, new SQL92DatabaseType()));
         return result;
     }
 }
diff --git a/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/execute/engine/driver/jdbc/JDBCExecutorCallback.java b/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/execute/engine/driver/jdbc/JDBCExecutorCallback.java
index b030381c864..b74af5500a6 100644
--- a/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/execute/engine/driver/jdbc/JDBCExecutorCallback.java
+++ b/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/execute/engine/driver/jdbc/JDBCExecutorCallback.java
@@ -17,11 +17,9 @@
 
 package org.apache.shardingsphere.infra.executor.sql.execute.engine.driver.jdbc;
 
-import lombok.Getter;
 import lombok.RequiredArgsConstructor;
 import org.apache.shardingsphere.infra.database.metadata.DataSourceMetaData;
 import org.apache.shardingsphere.infra.database.type.DatabaseType;
-import org.apache.shardingsphere.infra.util.eventbus.EventBusContext;
 import org.apache.shardingsphere.infra.executor.kernel.model.ExecutorCallback;
 import org.apache.shardingsphere.infra.executor.sql.context.SQLUnit;
 import org.apache.shardingsphere.infra.executor.sql.execute.engine.ConnectionMode;
@@ -31,6 +29,7 @@ import org.apache.shardingsphere.infra.executor.sql.hook.SPISQLExecutionHook;
 import org.apache.shardingsphere.infra.executor.sql.hook.SQLExecutionHook;
 import org.apache.shardingsphere.infra.executor.sql.process.ExecuteProcessEngine;
 import org.apache.shardingsphere.infra.executor.sql.process.model.ExecuteProcessConstants;
+import org.apache.shardingsphere.infra.util.eventbus.EventBusContext;
 import org.apache.shardingsphere.sql.parser.sql.common.statement.SQLStatement;
 
 import java.sql.DatabaseMetaData;
@@ -54,8 +53,7 @@ public abstract class JDBCExecutorCallback<T> implements ExecutorCallback<JDBCEx
     
     private final DatabaseType protocolType;
     
-    @Getter
-    private final DatabaseType databaseType;
+    private final Map<String, DatabaseType> storageTypes;
     
     private final SQLStatement sqlStatement;
     
@@ -63,10 +61,6 @@ public abstract class JDBCExecutorCallback<T> implements ExecutorCallback<JDBCEx
     
     private final EventBusContext eventBusContext;
     
-    public JDBCExecutorCallback(final DatabaseType databaseType, final SQLStatement sqlStatement, final boolean isExceptionThrown, final EventBusContext eventBusContext) {
-        this(databaseType, databaseType, sqlStatement, isExceptionThrown, eventBusContext);
-    }
-    
     @Override
     public final Collection<T> execute(final Collection<JDBCExecutionUnit> executionUnits, final boolean isTrunkThread, final Map<String, Object> dataMap) throws SQLException {
         // TODO It is better to judge whether need sane result before execute, can avoid exception thrown
@@ -87,17 +81,18 @@ public abstract class JDBCExecutorCallback<T> implements ExecutorCallback<JDBCEx
      */
     private T execute(final JDBCExecutionUnit jdbcExecutionUnit, final boolean isTrunkThread, final Map<String, Object> dataMap) throws SQLException {
         SQLExecutorExceptionHandler.setExceptionThrown(isExceptionThrown);
-        DataSourceMetaData dataSourceMetaData = getDataSourceMetaData(jdbcExecutionUnit.getStorageResource().getConnection().getMetaData());
+        DatabaseType storageType = storageTypes.get(jdbcExecutionUnit.getExecutionUnit().getDataSourceName());
+        DataSourceMetaData dataSourceMetaData = getDataSourceMetaData(jdbcExecutionUnit.getStorageResource().getConnection().getMetaData(), storageType);
         SQLExecutionHook sqlExecutionHook = new SPISQLExecutionHook();
         try {
             SQLUnit sqlUnit = jdbcExecutionUnit.getExecutionUnit().getSqlUnit();
             sqlExecutionHook.start(jdbcExecutionUnit.getExecutionUnit().getDataSourceName(), sqlUnit.getSql(), sqlUnit.getParameters(), dataSourceMetaData, isTrunkThread, dataMap);
-            T result = executeSQL(sqlUnit.getSql(), jdbcExecutionUnit.getStorageResource(), jdbcExecutionUnit.getConnectionMode());
+            T result = executeSQL(sqlUnit.getSql(), jdbcExecutionUnit.getStorageResource(), jdbcExecutionUnit.getConnectionMode(), storageType);
             sqlExecutionHook.finishSuccess();
             finishReport(dataMap, jdbcExecutionUnit);
             return result;
         } catch (final SQLException ex) {
-            if (!databaseType.equals(protocolType)) {
+            if (!storageType.equals(protocolType)) {
                 Optional<T> saneResult = getSaneResult(sqlStatement, ex);
                 if (saneResult.isPresent()) {
                     return isTrunkThread ? saneResult.get() : null;
@@ -109,12 +104,12 @@ public abstract class JDBCExecutorCallback<T> implements ExecutorCallback<JDBCEx
         }
     }
     
-    private DataSourceMetaData getDataSourceMetaData(final DatabaseMetaData databaseMetaData) throws SQLException {
+    private DataSourceMetaData getDataSourceMetaData(final DatabaseMetaData databaseMetaData, final DatabaseType storageType) throws SQLException {
         String url = databaseMetaData.getURL();
         if (CACHED_DATASOURCE_METADATA.containsKey(url)) {
             return CACHED_DATASOURCE_METADATA.get(url);
         }
-        DataSourceMetaData result = databaseType.getDataSourceMetaData(url, databaseMetaData.getUserName());
+        DataSourceMetaData result = storageType.getDataSourceMetaData(url, databaseMetaData.getUserName());
         CACHED_DATASOURCE_METADATA.put(url, result);
         return result;
     }
@@ -125,7 +120,7 @@ public abstract class JDBCExecutorCallback<T> implements ExecutorCallback<JDBCEx
         }
     }
     
-    protected abstract T executeSQL(String sql, Statement statement, ConnectionMode connectionMode) throws SQLException;
+    protected abstract T executeSQL(String sql, Statement statement, ConnectionMode connectionMode, DatabaseType storageType) throws SQLException;
     
     protected abstract Optional<T> getSaneResult(SQLStatement sqlStatement, SQLException ex);
 }
diff --git a/infra/executor/src/test/java/org/apache/shardingsphere/infra/executor/sql/execute/engine/jdbc/JDBCExecutorCallbackTest.java b/infra/executor/src/test/java/org/apache/shardingsphere/infra/executor/sql/execute/engine/jdbc/JDBCExecutorCallbackTest.java
index a67a3883e55..4f44967d0e7 100644
--- a/infra/executor/src/test/java/org/apache/shardingsphere/infra/executor/sql/execute/engine/jdbc/JDBCExecutorCallbackTest.java
+++ b/infra/executor/src/test/java/org/apache/shardingsphere/infra/executor/sql/execute/engine/jdbc/JDBCExecutorCallbackTest.java
@@ -18,6 +18,7 @@
 package org.apache.shardingsphere.infra.executor.sql.execute.engine.jdbc;
 
 import org.apache.shardingsphere.infra.database.metadata.DataSourceMetaData;
+import org.apache.shardingsphere.infra.database.type.DatabaseType;
 import org.apache.shardingsphere.infra.database.type.DatabaseTypeFactory;
 import org.apache.shardingsphere.infra.util.eventbus.EventBusContext;
 import org.apache.shardingsphere.infra.executor.sql.context.ExecutionUnit;
@@ -77,11 +78,12 @@ public final class JDBCExecutorCallbackTest {
     @SuppressWarnings("unchecked")
     @Test
     public void assertExecute() throws SQLException, NoSuchFieldException, IllegalAccessException {
-        JDBCExecutorCallback<?> jdbcExecutorCallback = new JDBCExecutorCallback<Integer>(DatabaseTypeFactory.getInstance("MySQL"), mock(SelectStatement.class), true,
+        DatabaseType databaseType = DatabaseTypeFactory.getInstance("MySQL");
+        JDBCExecutorCallback<?> jdbcExecutorCallback = new JDBCExecutorCallback<Integer>(databaseType, Collections.singletonMap("ds", databaseType), mock(SelectStatement.class), true,
                 new EventBusContext()) {
             
             @Override
-            protected Integer executeSQL(final String sql, final Statement statement, final ConnectionMode connectionMode) throws SQLException {
+            protected Integer executeSQL(final String sql, final Statement statement, final ConnectionMode connectionMode, final DatabaseType storageType) throws SQLException {
                 return ((PreparedStatement) statement).executeUpdate();
             }
             
@@ -102,38 +104,40 @@ public final class JDBCExecutorCallbackTest {
     @Test
     public void assertExecuteFailedAndProtocolTypeDifferentWithDatabaseType() throws SQLException {
         Object saneResult = new Object();
-        JDBCExecutorCallback<Object> callback = new JDBCExecutorCallback<Object>(DatabaseTypeFactory.getInstance("MySQL"), DatabaseTypeFactory.getInstance("PostgreSQL"),
-                mock(SelectStatement.class), true, new EventBusContext()) {
-            
-            @Override
-            protected Object executeSQL(final String sql, final Statement statement, final ConnectionMode connectionMode) throws SQLException {
-                throw new SQLException();
-            }
-            
-            @Override
-            protected Optional<Object> getSaneResult(final SQLStatement sqlStatement, final SQLException ex) {
-                return Optional.of(saneResult);
-            }
-        };
+        JDBCExecutorCallback<Object> callback =
+                new JDBCExecutorCallback<Object>(DatabaseTypeFactory.getInstance("MySQL"), Collections.singletonMap("ds", DatabaseTypeFactory.getInstance("PostgreSQL")),
+                        mock(SelectStatement.class), true, new EventBusContext()) {
+                    
+                    @Override
+                    protected Object executeSQL(final String sql, final Statement statement, final ConnectionMode connectionMode, final DatabaseType storageType) throws SQLException {
+                        throw new SQLException();
+                    }
+                    
+                    @Override
+                    protected Optional<Object> getSaneResult(final SQLStatement sqlStatement, final SQLException ex) {
+                        return Optional.of(saneResult);
+                    }
+                };
         assertThat(callback.execute(units, true, Collections.emptyMap()), is(Collections.singletonList(saneResult)));
         assertThat(callback.execute(units, false, Collections.emptyMap()), is(Collections.emptyList()));
     }
     
     @Test(expected = SQLException.class)
     public void assertExecuteSQLExceptionOccurredAndProtocolTypeSameAsDatabaseType() throws SQLException {
-        JDBCExecutorCallback<Object> callback = new JDBCExecutorCallback<Object>(DatabaseTypeFactory.getInstance("MySQL"), DatabaseTypeFactory.getInstance("PostgreSQL"),
-                mock(SelectStatement.class), true, new EventBusContext()) {
-            
-            @Override
-            protected Object executeSQL(final String sql, final Statement statement, final ConnectionMode connectionMode) throws SQLException {
-                throw new SQLException();
-            }
-            
-            @Override
-            protected Optional<Object> getSaneResult(final SQLStatement sqlStatement, final SQLException ex) {
-                return Optional.empty();
-            }
-        };
+        JDBCExecutorCallback<Object> callback =
+                new JDBCExecutorCallback<Object>(DatabaseTypeFactory.getInstance("MySQL"), Collections.singletonMap("ds", DatabaseTypeFactory.getInstance("PostgreSQL")),
+                        mock(SelectStatement.class), true, new EventBusContext()) {
+                    
+                    @Override
+                    protected Object executeSQL(final String sql, final Statement statement, final ConnectionMode connectionMode, final DatabaseType storageType) throws SQLException {
+                        throw new SQLException();
+                    }
+                    
+                    @Override
+                    protected Optional<Object> getSaneResult(final SQLStatement sqlStatement, final SQLException ex) {
+                        return Optional.empty();
+                    }
+                };
         callback.execute(units, true, Collections.emptyMap());
     }
 }
diff --git a/infra/merge/src/main/java/org/apache/shardingsphere/infra/merge/MergeEngine.java b/infra/merge/src/main/java/org/apache/shardingsphere/infra/merge/MergeEngine.java
index bfe6898661d..6d188f20e24 100644
--- a/infra/merge/src/main/java/org/apache/shardingsphere/infra/merge/MergeEngine.java
+++ b/infra/merge/src/main/java/org/apache/shardingsphere/infra/merge/MergeEngine.java
@@ -77,8 +77,7 @@ public final class MergeEngine {
     private Optional<MergedResult> executeMerge(final List<QueryResult> queryResults, final SQLStatementContext<?> sqlStatementContext) throws SQLException {
         for (Entry<ShardingSphereRule, ResultProcessEngine> entry : engines.entrySet()) {
             if (entry.getValue() instanceof ResultMergerEngine) {
-                ResultMerger resultMerger = ((ResultMergerEngine) entry.getValue()).newInstance(
-                        database.getName(), database.getResourceMetaData().getDatabaseType(), entry.getKey(), props, sqlStatementContext);
+                ResultMerger resultMerger = ((ResultMergerEngine) entry.getValue()).newInstance(database.getName(), database.getProtocolType(), entry.getKey(), props, sqlStatementContext);
                 return Optional.of(resultMerger.merge(queryResults, sqlStatementContext, database, connectionContext));
             }
         }
diff --git a/infra/merge/src/main/java/org/apache/shardingsphere/infra/merge/engine/merger/ResultMergerEngine.java b/infra/merge/src/main/java/org/apache/shardingsphere/infra/merge/engine/merger/ResultMergerEngine.java
index b397ba71918..2ed1452ddff 100644
--- a/infra/merge/src/main/java/org/apache/shardingsphere/infra/merge/engine/merger/ResultMergerEngine.java
+++ b/infra/merge/src/main/java/org/apache/shardingsphere/infra/merge/engine/merger/ResultMergerEngine.java
@@ -34,11 +34,11 @@ public interface ResultMergerEngine<T extends ShardingSphereRule> extends Result
      * Create new instance of result merger engine.
      *
      * @param databaseName database name
-     * @param databaseType database type
+     * @param protocolType protocol type
      * @param rule rule
      * @param props ShardingSphere properties
      * @param sqlStatementContext SQL statement context
      * @return created instance
      */
-    ResultMerger newInstance(String databaseName, DatabaseType databaseType, T rule, ConfigurationProperties props, SQLStatementContext<?> sqlStatementContext);
+    ResultMerger newInstance(String databaseName, DatabaseType protocolType, T rule, ConfigurationProperties props, SQLStatementContext<?> sqlStatementContext);
 }
diff --git a/infra/merge/src/test/java/org/apache/shardingsphere/infra/merge/fixture/merger/ResultMergerEngineFixture.java b/infra/merge/src/test/java/org/apache/shardingsphere/infra/merge/fixture/merger/ResultMergerEngineFixture.java
index e42d55ae683..953a4f79bdf 100644
--- a/infra/merge/src/test/java/org/apache/shardingsphere/infra/merge/fixture/merger/ResultMergerEngineFixture.java
+++ b/infra/merge/src/test/java/org/apache/shardingsphere/infra/merge/fixture/merger/ResultMergerEngineFixture.java
@@ -27,7 +27,7 @@ import org.apache.shardingsphere.infra.binder.statement.SQLStatementContext;
 public final class ResultMergerEngineFixture implements ResultMergerEngine<MergerRuleFixture> {
     
     @Override
-    public ResultMerger newInstance(final String databaseName, final DatabaseType databaseType, final MergerRuleFixture rule, final ConfigurationProperties props,
+    public ResultMerger newInstance(final String databaseName, final DatabaseType protocolType, final MergerRuleFixture rule, final ConfigurationProperties props,
                                     final SQLStatementContext<?> sqlStatementContext) {
         return new ResultMergerFixture();
     }
diff --git a/infra/rewrite/src/main/java/org/apache/shardingsphere/infra/rewrite/SQLRewriteEntry.java b/infra/rewrite/src/main/java/org/apache/shardingsphere/infra/rewrite/SQLRewriteEntry.java
index dfbd06574ac..d0f9b49b78e 100644
--- a/infra/rewrite/src/main/java/org/apache/shardingsphere/infra/rewrite/SQLRewriteEntry.java
+++ b/infra/rewrite/src/main/java/org/apache/shardingsphere/infra/rewrite/SQLRewriteEntry.java
@@ -73,7 +73,7 @@ public final class SQLRewriteEntry {
         SQLRewriteContext sqlRewriteContext = createSQLRewriteContext(sql, parameters, sqlStatementContext, routeContext, connectionContext);
         SQLTranslatorRule rule = globalRuleMetaData.getSingleRule(SQLTranslatorRule.class);
         DatabaseType protocolType = database.getProtocolType();
-        Map<String, DatabaseType> storageTypes = database.getResourceMetaData().getDatabaseTypes();
+        Map<String, DatabaseType> storageTypes = database.getResourceMetaData().getStorageTypes();
         return routeContext.getRouteUnits().isEmpty()
                 ? new GenericSQLRewriteEngine(rule, protocolType, storageTypes).rewrite(sqlRewriteContext)
                 : new RouteSQLRewriteEngine(rule, protocolType, storageTypes).rewrite(sqlRewriteContext, routeContext);
diff --git a/infra/rewrite/src/test/java/org/apache/shardingsphere/infra/rewrite/SQLRewriteEntryTest.java b/infra/rewrite/src/test/java/org/apache/shardingsphere/infra/rewrite/SQLRewriteEntryTest.java
index 4d3a5a1a8ae..e23fb6028e8 100644
--- a/infra/rewrite/src/test/java/org/apache/shardingsphere/infra/rewrite/SQLRewriteEntryTest.java
+++ b/infra/rewrite/src/test/java/org/apache/shardingsphere/infra/rewrite/SQLRewriteEntryTest.java
@@ -86,7 +86,7 @@ public final class SQLRewriteEntryTest {
         Map<String, DatabaseType> databaseTypes = new LinkedHashMap<>(2, 1);
         databaseTypes.put("ds_0", new H2DatabaseType());
         databaseTypes.put("ds_1", new MySQLDatabaseType());
-        when(result.getDatabaseTypes()).thenReturn(databaseTypes);
+        when(result.getStorageTypes()).thenReturn(databaseTypes);
         return result;
     }
 }
diff --git a/infra/util/src/main/java/org/apache/shardingsphere/infra/util/reflect/ReflectiveUtil.java b/infra/util/src/main/java/org/apache/shardingsphere/infra/util/reflect/ReflectiveUtil.java
new file mode 100644
index 00000000000..5c80aa6a46a
--- /dev/null
+++ b/infra/util/src/main/java/org/apache/shardingsphere/infra/util/reflect/ReflectiveUtil.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.infra.util.reflect;
+
+import lombok.AccessLevel;
+import lombok.NoArgsConstructor;
+import lombok.SneakyThrows;
+
+import java.lang.reflect.Field;
+import java.util.Objects;
+
+/**
+ * Reflective utility.
+ */
+@NoArgsConstructor(access = AccessLevel.PRIVATE)
+public final class ReflectiveUtil {
+    
+    /**
+     * Get field value object.
+     *
+     * @param object object
+     * @param fieldName field name
+     * @return object
+     */
+    public static Object getFieldValue(final Object object, final String fieldName) {
+        return getFieldValue(object, getField(object, fieldName));
+    }
+    
+    private static Object getFieldValue(final Object object, final Field field) {
+        if (null == object || null == field) {
+            return null;
+        }
+        field.setAccessible(true);
+        Object result = null;
+        try {
+            result = field.get(object);
+        } catch (IllegalAccessException ignored) {
+        }
+        return result;
+    }
+    
+    private static Field getField(final Object target, final String fieldName) {
+        Class<?> clazz = target.getClass();
+        while (null != clazz) {
+            try {
+                return clazz.getDeclaredField(fieldName);
+            } catch (final NoSuchFieldException ignored) {
+            }
+            clazz = clazz.getSuperclass();
+        }
+        return null;
+    }
+    
+    /**
+     * Set value to specified field.
+     * 
+     * @param target target
+     * @param fieldName field name
+     * @param value value
+     */
+    @SneakyThrows(ReflectiveOperationException.class)
+    public static void setProperty(final Object target, final String fieldName, final Object value) {
+        Field field = getField(target, fieldName);
+        Objects.requireNonNull(field);
+        field.setAccessible(true);
+        field.set(target, value);
+    }
+}
diff --git a/jdbc/core/src/main/java/org/apache/shardingsphere/driver/executor/DriverExecutor.java b/jdbc/core/src/main/java/org/apache/shardingsphere/driver/executor/DriverExecutor.java
index 15ea089cd52..177bd3b881e 100644
--- a/jdbc/core/src/main/java/org/apache/shardingsphere/driver/executor/DriverExecutor.java
+++ b/jdbc/core/src/main/java/org/apache/shardingsphere/driver/executor/DriverExecutor.java
@@ -53,8 +53,8 @@ public final class DriverExecutor implements AutoCloseable {
         JDBCExecutor jdbcExecutor = new JDBCExecutor(executorEngine, connection.isHoldTransaction());
         regularExecutor = new DriverJDBCExecutor(connection.getDatabaseName(), connection.getContextManager(), jdbcExecutor);
         rawExecutor = new RawExecutor(executorEngine, connection.isHoldTransaction(), metaDataContexts.getMetaData().getProps(), eventBusContext);
-        DatabaseType databaseType = metaDataContexts.getMetaData().getDatabase(connection.getDatabaseName()).getResourceMetaData().getDatabaseType();
-        String schemaName = DatabaseTypeEngine.getDefaultSchemaName(databaseType, connection.getDatabaseName());
+        DatabaseType protocolType = metaDataContexts.getMetaData().getDatabase(connection.getDatabaseName()).getProtocolType();
+        String schemaName = DatabaseTypeEngine.getDefaultSchemaName(protocolType, connection.getDatabaseName());
         SQLFederationRule sqlFederationRule = metaDataContexts.getMetaData().getGlobalRuleMetaData().getSingleRule(SQLFederationRule.class);
         federationExecutor = sqlFederationRule.getSQLFederationExecutor(connection.getDatabaseName(), schemaName, metaDataContexts.getMetaData(),
                 metaDataContexts.getShardingSphereData(), jdbcExecutor, eventBusContext);
diff --git a/jdbc/core/src/main/java/org/apache/shardingsphere/driver/executor/batch/BatchPreparedStatementExecutor.java b/jdbc/core/src/main/java/org/apache/shardingsphere/driver/executor/batch/BatchPreparedStatementExecutor.java
index 2d2b2a78206..19cb5608f37 100644
--- a/jdbc/core/src/main/java/org/apache/shardingsphere/driver/executor/batch/BatchPreparedStatementExecutor.java
+++ b/jdbc/core/src/main/java/org/apache/shardingsphere/driver/executor/batch/BatchPreparedStatementExecutor.java
@@ -19,6 +19,7 @@ package org.apache.shardingsphere.driver.executor.batch;
 
 import lombok.Getter;
 import org.apache.shardingsphere.infra.binder.statement.SQLStatementContext;
+import org.apache.shardingsphere.infra.database.type.DatabaseType;
 import org.apache.shardingsphere.infra.util.eventbus.EventBusContext;
 import org.apache.shardingsphere.infra.executor.kernel.model.ExecutionGroup;
 import org.apache.shardingsphere.infra.executor.kernel.model.ExecutionGroupContext;
@@ -137,11 +138,11 @@ public final class BatchPreparedStatementExecutor {
      */
     public int[] executeBatch(final SQLStatementContext<?> sqlStatementContext) throws SQLException {
         boolean isExceptionThrown = SQLExecutorExceptionHandler.isExceptionThrown();
-        JDBCExecutorCallback<int[]> callback = new JDBCExecutorCallback<int[]>(
-                metaDataContexts.getMetaData().getDatabase(databaseName).getResourceMetaData().getDatabaseType(), sqlStatementContext.getSqlStatement(), isExceptionThrown, eventBusContext) {
+        JDBCExecutorCallback<int[]> callback = new JDBCExecutorCallback<int[]>(metaDataContexts.getMetaData().getDatabase(databaseName).getProtocolType(),
+                metaDataContexts.getMetaData().getDatabase(databaseName).getResourceMetaData().getStorageTypes(), sqlStatementContext.getSqlStatement(), isExceptionThrown, eventBusContext) {
             
             @Override
-            protected int[] executeSQL(final String sql, final Statement statement, final ConnectionMode connectionMode) throws SQLException {
+            protected int[] executeSQL(final String sql, final Statement statement, final ConnectionMode connectionMode, final DatabaseType storageType) throws SQLException {
                 return statement.executeBatch();
             }
             
diff --git a/jdbc/core/src/main/java/org/apache/shardingsphere/driver/executor/callback/ExecuteQueryCallback.java b/jdbc/core/src/main/java/org/apache/shardingsphere/driver/executor/callback/ExecuteQueryCallback.java
index 8089c9bff2d..90634148388 100644
--- a/jdbc/core/src/main/java/org/apache/shardingsphere/driver/executor/callback/ExecuteQueryCallback.java
+++ b/jdbc/core/src/main/java/org/apache/shardingsphere/driver/executor/callback/ExecuteQueryCallback.java
@@ -18,17 +18,18 @@
 package org.apache.shardingsphere.driver.executor.callback;
 
 import org.apache.shardingsphere.infra.database.type.DatabaseType;
-import org.apache.shardingsphere.infra.util.eventbus.EventBusContext;
 import org.apache.shardingsphere.infra.executor.sql.execute.engine.ConnectionMode;
 import org.apache.shardingsphere.infra.executor.sql.execute.engine.driver.jdbc.JDBCExecutorCallback;
 import org.apache.shardingsphere.infra.executor.sql.execute.result.query.QueryResult;
 import org.apache.shardingsphere.infra.executor.sql.execute.result.query.impl.driver.jdbc.type.memory.JDBCMemoryQueryResult;
 import org.apache.shardingsphere.infra.executor.sql.execute.result.query.impl.driver.jdbc.type.stream.JDBCStreamQueryResult;
+import org.apache.shardingsphere.infra.util.eventbus.EventBusContext;
 import org.apache.shardingsphere.sql.parser.sql.common.statement.SQLStatement;
 
 import java.sql.ResultSet;
 import java.sql.SQLException;
 import java.sql.Statement;
+import java.util.Map;
 import java.util.Optional;
 
 /**
@@ -36,14 +37,15 @@ import java.util.Optional;
  */
 public abstract class ExecuteQueryCallback extends JDBCExecutorCallback<QueryResult> {
     
-    protected ExecuteQueryCallback(final DatabaseType databaseType, final SQLStatement sqlStatement, final boolean isExceptionThrown, final EventBusContext eventBusContext) {
-        super(databaseType, sqlStatement, isExceptionThrown, eventBusContext);
+    protected ExecuteQueryCallback(final DatabaseType protocolType, final Map<String, DatabaseType> storageTypes, final SQLStatement sqlStatement, final boolean isExceptionThrown,
+                                   final EventBusContext eventBusContext) {
+        super(protocolType, storageTypes, sqlStatement, isExceptionThrown, eventBusContext);
     }
     
     @Override
-    protected final QueryResult executeSQL(final String sql, final Statement statement, final ConnectionMode connectionMode) throws SQLException {
+    protected final QueryResult executeSQL(final String sql, final Statement statement, final ConnectionMode connectionMode, final DatabaseType storageType) throws SQLException {
         ResultSet resultSet = executeQuery(sql, statement);
-        return ConnectionMode.MEMORY_STRICTLY == connectionMode ? new JDBCStreamQueryResult(resultSet) : new JDBCMemoryQueryResult(resultSet, getDatabaseType());
+        return ConnectionMode.MEMORY_STRICTLY == connectionMode ? new JDBCStreamQueryResult(resultSet) : new JDBCMemoryQueryResult(resultSet, storageType);
     }
     
     @Override
diff --git a/jdbc/core/src/main/java/org/apache/shardingsphere/driver/executor/callback/impl/PreparedStatementExecuteQueryCallback.java b/jdbc/core/src/main/java/org/apache/shardingsphere/driver/executor/callback/impl/PreparedStatementExecuteQueryCallback.java
index 4b364caace8..d0237f9b333 100644
--- a/jdbc/core/src/main/java/org/apache/shardingsphere/driver/executor/callback/impl/PreparedStatementExecuteQueryCallback.java
+++ b/jdbc/core/src/main/java/org/apache/shardingsphere/driver/executor/callback/impl/PreparedStatementExecuteQueryCallback.java
@@ -26,14 +26,16 @@ import java.sql.PreparedStatement;
 import java.sql.ResultSet;
 import java.sql.SQLException;
 import java.sql.Statement;
+import java.util.Map;
 
 /**
  * Prepared statement execute query callback.
  */
 public final class PreparedStatementExecuteQueryCallback extends ExecuteQueryCallback {
     
-    public PreparedStatementExecuteQueryCallback(final DatabaseType databaseType, final SQLStatement sqlStatement, final boolean isExceptionThrown, final EventBusContext eventBusContext) {
-        super(databaseType, sqlStatement, isExceptionThrown, eventBusContext);
+    public PreparedStatementExecuteQueryCallback(final DatabaseType protocolType, final Map<String, DatabaseType> storageTypes, final SQLStatement sqlStatement, final boolean isExceptionThrown,
+                                                 final EventBusContext eventBusContext) {
+        super(protocolType, storageTypes, sqlStatement, isExceptionThrown, eventBusContext);
     }
     
     @Override
diff --git a/jdbc/core/src/main/java/org/apache/shardingsphere/driver/executor/callback/impl/StatementExecuteQueryCallback.java b/jdbc/core/src/main/java/org/apache/shardingsphere/driver/executor/callback/impl/StatementExecuteQueryCallback.java
index 126d618b54b..8e03916fc3a 100644
--- a/jdbc/core/src/main/java/org/apache/shardingsphere/driver/executor/callback/impl/StatementExecuteQueryCallback.java
+++ b/jdbc/core/src/main/java/org/apache/shardingsphere/driver/executor/callback/impl/StatementExecuteQueryCallback.java
@@ -25,14 +25,16 @@ import org.apache.shardingsphere.sql.parser.sql.common.statement.SQLStatement;
 import java.sql.ResultSet;
 import java.sql.SQLException;
 import java.sql.Statement;
+import java.util.Map;
 
 /**
  * Statement execute query callback.
  */
 public final class StatementExecuteQueryCallback extends ExecuteQueryCallback {
     
-    public StatementExecuteQueryCallback(final DatabaseType databaseType, final SQLStatement sqlStatement, final boolean isExceptionThrown, final EventBusContext eventBusContext) {
-        super(databaseType, sqlStatement, isExceptionThrown, eventBusContext);
+    public StatementExecuteQueryCallback(final DatabaseType protocolType, final Map<String, DatabaseType> storageTypes, final SQLStatement sqlStatement, final boolean isExceptionThrown,
+                                         final EventBusContext eventBusContext) {
+        super(protocolType, storageTypes, sqlStatement, isExceptionThrown, eventBusContext);
     }
     
     @Override
diff --git a/jdbc/core/src/main/java/org/apache/shardingsphere/driver/jdbc/core/statement/ShardingSpherePreparedStatement.java b/jdbc/core/src/main/java/org/apache/shardingsphere/driver/jdbc/core/statement/ShardingSpherePreparedStatement.java
index 091f4e7e313..9eb820c4095 100644
--- a/jdbc/core/src/main/java/org/apache/shardingsphere/driver/jdbc/core/statement/ShardingSpherePreparedStatement.java
+++ b/jdbc/core/src/main/java/org/apache/shardingsphere/driver/jdbc/core/statement/ShardingSpherePreparedStatement.java
@@ -44,6 +44,7 @@ import org.apache.shardingsphere.infra.binder.statement.dml.SelectStatementConte
 import org.apache.shardingsphere.infra.config.props.ConfigurationProperties;
 import org.apache.shardingsphere.infra.config.props.ConfigurationPropertyKey;
 import org.apache.shardingsphere.infra.context.kernel.KernelProcessor;
+import org.apache.shardingsphere.infra.database.type.DatabaseType;
 import org.apache.shardingsphere.infra.database.type.DatabaseTypeEngine;
 import org.apache.shardingsphere.infra.executor.check.SQLCheckEngine;
 import org.apache.shardingsphere.infra.executor.kernel.model.ExecutionGroup;
@@ -294,13 +295,14 @@ public final class ShardingSpherePreparedStatement extends AbstractPreparedState
         ExecutionGroupContext<JDBCExecutionUnit> executionGroupContext = createExecutionGroupContext();
         cacheStatements(executionGroupContext.getInputGroups());
         return executor.getRegularExecutor().executeQuery(executionGroupContext, executionContext.getQueryContext(),
-                new PreparedStatementExecuteQueryCallback(metaDataContexts.getMetaData().getDatabase(connection.getDatabaseName()).getResourceMetaData().getDatabaseType(),
-                        sqlStatement, SQLExecutorExceptionHandler.isExceptionThrown(), eventBusContext));
+                new PreparedStatementExecuteQueryCallback(metaDataContexts.getMetaData().getDatabase(connection.getDatabaseName()).getProtocolType(),
+                        metaDataContexts.getMetaData().getDatabase(connection.getDatabaseName()).getResourceMetaData().getStorageTypes(), sqlStatement, SQLExecutorExceptionHandler.isExceptionThrown(),
+                        eventBusContext));
     }
     
     private ResultSet executeFederationQuery(final QueryContext queryContext) throws SQLException {
-        PreparedStatementExecuteQueryCallback callback = new PreparedStatementExecuteQueryCallback(
-                metaDataContexts.getMetaData().getDatabase(connection.getDatabaseName()).getResourceMetaData().getDatabaseType(), sqlStatement, SQLExecutorExceptionHandler.isExceptionThrown(),
+        PreparedStatementExecuteQueryCallback callback = new PreparedStatementExecuteQueryCallback(metaDataContexts.getMetaData().getDatabase(connection.getDatabaseName()).getProtocolType(),
+                metaDataContexts.getMetaData().getDatabase(connection.getDatabaseName()).getResourceMetaData().getStorageTypes(), sqlStatement, SQLExecutorExceptionHandler.isExceptionThrown(),
                 eventBusContext);
         SQLFederationExecutorContext context = new SQLFederationExecutorContext(false, queryContext, metaDataContexts.getMetaData().getDatabases());
         return executor.getFederationExecutor().executeQuery(createDriverExecutionPrepareEngine(), callback, context);
@@ -310,7 +312,7 @@ public final class ShardingSpherePreparedStatement extends AbstractPreparedState
         int maxConnectionsSizePerQuery = metaDataContexts.getMetaData().getProps().<Integer>getValue(ConfigurationPropertyKey.MAX_CONNECTIONS_SIZE_PER_QUERY);
         return new DriverExecutionPrepareEngine<>(JDBCDriverType.PREPARED_STATEMENT, maxConnectionsSizePerQuery, connection.getConnectionManager(), statementManager,
                 statementOption, metaDataContexts.getMetaData().getDatabase(connection.getDatabaseName()).getRuleMetaData().getRules(),
-                metaDataContexts.getMetaData().getDatabase(connection.getDatabaseName()).getResourceMetaData().getDatabaseTypes());
+                metaDataContexts.getMetaData().getDatabase(connection.getDatabaseName()).getResourceMetaData().getStorageTypes());
     }
     
     @Override
@@ -357,11 +359,12 @@ public final class ShardingSpherePreparedStatement extends AbstractPreparedState
     
     private JDBCExecutorCallback<Integer> createExecuteUpdateCallback() {
         boolean isExceptionThrown = SQLExecutorExceptionHandler.isExceptionThrown();
-        return new JDBCExecutorCallback<Integer>(metaDataContexts.getMetaData().getDatabase(connection.getDatabaseName()).getResourceMetaData().getDatabaseType(), sqlStatement, isExceptionThrown,
+        return new JDBCExecutorCallback<Integer>(metaDataContexts.getMetaData().getDatabase(connection.getDatabaseName()).getProtocolType(),
+                metaDataContexts.getMetaData().getDatabase(connection.getDatabaseName()).getResourceMetaData().getStorageTypes(), sqlStatement, isExceptionThrown,
                 eventBusContext) {
             
             @Override
-            protected Integer executeSQL(final String sql, final Statement statement, final ConnectionMode connectionMode) throws SQLException {
+            protected Integer executeSQL(final String sql, final Statement statement, final ConnectionMode connectionMode, final DatabaseType storageType) throws SQLException {
                 return ((PreparedStatement) statement).executeUpdate();
             }
             
@@ -434,11 +437,12 @@ public final class ShardingSpherePreparedStatement extends AbstractPreparedState
     
     private JDBCExecutorCallback<Boolean> createExecuteCallback() {
         boolean isExceptionThrown = SQLExecutorExceptionHandler.isExceptionThrown();
-        return new JDBCExecutorCallback<Boolean>(metaDataContexts.getMetaData().getDatabase(connection.getDatabaseName()).getResourceMetaData().getDatabaseType(), sqlStatement, isExceptionThrown,
+        return new JDBCExecutorCallback<Boolean>(metaDataContexts.getMetaData().getDatabase(connection.getDatabaseName()).getProtocolType(),
+                metaDataContexts.getMetaData().getDatabase(connection.getDatabaseName()).getResourceMetaData().getStorageTypes(), sqlStatement, isExceptionThrown,
                 eventBusContext) {
             
             @Override
-            protected Boolean executeSQL(final String sql, final Statement statement, final ConnectionMode connectionMode) throws SQLException {
+            protected Boolean executeSQL(final String sql, final Statement statement, final ConnectionMode connectionMode, final DatabaseType storageType) throws SQLException {
                 return ((PreparedStatement) statement).execute();
             }
             
@@ -605,7 +609,7 @@ public final class ShardingSpherePreparedStatement extends AbstractPreparedState
                 .<Integer>getValue(ConfigurationPropertyKey.MAX_CONNECTIONS_SIZE_PER_QUERY), connection.getConnectionManager(), statementManager, statementOption,
                 metaDataContexts.getMetaData()
                         .getDatabase(connection.getDatabaseName()).getRuleMetaData().getRules(),
-                metaDataContexts.getMetaData().getDatabase(connection.getDatabaseName()).getResourceMetaData().getDatabaseTypes());
+                metaDataContexts.getMetaData().getDatabase(connection.getDatabaseName()).getResourceMetaData().getStorageTypes());
         List<ExecutionUnit> executionUnits = new ArrayList<>(batchPreparedStatementExecutor.getBatchExecutionUnits().size());
         for (BatchExecutionUnit each : batchPreparedStatementExecutor.getBatchExecutionUnits()) {
             ExecutionUnit executionUnit = each.getExecutionUnit();
diff --git a/jdbc/core/src/main/java/org/apache/shardingsphere/driver/jdbc/core/statement/ShardingSphereStatement.java b/jdbc/core/src/main/java/org/apache/shardingsphere/driver/jdbc/core/statement/ShardingSphereStatement.java
index 092c0509470..167dca50ab0 100644
--- a/jdbc/core/src/main/java/org/apache/shardingsphere/driver/jdbc/core/statement/ShardingSphereStatement.java
+++ b/jdbc/core/src/main/java/org/apache/shardingsphere/driver/jdbc/core/statement/ShardingSphereStatement.java
@@ -43,6 +43,7 @@ import org.apache.shardingsphere.infra.binder.type.TableAvailable;
 import org.apache.shardingsphere.infra.config.props.ConfigurationProperties;
 import org.apache.shardingsphere.infra.config.props.ConfigurationPropertyKey;
 import org.apache.shardingsphere.infra.context.kernel.KernelProcessor;
+import org.apache.shardingsphere.infra.database.type.DatabaseType;
 import org.apache.shardingsphere.infra.database.type.DatabaseTypeEngine;
 import org.apache.shardingsphere.infra.executor.check.SQLCheckEngine;
 import org.apache.shardingsphere.infra.executor.kernel.model.ExecutionGroup;
@@ -214,14 +215,16 @@ public final class ShardingSphereStatement extends AbstractStatementAdapter {
         }
         ExecutionGroupContext<JDBCExecutionUnit> executionGroupContext = createExecutionContext();
         cacheStatements(executionGroupContext.getInputGroups());
-        StatementExecuteQueryCallback callback = new StatementExecuteQueryCallback(metaDataContexts.getMetaData().getDatabase(connection.getDatabaseName()).getResourceMetaData().getDatabaseType(),
-                executionContext.getSqlStatementContext().getSqlStatement(), SQLExecutorExceptionHandler.isExceptionThrown(), eventBusContext);
+        StatementExecuteQueryCallback callback = new StatementExecuteQueryCallback(metaDataContexts.getMetaData().getDatabase(connection.getDatabaseName()).getProtocolType(),
+                metaDataContexts.getMetaData().getDatabase(connection.getDatabaseName()).getResourceMetaData().getStorageTypes(), executionContext.getSqlStatementContext().getSqlStatement(),
+                SQLExecutorExceptionHandler.isExceptionThrown(), eventBusContext);
         return executor.getRegularExecutor().executeQuery(executionGroupContext, executionContext.getQueryContext(), callback);
     }
     
     private ResultSet executeFederationQuery(final QueryContext queryContext) throws SQLException {
-        StatementExecuteQueryCallback callback = new StatementExecuteQueryCallback(metaDataContexts.getMetaData().getDatabase(connection.getDatabaseName()).getResourceMetaData().getDatabaseType(),
-                queryContext.getSqlStatementContext().getSqlStatement(), SQLExecutorExceptionHandler.isExceptionThrown(), eventBusContext);
+        StatementExecuteQueryCallback callback = new StatementExecuteQueryCallback(metaDataContexts.getMetaData().getDatabase(connection.getDatabaseName()).getProtocolType(),
+                metaDataContexts.getMetaData().getDatabase(connection.getDatabaseName()).getResourceMetaData().getStorageTypes(), queryContext.getSqlStatementContext().getSqlStatement(),
+                SQLExecutorExceptionHandler.isExceptionThrown(), eventBusContext);
         SQLFederationExecutorContext context = new SQLFederationExecutorContext(false, queryContext, metaDataContexts.getMetaData().getDatabases());
         return executor.getFederationExecutor().executeQuery(createDriverExecutionPrepareEngine(), callback, context);
     }
@@ -230,7 +233,7 @@ public final class ShardingSphereStatement extends AbstractStatementAdapter {
         int maxConnectionsSizePerQuery = metaDataContexts.getMetaData().getProps().<Integer>getValue(ConfigurationPropertyKey.MAX_CONNECTIONS_SIZE_PER_QUERY);
         return new DriverExecutionPrepareEngine<>(JDBCDriverType.STATEMENT, maxConnectionsSizePerQuery, connection.getConnectionManager(), statementManager, statementOption,
                 metaDataContexts.getMetaData().getDatabase(connection.getDatabaseName()).getRuleMetaData().getRules(),
-                metaDataContexts.getMetaData().getDatabase(connection.getDatabaseName()).getResourceMetaData().getDatabaseTypes());
+                metaDataContexts.getMetaData().getDatabase(connection.getDatabaseName()).getResourceMetaData().getStorageTypes());
     }
     
     @Override
@@ -353,12 +356,12 @@ public final class ShardingSphereStatement extends AbstractStatementAdapter {
     private int executeUpdate(final ExecutionGroupContext<JDBCExecutionUnit> executionGroupContext, final ExecuteUpdateCallback updater,
                               final SQLStatementContext<?> sqlStatementContext, final Collection<RouteUnit> routeUnits) throws SQLException {
         boolean isExceptionThrown = SQLExecutorExceptionHandler.isExceptionThrown();
-        JDBCExecutorCallback<Integer> callback = new JDBCExecutorCallback<Integer>(
-                metaDataContexts.getMetaData().getDatabase(connection.getDatabaseName()).getResourceMetaData().getDatabaseType(), sqlStatementContext.getSqlStatement(), isExceptionThrown,
+        JDBCExecutorCallback<Integer> callback = new JDBCExecutorCallback<Integer>(metaDataContexts.getMetaData().getDatabase(connection.getDatabaseName()).getProtocolType(),
+                metaDataContexts.getMetaData().getDatabase(connection.getDatabaseName()).getResourceMetaData().getStorageTypes(), sqlStatementContext.getSqlStatement(), isExceptionThrown,
                 eventBusContext) {
             
             @Override
-            protected Integer executeSQL(final String sql, final Statement statement, final ConnectionMode connectionMode) throws SQLException {
+            protected Integer executeSQL(final String sql, final Statement statement, final ConnectionMode connectionMode, final DatabaseType storageType) throws SQLException {
                 return updater.executeUpdate(sql, statement);
             }
             
@@ -434,11 +437,11 @@ public final class ShardingSphereStatement extends AbstractStatementAdapter {
     private boolean execute(final ExecutionGroupContext<JDBCExecutionUnit> executionGroupContext, final ExecuteCallback executeCallback,
                             final SQLStatement sqlStatement, final Collection<RouteUnit> routeUnits) throws SQLException {
         boolean isExceptionThrown = SQLExecutorExceptionHandler.isExceptionThrown();
-        JDBCExecutorCallback<Boolean> jdbcExecutorCallback = new JDBCExecutorCallback<Boolean>(
-                metaDataContexts.getMetaData().getDatabase(connection.getDatabaseName()).getResourceMetaData().getDatabaseType(), sqlStatement, isExceptionThrown, eventBusContext) {
+        JDBCExecutorCallback<Boolean> jdbcExecutorCallback = new JDBCExecutorCallback<Boolean>(metaDataContexts.getMetaData().getDatabase(connection.getDatabaseName()).getProtocolType(),
+                metaDataContexts.getMetaData().getDatabase(connection.getDatabaseName()).getResourceMetaData().getStorageTypes(), sqlStatement, isExceptionThrown, eventBusContext) {
             
             @Override
-            protected Boolean executeSQL(final String sql, final Statement statement, final ConnectionMode connectionMode) throws SQLException {
+            protected Boolean executeSQL(final String sql, final Statement statement, final ConnectionMode connectionMode, final DatabaseType storageType) throws SQLException {
                 return executeCallback.execute(sql, statement);
             }
             
diff --git a/kernel/transaction/core/src/main/java/org/apache/shardingsphere/transaction/rule/TransactionRule.java b/kernel/transaction/core/src/main/java/org/apache/shardingsphere/transaction/rule/TransactionRule.java
index 8f8102a5d00..d56f359e515 100644
--- a/kernel/transaction/core/src/main/java/org/apache/shardingsphere/transaction/rule/TransactionRule.java
+++ b/kernel/transaction/core/src/main/java/org/apache/shardingsphere/transaction/rule/TransactionRule.java
@@ -70,7 +70,7 @@ public final class TransactionRule implements GlobalRule, ResourceHeldRule<Shard
         for (Entry<String, ShardingSphereDatabase> entry : databases.entrySet()) {
             ShardingSphereDatabase database = entry.getValue();
             database.getResourceMetaData().getDataSources().forEach((key, value) -> dataSourceMap.put(database.getName() + "." + key, value));
-            database.getResourceMetaData().getDatabaseTypes().forEach((key, value) -> databaseTypes.put(database.getName() + "." + key, value));
+            database.getResourceMetaData().getStorageTypes().forEach((key, value) -> databaseTypes.put(database.getName() + "." + key, value));
         }
         if (dataSourceMap.isEmpty()) {
             return new ShardingSphereTransactionManagerEngine();
diff --git a/kernel/transaction/core/src/test/java/org/apache/shardingsphere/transaction/rule/TransactionRuleTest.java b/kernel/transaction/core/src/test/java/org/apache/shardingsphere/transaction/rule/TransactionRuleTest.java
index 11c5c171139..78f21235c29 100644
--- a/kernel/transaction/core/src/test/java/org/apache/shardingsphere/transaction/rule/TransactionRuleTest.java
+++ b/kernel/transaction/core/src/test/java/org/apache/shardingsphere/transaction/rule/TransactionRuleTest.java
@@ -65,7 +65,7 @@ public final class TransactionRuleTest {
         Map<String, DatabaseType> databaseTypes = new LinkedHashMap<>(2, 1);
         databaseTypes.put("ds_0", new PostgreSQLDatabaseType());
         databaseTypes.put("ds_1", new OpenGaussDatabaseType());
-        when(result.getDatabaseTypes()).thenReturn(databaseTypes);
+        when(result.getStorageTypes()).thenReturn(databaseTypes);
         return result;
     }
     
diff --git a/mode/core/src/main/java/org/apache/shardingsphere/mode/manager/ContextManager.java b/mode/core/src/main/java/org/apache/shardingsphere/mode/manager/ContextManager.java
index 5fda6af76aa..6fd285ca250 100644
--- a/mode/core/src/main/java/org/apache/shardingsphere/mode/manager/ContextManager.java
+++ b/mode/core/src/main/java/org/apache/shardingsphere/mode/manager/ContextManager.java
@@ -523,7 +523,7 @@ public final class ContextManager implements AutoCloseable {
     private ShardingSphereSchema loadSchema(final String databaseName, final String schemaName, final String dataSourceName) throws SQLException {
         ShardingSphereDatabase database = metaDataContexts.getMetaData().getDatabase(databaseName);
         database.reloadRules(MutableDataNodeRule.class);
-        GenericSchemaBuilderMaterial material = new GenericSchemaBuilderMaterial(database.getProtocolType(), database.getResourceMetaData().getDatabaseTypes(),
+        GenericSchemaBuilderMaterial material = new GenericSchemaBuilderMaterial(database.getProtocolType(), database.getResourceMetaData().getStorageTypes(),
                 Collections.singletonMap(dataSourceName, database.getResourceMetaData().getDataSources().get(dataSourceName)),
                 database.getRuleMetaData().getRules(), metaDataContexts.getMetaData().getProps(), schemaName);
         ShardingSphereSchema result = GenericSchemaBuilder.build(material).get(schemaName);
@@ -568,7 +568,7 @@ public final class ContextManager implements AutoCloseable {
     private synchronized void reloadTable(final String databaseName, final String schemaName, final String tableName, final Map<String, DataSource> dataSourceMap) throws SQLException {
         ShardingSphereDatabase database = metaDataContexts.getMetaData().getDatabase(databaseName);
         GenericSchemaBuilderMaterial material = new GenericSchemaBuilderMaterial(database.getProtocolType(),
-                database.getResourceMetaData().getDatabaseTypes(), dataSourceMap, database.getRuleMetaData().getRules(), metaDataContexts.getMetaData().getProps(), schemaName);
+                database.getResourceMetaData().getStorageTypes(), dataSourceMap, database.getRuleMetaData().getRules(), metaDataContexts.getMetaData().getProps(), schemaName);
         ShardingSphereSchema schema = GenericSchemaBuilder.build(Collections.singletonList(tableName), material).getOrDefault(schemaName, new ShardingSphereSchema());
         if (schema.containsTable(tableName)) {
             alterTable(databaseName, schemaName, schema.getTable(tableName));
diff --git a/proxy/backend/src/main/java/org/apache/shardingsphere/proxy/backend/communication/ProxySQLExecutor.java b/proxy/backend/src/main/java/org/apache/shardingsphere/proxy/backend/communication/ProxySQLExecutor.java
index ad22f74dcba..ee7963516e3 100644
--- a/proxy/backend/src/main/java/org/apache/shardingsphere/proxy/backend/communication/ProxySQLExecutor.java
+++ b/proxy/backend/src/main/java/org/apache/shardingsphere/proxy/backend/communication/ProxySQLExecutor.java
@@ -187,7 +187,7 @@ public final class ProxySQLExecutor {
         JDBCBackendStatement statementManager = (JDBCBackendStatement) backendConnection.getConnectionSession().getStatementManager();
         DriverExecutionPrepareEngine<JDBCExecutionUnit, Connection> prepareEngine = new DriverExecutionPrepareEngine<>(
                 type, maxConnectionsSizePerQuery, backendConnection, statementManager, new StatementOption(isReturnGeneratedKeys), rules,
-                ProxyContext.getInstance().getDatabase(backendConnection.getConnectionSession().getDatabaseName()).getResourceMetaData().getDatabaseTypes());
+                ProxyContext.getInstance().getDatabase(backendConnection.getConnectionSession().getDatabaseName()).getResourceMetaData().getStorageTypes());
         ExecutionGroupContext<JDBCExecutionUnit> executionGroupContext;
         try {
             executionGroupContext = prepareEngine.prepare(executionContext.getRouteContext(), executionContext.getExecutionUnits());
diff --git a/proxy/backend/src/main/java/org/apache/shardingsphere/proxy/backend/communication/ReactiveProxySQLExecutor.java b/proxy/backend/src/main/java/org/apache/shardingsphere/proxy/backend/communication/ReactiveProxySQLExecutor.java
index 9d9a9f4b22f..b7af3a1268d 100644
--- a/proxy/backend/src/main/java/org/apache/shardingsphere/proxy/backend/communication/ReactiveProxySQLExecutor.java
+++ b/proxy/backend/src/main/java/org/apache/shardingsphere/proxy/backend/communication/ReactiveProxySQLExecutor.java
@@ -138,7 +138,7 @@ public final class ReactiveProxySQLExecutor {
         VertxBackendStatement statementManager = (VertxBackendStatement) backendConnection.getConnectionSession().getStatementManager();
         DriverExecutionPrepareEngine<VertxExecutionUnit, Future<? extends SqlClient>> prepareEngine = new DriverExecutionPrepareEngine<>(
                 TYPE, maxConnectionsSizePerQuery, backendConnection, statementManager, new VertxExecutionContext(), rules,
-                ProxyContext.getInstance().getDatabase(backendConnection.getConnectionSession().getDatabaseName()).getResourceMetaData().getDatabaseTypes());
+                ProxyContext.getInstance().getDatabase(backendConnection.getConnectionSession().getDatabaseName()).getResourceMetaData().getStorageTypes());
         ExecutionGroupContext<VertxExecutionUnit> executionGroupContext;
         try {
             executionGroupContext = prepareEngine.prepare(executionContext.getRouteContext(), executionContext.getExecutionUnits());
diff --git a/proxy/backend/src/main/java/org/apache/shardingsphere/proxy/backend/communication/jdbc/JDBCDatabaseCommunicationEngine.java b/proxy/backend/src/main/java/org/apache/shardingsphere/proxy/backend/communication/jdbc/JDBCDatabaseCommunicationEngine.java
index 9842334008b..8edb5996836 100644
--- a/proxy/backend/src/main/java/org/apache/shardingsphere/proxy/backend/communication/jdbc/JDBCDatabaseCommunicationEngine.java
+++ b/proxy/backend/src/main/java/org/apache/shardingsphere/proxy/backend/communication/jdbc/JDBCDatabaseCommunicationEngine.java
@@ -62,6 +62,7 @@ import java.util.Collection;
 import java.util.Collections;
 import java.util.LinkedList;
 import java.util.List;
+import java.util.Map;
 import java.util.Optional;
 import java.util.concurrent.CopyOnWriteArrayList;
 
@@ -155,8 +156,8 @@ public final class JDBCDatabaseCommunicationEngine extends DatabaseCommunication
         boolean isReturnGeneratedKeys = queryContext.getSqlStatementContext().getSqlStatement() instanceof MySQLInsertStatement;
         ShardingSphereDatabase database = metaDataContexts.getMetaData().getDatabase(backendConnection.getConnectionSession().getDatabaseName());
         DatabaseType protocolType = database.getProtocolType();
-        DatabaseType databaseType = database.getResourceMetaData().getDatabaseType();
-        ProxyJDBCExecutorCallback callback = ProxyJDBCExecutorCallbackFactory.newInstance(getDriverType(), protocolType, databaseType,
+        Map<String, DatabaseType> storageTypes = database.getResourceMetaData().getStorageTypes();
+        ProxyJDBCExecutorCallback callback = ProxyJDBCExecutorCallbackFactory.newInstance(getDriverType(), protocolType, storageTypes,
                 queryContext.getSqlStatementContext().getSqlStatement(), this, isReturnGeneratedKeys, SQLExecutorExceptionHandler.isExceptionThrown(), true);
         DriverExecutionPrepareEngine<JDBCExecutionUnit, Connection> prepareEngine = createDriverExecutionPrepareEngine(isReturnGeneratedKeys, metaDataContexts);
         SQLFederationExecutorContext context = new SQLFederationExecutorContext(false, queryContext, metaDataContexts.getMetaData().getDatabases());
@@ -168,7 +169,7 @@ public final class JDBCDatabaseCommunicationEngine extends DatabaseCommunication
         JDBCBackendStatement statementManager = (JDBCBackendStatement) backendConnection.getConnectionSession().getStatementManager();
         return new DriverExecutionPrepareEngine<>(getDriverType(), maxConnectionsSizePerQuery, backendConnection, statementManager,
                 new StatementOption(isReturnGeneratedKeys), metaData.getMetaData().getDatabase(backendConnection.getConnectionSession().getDatabaseName()).getRuleMetaData().getRules(),
-                metaData.getMetaData().getDatabase(backendConnection.getConnectionSession().getDatabaseName()).getResourceMetaData().getDatabaseTypes());
+                metaData.getMetaData().getDatabase(backendConnection.getConnectionSession().getDatabaseName()).getResourceMetaData().getStorageTypes());
     }
     
     private ResponseHeader processExecuteFederation(final ResultSet resultSet, final MetaDataContexts metaDataContexts) throws SQLException {
diff --git a/proxy/backend/src/main/java/org/apache/shardingsphere/proxy/backend/communication/jdbc/executor/ProxyJDBCExecutor.java b/proxy/backend/src/main/java/org/apache/shardingsphere/proxy/backend/communication/jdbc/executor/ProxyJDBCExecutor.java
index 03576f12ee8..989ccd542c4 100644
--- a/proxy/backend/src/main/java/org/apache/shardingsphere/proxy/backend/communication/jdbc/executor/ProxyJDBCExecutor.java
+++ b/proxy/backend/src/main/java/org/apache/shardingsphere/proxy/backend/communication/jdbc/executor/ProxyJDBCExecutor.java
@@ -36,6 +36,7 @@ import org.apache.shardingsphere.proxy.backend.session.ConnectionSession;
 
 import java.sql.SQLException;
 import java.util.List;
+import java.util.Map;
 
 /**
  * Proxy JDBC executor.
@@ -68,13 +69,13 @@ public final class ProxyJDBCExecutor {
             EventBusContext eventBusContext = ProxyContext.getInstance().getContextManager().getInstanceContext().getEventBusContext();
             ShardingSphereDatabase database = metaDataContexts.getMetaData().getDatabase(connectionSession.getDatabaseName());
             DatabaseType protocolType = database.getProtocolType();
-            DatabaseType databaseType = database.getResourceMetaData().getDatabaseType();
+            Map<String, DatabaseType> storageTypes = database.getResourceMetaData().getStorageTypes();
             ExecuteProcessEngine.initializeExecution(queryContext, executionGroupContext, eventBusContext);
             SQLStatementContext<?> context = queryContext.getSqlStatementContext();
             List<ExecuteResult> result = jdbcExecutor.execute(executionGroupContext,
-                    ProxyJDBCExecutorCallbackFactory.newInstance(type, protocolType, databaseType, context.getSqlStatement(), databaseCommunicationEngine, isReturnGeneratedKeys, isExceptionThrown,
+                    ProxyJDBCExecutorCallbackFactory.newInstance(type, protocolType, storageTypes, context.getSqlStatement(), databaseCommunicationEngine, isReturnGeneratedKeys, isExceptionThrown,
                             true),
-                    ProxyJDBCExecutorCallbackFactory.newInstance(type, protocolType, databaseType, context.getSqlStatement(), databaseCommunicationEngine, isReturnGeneratedKeys, isExceptionThrown,
+                    ProxyJDBCExecutorCallbackFactory.newInstance(type, protocolType, storageTypes, context.getSqlStatement(), databaseCommunicationEngine, isReturnGeneratedKeys, isExceptionThrown,
                             false));
             ExecuteProcessEngine.finishExecution(executionGroupContext.getExecutionID(), eventBusContext);
             return result;
diff --git a/proxy/backend/src/main/java/org/apache/shardingsphere/proxy/backend/communication/jdbc/executor/callback/ProxyJDBCExecutorCallback.java b/proxy/backend/src/main/java/org/apache/shardingsphere/proxy/backend/communication/jdbc/executor/callback/ProxyJDBCExecutorCallback.java
index e1692825862..1a34ae79c2b 100644
--- a/proxy/backend/src/main/java/org/apache/shardingsphere/proxy/backend/communication/jdbc/executor/callback/ProxyJDBCExecutorCallback.java
+++ b/proxy/backend/src/main/java/org/apache/shardingsphere/proxy/backend/communication/jdbc/executor/callback/ProxyJDBCExecutorCallback.java
@@ -36,6 +36,7 @@ import java.sql.ResultSet;
 import java.sql.SQLException;
 import java.sql.Statement;
 import java.sql.Types;
+import java.util.Map;
 import java.util.Optional;
 
 /**
@@ -51,38 +52,38 @@ public abstract class ProxyJDBCExecutorCallback extends JDBCExecutorCallback<Exe
     
     private boolean hasMetaData;
     
-    public ProxyJDBCExecutorCallback(final DatabaseType protocolType, final DatabaseType databaseType, final SQLStatement sqlStatement,
+    public ProxyJDBCExecutorCallback(final DatabaseType protocolType, final Map<String, DatabaseType> storageTypes, final SQLStatement sqlStatement,
                                      final JDBCDatabaseCommunicationEngine databaseCommunicationEngine,
                                      final boolean isReturnGeneratedKeys, final boolean isExceptionThrown, final boolean fetchMetaData) {
-        super(protocolType, databaseType, sqlStatement, isExceptionThrown, ProxyContext.getInstance().getContextManager().getInstanceContext().getEventBusContext());
+        super(protocolType, storageTypes, sqlStatement, isExceptionThrown, ProxyContext.getInstance().getContextManager().getInstanceContext().getEventBusContext());
         this.databaseCommunicationEngine = databaseCommunicationEngine;
         this.isReturnGeneratedKeys = isReturnGeneratedKeys;
         this.fetchMetaData = fetchMetaData;
     }
     
     @Override
-    public ExecuteResult executeSQL(final String sql, final Statement statement, final ConnectionMode connectionMode) throws SQLException {
+    public ExecuteResult executeSQL(final String sql, final Statement statement, final ConnectionMode connectionMode, final DatabaseType storageType) throws SQLException {
         if (fetchMetaData && !hasMetaData) {
             hasMetaData = true;
-            return executeSQL(sql, statement, connectionMode, true);
+            return executeSQL(sql, statement, connectionMode, true, storageType);
         }
-        return executeSQL(sql, statement, connectionMode, false);
+        return executeSQL(sql, statement, connectionMode, false, storageType);
     }
     
-    private ExecuteResult executeSQL(final String sql, final Statement statement, final ConnectionMode connectionMode, final boolean withMetaData) throws SQLException {
+    private ExecuteResult executeSQL(final String sql, final Statement statement, final ConnectionMode connectionMode, final boolean withMetaData, final DatabaseType storageType) throws SQLException {
         databaseCommunicationEngine.add(statement);
         if (execute(sql, statement, isReturnGeneratedKeys)) {
             ResultSet resultSet = statement.getResultSet();
             databaseCommunicationEngine.add(resultSet);
-            return createQueryResult(resultSet, connectionMode);
+            return createQueryResult(resultSet, connectionMode, storageType);
         }
         return new UpdateResult(statement.getUpdateCount(), isReturnGeneratedKeys ? getGeneratedKey(statement) : 0L);
     }
     
     protected abstract boolean execute(String sql, Statement statement, boolean isReturnGeneratedKeys) throws SQLException;
     
-    private QueryResult createQueryResult(final ResultSet resultSet, final ConnectionMode connectionMode) throws SQLException {
-        return ConnectionMode.MEMORY_STRICTLY == connectionMode ? new JDBCStreamQueryResult(resultSet) : new JDBCMemoryQueryResult(resultSet, getDatabaseType());
+    private QueryResult createQueryResult(final ResultSet resultSet, final ConnectionMode connectionMode, final DatabaseType storageType) throws SQLException {
+        return ConnectionMode.MEMORY_STRICTLY == connectionMode ? new JDBCStreamQueryResult(resultSet) : new JDBCMemoryQueryResult(resultSet, storageType);
     }
     
     private long getGeneratedKey(final Statement statement) throws SQLException {
diff --git a/proxy/backend/src/main/java/org/apache/shardingsphere/proxy/backend/communication/jdbc/executor/callback/ProxyJDBCExecutorCallbackFactory.java b/proxy/backend/src/main/java/org/apache/shardingsphere/proxy/backend/communication/jdbc/executor/callback/ProxyJDBCExecutorCallbackFactory.java
index 75ec1cc1eb4..493b26c99e2 100644
--- a/proxy/backend/src/main/java/org/apache/shardingsphere/proxy/backend/communication/jdbc/executor/callback/ProxyJDBCExecutorCallbackFactory.java
+++ b/proxy/backend/src/main/java/org/apache/shardingsphere/proxy/backend/communication/jdbc/executor/callback/ProxyJDBCExecutorCallbackFactory.java
@@ -27,6 +27,8 @@ import org.apache.shardingsphere.proxy.backend.communication.jdbc.executor.callb
 import org.apache.shardingsphere.proxy.backend.communication.jdbc.executor.callback.impl.ProxyStatementExecutorCallback;
 import org.apache.shardingsphere.sql.parser.sql.common.statement.SQLStatement;
 
+import java.util.Map;
+
 /**
  * Proxy JDBC executor callback factory.
  */
@@ -38,7 +40,7 @@ public final class ProxyJDBCExecutorCallbackFactory {
      *
      * @param type driver type
      * @param protocolType protocol type
-     * @param databaseType database type
+     * @param storageTypes storage types
      * @param sqlStatement SQL statement
      * @param databaseCommunicationEngine database communication engine
      * @param isReturnGeneratedKeys is return generated keys or not
@@ -46,14 +48,14 @@ public final class ProxyJDBCExecutorCallbackFactory {
      * @param isFetchMetaData is fetch meta data or not
      * @return created instance
      */
-    public static ProxyJDBCExecutorCallback newInstance(final String type, final DatabaseType protocolType, final DatabaseType databaseType, final SQLStatement sqlStatement,
+    public static ProxyJDBCExecutorCallback newInstance(final String type, final DatabaseType protocolType, final Map<String, DatabaseType> storageTypes, final SQLStatement sqlStatement,
                                                         final JDBCDatabaseCommunicationEngine databaseCommunicationEngine, final boolean isReturnGeneratedKeys, final boolean isExceptionThrown,
                                                         final boolean isFetchMetaData) {
         if (JDBCDriverType.STATEMENT.equals(type)) {
-            return new ProxyStatementExecutorCallback(protocolType, databaseType, sqlStatement, databaseCommunicationEngine, isReturnGeneratedKeys, isExceptionThrown, isFetchMetaData);
+            return new ProxyStatementExecutorCallback(protocolType, storageTypes, sqlStatement, databaseCommunicationEngine, isReturnGeneratedKeys, isExceptionThrown, isFetchMetaData);
         }
         if (JDBCDriverType.PREPARED_STATEMENT.equals(type)) {
-            return new ProxyPreparedStatementExecutorCallback(protocolType, databaseType, sqlStatement, databaseCommunicationEngine, isReturnGeneratedKeys, isExceptionThrown, isFetchMetaData);
+            return new ProxyPreparedStatementExecutorCallback(protocolType, storageTypes, sqlStatement, databaseCommunicationEngine, isReturnGeneratedKeys, isExceptionThrown, isFetchMetaData);
         }
         throw new UnsupportedSQLOperationException(String.format("Unsupported driver type: `%s`", type));
     }
diff --git a/proxy/backend/src/main/java/org/apache/shardingsphere/proxy/backend/communication/jdbc/executor/callback/impl/ProxyPreparedStatementExecutorCallback.java b/proxy/backend/src/main/java/org/apache/shardingsphere/proxy/backend/communication/jdbc/executor/callback/impl/ProxyPreparedStatementExecutorCallback.java
index 9867aca81c4..3d028a060d2 100644
--- a/proxy/backend/src/main/java/org/apache/shardingsphere/proxy/backend/communication/jdbc/executor/callback/impl/ProxyPreparedStatementExecutorCallback.java
+++ b/proxy/backend/src/main/java/org/apache/shardingsphere/proxy/backend/communication/jdbc/executor/callback/impl/ProxyPreparedStatementExecutorCallback.java
@@ -25,16 +25,17 @@ import org.apache.shardingsphere.sql.parser.sql.common.statement.SQLStatement;
 import java.sql.PreparedStatement;
 import java.sql.SQLException;
 import java.sql.Statement;
+import java.util.Map;
 
 /**
  * Prepared statement executor callback for proxy.
  */
 public final class ProxyPreparedStatementExecutorCallback extends ProxyJDBCExecutorCallback {
     
-    public ProxyPreparedStatementExecutorCallback(final DatabaseType protocolType, final DatabaseType databaseType, final SQLStatement sqlStatement,
+    public ProxyPreparedStatementExecutorCallback(final DatabaseType protocolType, final Map<String, DatabaseType> storageTypes, final SQLStatement sqlStatement,
                                                   final JDBCDatabaseCommunicationEngine databaseCommunicationEngine, final boolean isReturnGeneratedKeys,
                                                   final boolean isExceptionThrown, final boolean fetchMetaData) {
-        super(protocolType, databaseType, sqlStatement, databaseCommunicationEngine, isReturnGeneratedKeys, isExceptionThrown, fetchMetaData);
+        super(protocolType, storageTypes, sqlStatement, databaseCommunicationEngine, isReturnGeneratedKeys, isExceptionThrown, fetchMetaData);
     }
     
     @Override
diff --git a/proxy/backend/src/main/java/org/apache/shardingsphere/proxy/backend/communication/jdbc/executor/callback/impl/ProxyStatementExecutorCallback.java b/proxy/backend/src/main/java/org/apache/shardingsphere/proxy/backend/communication/jdbc/executor/callback/impl/ProxyStatementExecutorCallback.java
index dac1fd231ae..21e3209eaa0 100644
--- a/proxy/backend/src/main/java/org/apache/shardingsphere/proxy/backend/communication/jdbc/executor/callback/impl/ProxyStatementExecutorCallback.java
+++ b/proxy/backend/src/main/java/org/apache/shardingsphere/proxy/backend/communication/jdbc/executor/callback/impl/ProxyStatementExecutorCallback.java
@@ -24,16 +24,17 @@ import org.apache.shardingsphere.sql.parser.sql.common.statement.SQLStatement;
 
 import java.sql.SQLException;
 import java.sql.Statement;
+import java.util.Map;
 
 /**
  * Statement executor callback for proxy.
  */
 public final class ProxyStatementExecutorCallback extends ProxyJDBCExecutorCallback {
     
-    public ProxyStatementExecutorCallback(final DatabaseType protocolType, final DatabaseType databaseType, final SQLStatement sqlStatement,
+    public ProxyStatementExecutorCallback(final DatabaseType protocolType, final Map<String, DatabaseType> storageTypes, final SQLStatement sqlStatement,
                                           final JDBCDatabaseCommunicationEngine databaseCommunicationEngine, final boolean isReturnGeneratedKeys,
                                           final boolean isExceptionThrown, final boolean fetchMetaData) {
-        super(protocolType, databaseType, sqlStatement, databaseCommunicationEngine, isReturnGeneratedKeys, isExceptionThrown, fetchMetaData);
+        super(protocolType, storageTypes, sqlStatement, databaseCommunicationEngine, isReturnGeneratedKeys, isExceptionThrown, fetchMetaData);
     }
     
     @Override
diff --git a/proxy/backend/src/main/java/org/apache/shardingsphere/proxy/backend/handler/distsql/rul/sql/PreviewHandler.java b/proxy/backend/src/main/java/org/apache/shardingsphere/proxy/backend/handler/distsql/rul/sql/PreviewHandler.java
index a1a60bcc77d..05e2db78d4b 100644
--- a/proxy/backend/src/main/java/org/apache/shardingsphere/proxy/backend/handler/distsql/rul/sql/PreviewHandler.java
+++ b/proxy/backend/src/main/java/org/apache/shardingsphere/proxy/backend/handler/distsql/rul/sql/PreviewHandler.java
@@ -71,6 +71,7 @@ import java.sql.Statement;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
+import java.util.Map;
 import java.util.Optional;
 import java.util.stream.Collectors;
 
@@ -139,21 +140,23 @@ public final class PreviewHandler extends SQLRULBackendHandler<PreviewStatement>
         boolean isReturnGeneratedKeys = sqlStatement instanceof MySQLInsertStatement;
         DriverExecutionPrepareEngine<JDBCExecutionUnit, Connection> prepareEngine = createDriverExecutionPrepareEngine(isReturnGeneratedKeys, metaDataContexts);
         SQLFederationExecutorContext context = new SQLFederationExecutorContext(true, queryContext, metaDataContexts.getMetaData().getDatabases());
-        DatabaseType databaseType = metaDataContexts.getMetaData().getDatabase(getDatabaseName()).getResourceMetaData().getDatabaseType();
-        String schemaName = queryContext.getSqlStatementContext().getTablesContext().getSchemaName().orElseGet(() -> DatabaseTypeEngine.getDefaultSchemaName(databaseType, databaseName));
+        ShardingSphereDatabase database = metaDataContexts.getMetaData().getDatabase(getDatabaseName());
+        String schemaName = queryContext.getSqlStatementContext().getTablesContext().getSchemaName().orElseGet(() -> DatabaseTypeEngine.getDefaultSchemaName(database.getProtocolType(), databaseName));
         EventBusContext eventBusContext = ProxyContext.getInstance().getContextManager().getInstanceContext().getEventBusContext();
         SQLFederationRule sqlFederationRule = metaDataContexts.getMetaData().getGlobalRuleMetaData().getSingleRule(SQLFederationRule.class);
         SQLFederationExecutor sqlFederationExecutor = sqlFederationRule.getSQLFederationExecutor(databaseName, schemaName,
                 metaDataContexts.getMetaData(), metaDataContexts.getShardingSphereData(), new JDBCExecutor(BackendExecutorContext.getInstance().getExecutorEngine(), false), eventBusContext);
-        sqlFederationExecutor.executeQuery(prepareEngine, createPreviewFederationCallback(sqlStatement, databaseType, eventBusContext), context);
+        sqlFederationExecutor.executeQuery(prepareEngine, createPreviewFederationCallback(database.getProtocolType(), database.getResourceMetaData().getStorageTypes(), sqlStatement, eventBusContext),
+                context);
         return context.getExecutionUnits();
     }
     
-    private JDBCExecutorCallback<ExecuteResult> createPreviewFederationCallback(final SQLStatement sqlStatement, final DatabaseType databaseType, final EventBusContext eventBusContext) {
-        return new JDBCExecutorCallback<ExecuteResult>(databaseType, sqlStatement, SQLExecutorExceptionHandler.isExceptionThrown(), eventBusContext) {
+    private JDBCExecutorCallback<ExecuteResult> createPreviewFederationCallback(final DatabaseType protocolType, final Map<String, DatabaseType> storageTypes, final SQLStatement sqlStatement,
+                                                                                final EventBusContext eventBusContext) {
+        return new JDBCExecutorCallback<ExecuteResult>(protocolType, storageTypes, sqlStatement, SQLExecutorExceptionHandler.isExceptionThrown(), eventBusContext) {
             
             @Override
-            protected ExecuteResult executeSQL(final String sql, final Statement statement, final ConnectionMode connectionMode) throws SQLException {
+            protected ExecuteResult executeSQL(final String sql, final Statement statement, final ConnectionMode connectionMode, final DatabaseType storageType) throws SQLException {
                 return new JDBCStreamQueryResult(statement.executeQuery(sql));
             }
             
@@ -169,7 +172,7 @@ public final class PreviewHandler extends SQLRULBackendHandler<PreviewStatement>
         return new DriverExecutionPrepareEngine<>(JDBCDriverType.STATEMENT, maxConnectionsSizePerQuery, (JDBCBackendConnection) getConnectionSession().getBackendConnection(),
                 (JDBCBackendStatement) getConnectionSession().getStatementManager(), new StatementOption(isReturnGeneratedKeys),
                 metaDataContexts.getMetaData().getDatabase(getDatabaseName()).getRuleMetaData().getRules(),
-                metaDataContexts.getMetaData().getDatabase(getDatabaseName()).getResourceMetaData().getDatabaseTypes());
+                metaDataContexts.getMetaData().getDatabase(getDatabaseName()).getResourceMetaData().getStorageTypes());
     }
     
     private String getDatabaseName() {
diff --git a/proxy/frontend/mysql/src/main/java/org/apache/shardingsphere/proxy/frontend/mysql/command/query/text/query/MySQLMultiStatementsHandler.java b/proxy/frontend/mysql/src/main/java/org/apache/shardingsphere/proxy/frontend/mysql/command/query/text/query/MySQLMultiStatementsHandler.java
index f2e83d9e3d6..d70929f4d08 100644
--- a/proxy/frontend/mysql/src/main/java/org/apache/shardingsphere/proxy/frontend/mysql/command/query/text/query/MySQLMultiStatementsHandler.java
+++ b/proxy/frontend/mysql/src/main/java/org/apache/shardingsphere/proxy/frontend/mysql/command/query/text/query/MySQLMultiStatementsHandler.java
@@ -140,7 +140,7 @@ public final class MySQLMultiStatementsHandler implements ProxyBackendHandler {
         DriverExecutionPrepareEngine<JDBCExecutionUnit, Connection> prepareEngine = new DriverExecutionPrepareEngine<>(JDBCDriverType.STATEMENT, metaDataContexts.getMetaData().getProps()
                 .<Integer>getValue(ConfigurationPropertyKey.MAX_CONNECTIONS_SIZE_PER_QUERY), (JDBCBackendConnection) connectionSession.getBackendConnection(),
                 (JDBCBackendStatement) connectionSession.getStatementManager(), new StatementOption(false), rules,
-                metaDataContexts.getMetaData().getDatabase(connectionSession.getDatabaseName()).getResourceMetaData().getDatabaseTypes());
+                metaDataContexts.getMetaData().getDatabase(connectionSession.getDatabaseName()).getResourceMetaData().getStorageTypes());
         ExecutionGroupContext<JDBCExecutionUnit> executionGroupContext = prepareEngine.prepare(anyExecutionContext.getRouteContext(), samplingExecutionUnit());
         for (ExecutionGroup<JDBCExecutionUnit> eachGroup : executionGroupContext.getInputGroups()) {
             for (JDBCExecutionUnit each : eachGroup.getInputs()) {
@@ -167,8 +167,8 @@ public final class MySQLMultiStatementsHandler implements ProxyBackendHandler {
     
     private UpdateResponseHeader executeBatchedStatements(final ExecutionGroupContext<JDBCExecutionUnit> executionGroupContext) throws SQLException {
         boolean isExceptionThrown = SQLExecutorExceptionHandler.isExceptionThrown();
-        DatabaseType databaseType = metaDataContexts.getMetaData().getDatabase(connectionSession.getDatabaseName()).getResourceMetaData().getDatabaseType();
-        JDBCExecutorCallback<int[]> callback = new BatchedJDBCExecutorCallback(databaseType, sqlStatementSample, isExceptionThrown);
+        Map<String, DatabaseType> storageTypes = metaDataContexts.getMetaData().getDatabase(connectionSession.getDatabaseName()).getResourceMetaData().getStorageTypes();
+        JDBCExecutorCallback<int[]> callback = new BatchedJDBCExecutorCallback(storageTypes, sqlStatementSample, isExceptionThrown);
         List<int[]> executeResults = jdbcExecutor.execute(executionGroupContext, callback);
         int updated = 0;
         for (int[] eachResult : executeResults) {
@@ -182,12 +182,12 @@ public final class MySQLMultiStatementsHandler implements ProxyBackendHandler {
     
     private static class BatchedJDBCExecutorCallback extends JDBCExecutorCallback<int[]> {
         
-        BatchedJDBCExecutorCallback(final DatabaseType databaseType, final SQLStatement sqlStatement, final boolean isExceptionThrown) {
-            super(DatabaseTypeFactory.getInstance("MySQL"), databaseType, sqlStatement, isExceptionThrown, ProxyContext.getInstance().getContextManager().getInstanceContext().getEventBusContext());
+        BatchedJDBCExecutorCallback(final Map<String, DatabaseType> storageTypes, final SQLStatement sqlStatement, final boolean isExceptionThrown) {
+            super(DatabaseTypeFactory.getInstance("MySQL"), storageTypes, sqlStatement, isExceptionThrown, ProxyContext.getInstance().getContextManager().getInstanceContext().getEventBusContext());
         }
         
         @Override
-        protected int[] executeSQL(final String sql, final Statement statement, final ConnectionMode connectionMode) throws SQLException {
+        protected int[] executeSQL(final String sql, final Statement statement, final ConnectionMode connectionMode, final DatabaseType storageType) throws SQLException {
             try {
                 return statement.executeBatch();
             } finally {
diff --git a/proxy/frontend/mysql/src/test/java/org/apache/shardingsphere/proxy/frontend/mysql/command/query/text/query/MySQLComQueryPacketExecutorTest.java b/proxy/frontend/mysql/src/test/java/org/apache/shardingsphere/proxy/frontend/mysql/command/query/text/query/MySQLComQueryPacketExecutorTest.java
index 3102d341f27..0f82a29ffd2 100644
--- a/proxy/frontend/mysql/src/test/java/org/apache/shardingsphere/proxy/frontend/mysql/command/query/text/query/MySQLComQueryPacketExecutorTest.java
+++ b/proxy/frontend/mysql/src/test/java/org/apache/shardingsphere/proxy/frontend/mysql/command/query/text/query/MySQLComQueryPacketExecutorTest.java
@@ -110,7 +110,7 @@ public final class MySQLComQueryPacketExecutorTest {
         try (MockedStatic<ProxyContext> mockedStatic = mockStatic(ProxyContext.class)) {
             ProxyContext mockedProxyContext = mock(ProxyContext.class, RETURNS_DEEP_STUBS);
             mockedStatic.when(ProxyContext::getInstance).thenReturn(mockedProxyContext);
-            when(ProxyContext.getInstance().getContextManager().getMetaDataContexts().getMetaData().getDatabase("db_name").getResourceMetaData().getDatabaseTypes())
+            when(ProxyContext.getInstance().getContextManager().getMetaDataContexts().getMetaData().getDatabase("db_name").getResourceMetaData().getStorageTypes())
                     .thenReturn(Collections.singletonMap("ds_0", new MySQLDatabaseType()));
             when(ProxyContext.getInstance().getContextManager().getMetaDataContexts().getMetaData().getDatabase("db_name").getProtocolType()).thenReturn(new MySQLDatabaseType());
             ShardingSphereRuleMetaData globalRuleMetaData = mock(ShardingSphereRuleMetaData.class);
diff --git a/proxy/frontend/mysql/src/test/java/org/apache/shardingsphere/proxy/frontend/mysql/command/query/text/query/MySQLMultiStatementsHandlerTest.java b/proxy/frontend/mysql/src/test/java/org/apache/shardingsphere/proxy/frontend/mysql/command/query/text/query/MySQLMultiStatementsHandlerTest.java
index 460842ff5ec..afc600bce4a 100644
--- a/proxy/frontend/mysql/src/test/java/org/apache/shardingsphere/proxy/frontend/mysql/command/query/text/query/MySQLMultiStatementsHandlerTest.java
+++ b/proxy/frontend/mysql/src/test/java/org/apache/shardingsphere/proxy/frontend/mysql/command/query/text/query/MySQLMultiStatementsHandlerTest.java
@@ -75,8 +75,8 @@ public final class MySQLMultiStatementsHandlerTest {
     
     @Test
     public void assertExecute() throws SQLException {
-        final String sql = "update t set v=v+1 where id=1;update t set v=v+1 where id=2;update t set v=v+1 where id=3";
-        when(connectionSession.getDatabaseName()).thenReturn("");
+        String sql = "update t set v=v+1 where id=1;update t set v=v+1 where id=2;update t set v=v+1 where id=3";
+        when(connectionSession.getDatabaseName()).thenReturn("db");
         when(connectionSession.getBackendConnection()).thenReturn(backendConnection);
         when(connectionSession.getStatementManager()).thenReturn(backendStatement);
         Connection connection = mock(Connection.class, RETURNS_DEEP_STUBS);
@@ -89,9 +89,11 @@ public final class MySQLMultiStatementsHandlerTest {
         MySQLUpdateStatement expectedStatement = mock(MySQLUpdateStatement.class);
         try (MockedStatic<ProxyContext> mockedStatic = mockStatic(ProxyContext.class)) {
             mockedStatic.when(ProxyContext::getInstance).thenReturn(proxyContext);
-            when(ProxyContext.getInstance().getContextManager().getMetaDataContexts().getMetaData().getDatabase("").getResourceMetaData().getDatabaseTypes())
+            when(ProxyContext.getInstance().getContextManager().getMetaDataContexts().getMetaData().getDatabase("db").getResourceMetaData().getAllInstanceDataSourceNames())
+                    .thenReturn(Collections.singletonList("ds_0"));
+            when(ProxyContext.getInstance().getContextManager().getMetaDataContexts().getMetaData().getDatabase("db").getResourceMetaData().getStorageTypes())
                     .thenReturn(Collections.singletonMap("ds_0", new MySQLDatabaseType()));
-            when(ProxyContext.getInstance().getContextManager().getMetaDataContexts().getMetaData().getDatabase("").getProtocolType()).thenReturn(new MySQLDatabaseType());
+            when(ProxyContext.getInstance().getContextManager().getMetaDataContexts().getMetaData().getDatabase("db").getProtocolType()).thenReturn(new MySQLDatabaseType());
             ShardingSphereRuleMetaData globalRuleMetaData = mock(ShardingSphereRuleMetaData.class);
             when(ProxyContext.getInstance().getContextManager().getMetaDataContexts().getMetaData().getGlobalRuleMetaData()).thenReturn(globalRuleMetaData);
             when(globalRuleMetaData.getSingleRule(SQLParserRule.class)).thenReturn(new SQLParserRule(new DefaultSQLParserRuleConfigurationBuilder().build()));
diff --git a/proxy/frontend/opengauss/src/test/java/org/apache/shardingsphere/proxy/frontend/opengauss/command/query/extended/bind/OpenGaussComBatchBindExecutorTest.java b/proxy/frontend/opengauss/src/test/java/org/apache/shardingsphere/proxy/frontend/opengauss/command/query/extended/bind/OpenGaussComBatchBindExecutorTest.java
index b9cd7637639..ccb72e2f337 100644
--- a/proxy/frontend/opengauss/src/test/java/org/apache/shardingsphere/proxy/frontend/opengauss/command/query/extended/bind/OpenGaussComBatchBindExecutorTest.java
+++ b/proxy/frontend/opengauss/src/test/java/org/apache/shardingsphere/proxy/frontend/opengauss/command/query/extended/bind/OpenGaussComBatchBindExecutorTest.java
@@ -23,6 +23,7 @@ import org.apache.shardingsphere.db.protocol.postgresql.packet.command.query.ext
 import org.apache.shardingsphere.db.protocol.postgresql.packet.generic.PostgreSQLCommandCompletePacket;
 import org.apache.shardingsphere.infra.config.props.ConfigurationPropertyKey;
 import org.apache.shardingsphere.infra.database.type.DatabaseType;
+import org.apache.shardingsphere.infra.database.type.dialect.OpenGaussDatabaseType;
 import org.apache.shardingsphere.infra.executor.sql.context.ExecutionUnit;
 import org.apache.shardingsphere.infra.executor.sql.execute.engine.ConnectionMode;
 import org.apache.shardingsphere.infra.executor.sql.prepare.driver.jdbc.StatementOption;
@@ -79,7 +80,8 @@ public final class OpenGaussComBatchBindExecutorTest extends ProxyContextRestore
         ShardingSphereRuleMetaData globalRuleMetaData = mock(ShardingSphereRuleMetaData.class);
         when(ProxyContext.getInstance().getContextManager().getMetaDataContexts().getMetaData().getGlobalRuleMetaData()).thenReturn(globalRuleMetaData);
         ShardingSphereDatabase database = mock(ShardingSphereDatabase.class, RETURNS_DEEP_STUBS);
-        when(database.getResourceMetaData().getDatabaseTypes()).thenReturn(Collections.singletonMap("ds_0", mock(DatabaseType.class)));
+        when(database.getResourceMetaData().getAllInstanceDataSourceNames()).thenReturn(Collections.singletonList("ds_0"));
+        when(database.getResourceMetaData().getStorageTypes()).thenReturn(Collections.singletonMap("ds_0", new OpenGaussDatabaseType()));
         when(ProxyContext.getInstance().getContextManager().getMetaDataContexts().getMetaData().getDatabase("db")).thenReturn(database);
         when(globalRuleMetaData.getSingleRule(SQLTranslatorRule.class)).thenReturn(new SQLTranslatorRule(new DefaultSQLTranslatorRuleConfigurationBuilder().build()));
         String statement = "S_1";
@@ -91,7 +93,7 @@ public final class OpenGaussComBatchBindExecutorTest extends ProxyContextRestore
         when(connectionSession.getConnectionId()).thenReturn(1);
         JDBCBackendConnection backendConnection = mock(JDBCBackendConnection.class);
         Connection connection = mock(Connection.class, RETURNS_DEEP_STUBS);
-        when(connection.getMetaData().getURL()).thenReturn("");
+        when(connection.getMetaData().getURL()).thenReturn("jdbc:opengauss://127.0.0.1/db");
         when(backendConnection.getConnections(nullable(String.class), anyInt(), any(ConnectionMode.class))).thenReturn(Collections.singletonList(connection));
         PreparedStatement preparedStatement = mock(PreparedStatement.class);
         when(preparedStatement.getConnection()).thenReturn(connection);
diff --git a/proxy/frontend/postgresql/src/main/java/org/apache/shardingsphere/proxy/frontend/postgresql/command/query/extended/PostgreSQLBatchedStatementsExecutor.java b/proxy/frontend/postgresql/src/main/java/org/apache/shardingsphere/proxy/frontend/postgresql/command/query/extended/PostgreSQLBatchedStatementsExecutor.java
index 1b0872c6da6..7cc9ab212a8 100644
--- a/proxy/frontend/postgresql/src/main/java/org/apache/shardingsphere/proxy/frontend/postgresql/command/query/extended/PostgreSQLBatchedStatementsExecutor.java
+++ b/proxy/frontend/postgresql/src/main/java/org/apache/shardingsphere/proxy/frontend/postgresql/command/query/extended/PostgreSQLBatchedStatementsExecutor.java
@@ -148,7 +148,7 @@ public final class PostgreSQLBatchedStatementsExecutor {
         DriverExecutionPrepareEngine<JDBCExecutionUnit, Connection> prepareEngine = new DriverExecutionPrepareEngine<>(JDBCDriverType.PREPARED_STATEMENT,
                 metaDataContexts.getMetaData().getProps().<Integer>getValue(ConfigurationPropertyKey.MAX_CONNECTIONS_SIZE_PER_QUERY),
                 (JDBCBackendConnection) connectionSession.getBackendConnection(), (JDBCBackendStatement) connectionSession.getStatementManager(),
-                new StatementOption(false), rules, metaDataContexts.getMetaData().getDatabase(connectionSession.getDatabaseName()).getResourceMetaData().getDatabaseTypes());
+                new StatementOption(false), rules, metaDataContexts.getMetaData().getDatabase(connectionSession.getDatabaseName()).getResourceMetaData().getStorageTypes());
         executionGroupContext = prepareEngine.prepare(anyExecutionContext.getRouteContext(), executionUnitParameters.keySet());
         for (ExecutionGroup<JDBCExecutionUnit> eachGroup : executionGroupContext.getInputGroups()) {
             for (JDBCExecutionUnit each : eachGroup.getInputs()) {
@@ -176,9 +176,9 @@ public final class PostgreSQLBatchedStatementsExecutor {
     private int executeBatchedPreparedStatements() throws SQLException {
         boolean isExceptionThrown = SQLExecutorExceptionHandler.isExceptionThrown();
         ShardingSphereDatabase database = metaDataContexts.getMetaData().getDatabase(connectionSession.getDatabaseName());
-        DatabaseType databaseType = database.getResourceMetaData().getDatabaseType();
+        Map<String, DatabaseType> storageTypes = database.getResourceMetaData().getStorageTypes();
         DatabaseType protocolType = database.getProtocolType();
-        JDBCExecutorCallback<int[]> callback = new BatchedStatementsJDBCExecutorCallback(protocolType, databaseType, preparedStatement.getSqlStatement(), isExceptionThrown);
+        JDBCExecutorCallback<int[]> callback = new BatchedStatementsJDBCExecutorCallback(protocolType, storageTypes, preparedStatement.getSqlStatement(), isExceptionThrown);
         List<int[]> executeResults = jdbcExecutor.execute(executionGroupContext, callback);
         int result = 0;
         for (int[] eachResult : executeResults) {
@@ -191,12 +191,12 @@ public final class PostgreSQLBatchedStatementsExecutor {
     
     private static class BatchedStatementsJDBCExecutorCallback extends JDBCExecutorCallback<int[]> {
         
-        BatchedStatementsJDBCExecutorCallback(final DatabaseType protocolType, final DatabaseType databaseType, final SQLStatement sqlStatement, final boolean isExceptionThrown) {
-            super(protocolType, databaseType, sqlStatement, isExceptionThrown, ProxyContext.getInstance().getContextManager().getInstanceContext().getEventBusContext());
+        BatchedStatementsJDBCExecutorCallback(final DatabaseType protocolType, final Map<String, DatabaseType> storageTypes, final SQLStatement sqlStatement, final boolean isExceptionThrown) {
+            super(protocolType, storageTypes, sqlStatement, isExceptionThrown, ProxyContext.getInstance().getContextManager().getInstanceContext().getEventBusContext());
         }
         
         @Override
-        protected int[] executeSQL(final String sql, final Statement statement, final ConnectionMode connectionMode) throws SQLException {
+        protected int[] executeSQL(final String sql, final Statement statement, final ConnectionMode connectionMode, final DatabaseType storageType) throws SQLException {
             try {
                 return statement.executeBatch();
             } finally {
diff --git a/proxy/frontend/postgresql/src/test/java/org/apache/shardingsphere/proxy/frontend/postgresql/command/query/extended/PostgreSQLAggregatedBatchedStatementsCommandExecutorTest.java b/proxy/frontend/postgresql/src/test/java/org/apache/shardingsphere/proxy/frontend/postgresql/command/query/extended/PostgreSQLAggregatedBatchedStatementsCommandExecutorTest.java
index 3b750721f8b..cb689b35ccd 100644
--- a/proxy/frontend/postgresql/src/test/java/org/apache/shardingsphere/proxy/frontend/postgresql/command/query/extended/PostgreSQLAggregatedBatchedStatementsCommandExecutorTest.java
+++ b/proxy/frontend/postgresql/src/test/java/org/apache/shardingsphere/proxy/frontend/postgresql/command/query/extended/PostgreSQLAggregatedBatchedStatementsCommandExecutorTest.java
@@ -28,6 +28,7 @@ import org.apache.shardingsphere.db.protocol.postgresql.packet.command.query.ext
 import org.apache.shardingsphere.db.protocol.postgresql.packet.generic.PostgreSQLCommandCompletePacket;
 import org.apache.shardingsphere.infra.config.props.ConfigurationPropertyKey;
 import org.apache.shardingsphere.infra.database.type.DatabaseType;
+import org.apache.shardingsphere.infra.database.type.dialect.PostgreSQLDatabaseType;
 import org.apache.shardingsphere.infra.executor.sql.context.ExecutionUnit;
 import org.apache.shardingsphere.infra.executor.sql.execute.engine.ConnectionMode;
 import org.apache.shardingsphere.infra.executor.sql.prepare.driver.jdbc.StatementOption;
@@ -93,14 +94,15 @@ public final class PostgreSQLAggregatedBatchedStatementsCommandExecutorTest exte
         when(connectionSession.getDatabaseName()).thenReturn("db");
         when(connectionSession.getServerPreparedStatementRegistry()).thenReturn(new ServerPreparedStatementRegistry());
         ShardingSphereDatabase database = mock(ShardingSphereDatabase.class, RETURNS_DEEP_STUBS);
-        when(database.getResourceMetaData().getDatabaseTypes()).thenReturn(Collections.singletonMap("ds_0", mock(DatabaseType.class)));
+        when(database.getResourceMetaData().getAllInstanceDataSourceNames()).thenReturn(Collections.singletonList("ds_0"));
+        when(database.getResourceMetaData().getStorageTypes()).thenReturn(Collections.singletonMap("ds_0", new PostgreSQLDatabaseType()));
         when(ProxyContext.getInstance().getContextManager().getMetaDataContexts().getMetaData().getDatabase("db")).thenReturn(database);
         connectionSession.getServerPreparedStatementRegistry().addPreparedStatement(STATEMENT_ID,
                 new PostgreSQLServerPreparedStatement(SQL, SQL_PARSER_ENGINE.parse(SQL, false), null, Collections.singletonList(PostgreSQLColumnType.POSTGRESQL_TYPE_INT4)));
         when(connectionSession.getConnectionId()).thenReturn(CONNECTION_ID);
         JDBCBackendConnection backendConnection = mock(JDBCBackendConnection.class);
         Connection connection = mock(Connection.class, RETURNS_DEEP_STUBS);
-        when(connection.getMetaData().getURL()).thenReturn("");
+        when(connection.getMetaData().getURL()).thenReturn("jdbc:postgresql://127.0.0.1/db");
         when(backendConnection.getConnections(nullable(String.class), anyInt(), any(ConnectionMode.class))).thenReturn(Collections.singletonList(connection));
         PreparedStatement preparedStatement = mock(PreparedStatement.class);
         when(preparedStatement.getConnection()).thenReturn(connection);
diff --git a/proxy/frontend/postgresql/src/test/java/org/apache/shardingsphere/proxy/frontend/postgresql/command/query/extended/PostgreSQLBatchedStatementsExecutorTest.java b/proxy/frontend/postgresql/src/test/java/org/apache/shardingsphere/proxy/frontend/postgresql/command/query/extended/PostgreSQLBatchedStatementsExecutorTest.java
index 0cf134c4167..b2cfe79f7da 100644
--- a/proxy/frontend/postgresql/src/test/java/org/apache/shardingsphere/proxy/frontend/postgresql/command/query/extended/PostgreSQLBatchedStatementsExecutorTest.java
+++ b/proxy/frontend/postgresql/src/test/java/org/apache/shardingsphere/proxy/frontend/postgresql/command/query/extended/PostgreSQLBatchedStatementsExecutorTest.java
@@ -22,6 +22,7 @@ import org.apache.shardingsphere.db.protocol.postgresql.packet.command.query.ext
 import org.apache.shardingsphere.db.protocol.postgresql.packet.command.query.extended.bind.PostgreSQLTypeUnspecifiedSQLParameter;
 import org.apache.shardingsphere.infra.config.props.ConfigurationPropertyKey;
 import org.apache.shardingsphere.infra.database.type.DatabaseType;
+import org.apache.shardingsphere.infra.database.type.dialect.PostgreSQLDatabaseType;
 import org.apache.shardingsphere.infra.executor.sql.context.ExecutionUnit;
 import org.apache.shardingsphere.infra.executor.sql.execute.engine.ConnectionMode;
 import org.apache.shardingsphere.infra.executor.sql.prepare.driver.jdbc.StatementOption;
@@ -89,7 +90,8 @@ public final class PostgreSQLBatchedStatementsExecutorTest extends ProxyContextR
         when(contextManager.getMetaDataContexts().getMetaData().getProps().getValue(ConfigurationPropertyKey.MAX_CONNECTIONS_SIZE_PER_QUERY)).thenReturn(1);
         when(contextManager.getMetaDataContexts().getMetaData().getProps().getValue(ConfigurationPropertyKey.SQL_SHOW)).thenReturn(false);
         ShardingSphereDatabase database = mock(ShardingSphereDatabase.class, RETURNS_DEEP_STUBS);
-        when(database.getResourceMetaData().getDatabaseTypes()).thenReturn(Collections.singletonMap("ds_0", mock(DatabaseType.class)));
+        when(database.getResourceMetaData().getStorageTypes()).thenReturn(Collections.singletonMap("ds_0", new PostgreSQLDatabaseType()));
+        when(database.getResourceMetaData().getAllInstanceDataSourceNames()).thenReturn(Collections.singletonList("ds_0"));
         when(contextManager.getMetaDataContexts().getMetaData().getDatabase("db")).thenReturn(database);
         ShardingSphereRuleMetaData globalRuleMetaData = mock(ShardingSphereRuleMetaData.class);
         when(ProxyContext.getInstance().getContextManager().getMetaDataContexts().getMetaData().getGlobalRuleMetaData()).thenReturn(globalRuleMetaData);
diff --git a/proxy/frontend/postgresql/src/test/java/org/apache/shardingsphere/proxy/frontend/postgresql/command/query/extended/describe/PostgreSQLComDescribeExecutorTest.java b/proxy/frontend/postgresql/src/test/java/org/apache/shardingsphere/proxy/frontend/postgresql/command/query/extended/describe/PostgreSQLComDescribeExecutorTest.java
index 7993dca5583..20d372efc30 100644
--- a/proxy/frontend/postgresql/src/test/java/org/apache/shardingsphere/proxy/frontend/postgresql/command/query/extended/describe/PostgreSQLComDescribeExecutorTest.java
+++ b/proxy/frontend/postgresql/src/test/java/org/apache/shardingsphere/proxy/frontend/postgresql/command/query/extended/describe/PostgreSQLComDescribeExecutorTest.java
@@ -128,7 +128,7 @@ public final class PostgreSQLComDescribeExecutorTest extends ProxyContextRestore
         ShardingSphereTable table = new ShardingSphereTable(TABLE_NAME, columnMetaData, Collections.emptyList(), Collections.emptyList());
         when(contextManager.getMetaDataContexts().getMetaData().getDatabase(DATABASE_NAME).getSchema("public").getTable(TABLE_NAME)).thenReturn(table);
         when(contextManager.getMetaDataContexts().getMetaData().getDatabase(DATABASE_NAME).getProtocolType()).thenReturn(new PostgreSQLDatabaseType());
-        when(contextManager.getMetaDataContexts().getMetaData().getDatabase(DATABASE_NAME).getResourceMetaData().getDatabaseTypes())
+        when(contextManager.getMetaDataContexts().getMetaData().getDatabase(DATABASE_NAME).getResourceMetaData().getStorageTypes())
                 .thenReturn(Collections.singletonMap("ds_0", new PostgreSQLDatabaseType()));
         when(contextManager.getMetaDataContexts().getMetaData().containsDatabase(DATABASE_NAME)).thenReturn(true);
     }
diff --git a/test/rewrite/src/test/java/org/apache/shardingsphere/sharding/rewrite/parameterized/engine/AbstractSQLRewriterParameterizedTest.java b/test/rewrite/src/test/java/org/apache/shardingsphere/sharding/rewrite/parameterized/engine/AbstractSQLRewriterParameterizedTest.java
index f8465d73e1c..eb7ff52a83e 100644
--- a/test/rewrite/src/test/java/org/apache/shardingsphere/sharding/rewrite/parameterized/engine/AbstractSQLRewriterParameterizedTest.java
+++ b/test/rewrite/src/test/java/org/apache/shardingsphere/sharding/rewrite/parameterized/engine/AbstractSQLRewriterParameterizedTest.java
@@ -109,8 +109,8 @@ public abstract class AbstractSQLRewriterParameterizedTest {
         mockDataSource(databaseConfig.getDataSources());
         ShardingSphereResourceMetaData resourceMetaData = mock(ShardingSphereResourceMetaData.class);
         DatabaseType databaseType = DatabaseTypeFactory.getInstance(getTestParameters().getDatabaseType());
-        Map<String, DatabaseType> databaseTypes = createDatabaseTypes(databaseConfig, databaseType);
-        when(resourceMetaData.getDatabaseTypes()).thenReturn(databaseTypes);
+        Map<String, DatabaseType> storageTypes = createStorageTypes(databaseConfig, databaseType);
+        when(resourceMetaData.getStorageTypes()).thenReturn(storageTypes);
         String schemaName = DatabaseTypeEngine.getDefaultSchemaName(databaseType, DefaultDatabase.LOGIC_NAME);
         Map<String, ShardingSphereSchema> schemas = mockSchemas(schemaName);
         Collection<ShardingSphereRule> databaseRules = DatabaseRulesBuilder.build(DefaultDatabase.LOGIC_NAME, databaseConfig, mock(InstanceContext.class));
@@ -141,7 +141,7 @@ public abstract class AbstractSQLRewriterParameterizedTest {
                 : (((RouteSQLRewriteResult) sqlRewriteResult).getSqlRewriteUnits()).values();
     }
     
-    private static Map<String, DatabaseType> createDatabaseTypes(final DatabaseConfiguration databaseConfig, final DatabaseType databaseType) {
+    private static Map<String, DatabaseType> createStorageTypes(final DatabaseConfiguration databaseConfig, final DatabaseType databaseType) {
         Map<String, DatabaseType> result = new LinkedHashMap<>(databaseConfig.getDataSources().size(), 1);
         for (Entry<String, DataSource> entry : databaseConfig.getDataSources().entrySet()) {
             result.put(entry.getKey(), databaseType);