You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@shardingsphere.apache.org by pa...@apache.org on 2020/09/09 05:59:52 UTC
[shardingsphere] branch master updated: extract an abstract class
for StatementExecutor&PreparedStatementExecutor
This is an automated email from the ASF dual-hosted git repository.
panjuan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new 1cdc030 extract an abstract class for StatementExecutor&PreparedStatementExecutor
new b7b96c0 Merge pull request #7343 from kimmking/dev1
1cdc030 is described below
commit 1cdc0302388298fd09ecb46b4c403eb61a3eae10
Author: kimmking <ki...@apache.org>
AuthorDate: Wed Sep 9 12:34:32 2020 +0800
extract an abstract class for StatementExecutor&PreparedStatementExecutor
---
.../driver/executor/AbstractStatementExecutor.java | 126 +++++++++++++++++++++
.../driver/executor/PreparedStatementExecutor.java | 103 +++--------------
.../driver/executor/StatementExecutor.java | 122 ++++----------------
3 files changed, 166 insertions(+), 185 deletions(-)
diff --git a/shardingsphere-jdbc/shardingsphere-jdbc-core/src/main/java/org/apache/shardingsphere/driver/executor/AbstractStatementExecutor.java b/shardingsphere-jdbc/shardingsphere-jdbc-core/src/main/java/org/apache/shardingsphere/driver/executor/AbstractStatementExecutor.java
new file mode 100644
index 0000000..e0e93c3
--- /dev/null
+++ b/shardingsphere-jdbc/shardingsphere-jdbc-core/src/main/java/org/apache/shardingsphere/driver/executor/AbstractStatementExecutor.java
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.driver.executor;
+
+import lombok.Getter;
+import lombok.RequiredArgsConstructor;
+import org.apache.shardingsphere.infra.context.SchemaContext;
+import org.apache.shardingsphere.infra.context.SchemaContexts;
+import org.apache.shardingsphere.infra.database.DefaultSchema;
+import org.apache.shardingsphere.infra.executor.kernel.InputGroup;
+import org.apache.shardingsphere.infra.executor.sql.QueryResult;
+import org.apache.shardingsphere.infra.executor.sql.resourced.jdbc.StatementExecuteUnit;
+import org.apache.shardingsphere.infra.executor.sql.resourced.jdbc.executor.SQLExecutor;
+import org.apache.shardingsphere.infra.metadata.refresh.MetaDataRefreshStrategy;
+import org.apache.shardingsphere.infra.metadata.refresh.MetaDataRefreshStrategyFactory;
+import org.apache.shardingsphere.infra.metadata.schema.RuleSchemaMetaData;
+import org.apache.shardingsphere.infra.metadata.schema.RuleSchemaMetaDataLoader;
+import org.apache.shardingsphere.infra.metadata.schema.spi.RuleMetaDataNotifier;
+import org.apache.shardingsphere.infra.rule.DataNodeRoutedRule;
+import org.apache.shardingsphere.infra.rule.ShardingSphereRule;
+import org.apache.shardingsphere.infra.spi.ShardingSphereServiceLoader;
+import org.apache.shardingsphere.infra.spi.type.TypedSPIRegistry;
+import org.apache.shardingsphere.sql.parser.binder.statement.SQLStatementContext;
+
+import javax.sql.DataSource;
+import java.sql.SQLException;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+
+/**
+ * Abstract statement executor.
+ *
+ * <p>Holds the state and logic shared by {@code StatementExecutor} and
+ * {@code PreparedStatementExecutor}: accumulating update counts for
+ * data-node routed rules and refreshing table meta data after execution.</p>
+ */
+@Getter
+@RequiredArgsConstructor
+public abstract class AbstractStatementExecutor {
+
+ // Register the SPI once per class load so TypedSPIRegistry can resolve RuleMetaDataNotifier implementations.
+ static {
+ ShardingSphereServiceLoader.register(RuleMetaDataNotifier.class);
+ }
+
+ // Actual data sources keyed by data source name; used when (re)loading table meta data.
+ private final Map<String, DataSource> dataSourceMap;
+
+ // Runtime schema contexts (database type, default schema, props).
+ private final SchemaContexts schemaContexts;
+
+ // Engine that executes the grouped statement units.
+ private final SQLExecutor sqlExecutor;
+
+ /**
+ * Judge whether per-statement update counts need to be accumulated into one total.
+ *
+ * @param rules rules to check; each element is cast to DataNodeRoutedRule,
+ * so callers are expected to pre-filter to that type — TODO confirm at call sites
+ * @param sqlStatementContext SQL statement context providing the affected table names
+ * @return true if any rule requires accumulation for the statement's tables
+ */
+ protected boolean isNeedAccumulate(final Collection<ShardingSphereRule> rules, final SQLStatementContext<?> sqlStatementContext) {
+ return rules.stream().anyMatch(each -> ((DataNodeRoutedRule) each).isNeedAccumulate(sqlStatementContext.getTablesContext().getTableNames()));
+ }
+
+ /**
+ * Sum the per-statement update counts, treating null entries as zero.
+ *
+ * @param results update counts returned by each routed statement
+ * @return total effected records count
+ */
+ protected int accumulate(final List<Integer> results) {
+ int result = 0;
+ for (Integer each : results) {
+ result += null == each ? 0 : each;
+ }
+ return result;
+ }
+
+ /**
+ * Refresh table meta data after executing a statement, when a refresh strategy
+ * is registered for the statement type, then notify the refreshed rule meta data.
+ *
+ * @param schemaContext schema context whose meta data is refreshed
+ * @param sqlStatementContext SQL statement context; the method is a no-op when null
+ * @throws SQLException SQL exception thrown while reloading meta data
+ */
+ @SuppressWarnings("unchecked")
+ protected void refreshTableMetaData(final SchemaContext schemaContext, final SQLStatementContext<?> sqlStatementContext) throws SQLException {
+ if (null == sqlStatementContext) {
+ return;
+ }
+ Optional<MetaDataRefreshStrategy> refreshStrategy = MetaDataRefreshStrategyFactory.newInstance(sqlStatementContext);
+ if (refreshStrategy.isPresent()) {
+ RuleSchemaMetaDataLoader metaDataLoader = new RuleSchemaMetaDataLoader(schemaContext.getSchema().getRules());
+ refreshStrategy.get().refreshMetaData(schemaContext.getSchema().getMetaData(), schemaContexts.getDatabaseType(),
+ dataSourceMap, sqlStatementContext, tableName -> metaDataLoader.load(schemaContexts.getDatabaseType(),
+ dataSourceMap, tableName, schemaContexts.getProps()));
+ notifyPersistRuleMetaData(DefaultSchema.LOGIC_NAME, schemaContext.getSchema().getMetaData().getRuleSchemaMetaData());
+ }
+ }
+
+ // Push the refreshed rule meta data to the SPI-registered notifier, if one is present
+ // (presumably a governance/registry backend — confirm against RuleMetaDataNotifier implementations).
+ private void notifyPersistRuleMetaData(final String schemaName, final RuleSchemaMetaData metaData) {
+ RuleMetaDataNotifier notifier = TypedSPIRegistry.getRegisteredService(RuleMetaDataNotifier.class);
+ if (null != notifier) {
+ notifier.notify(schemaName, metaData);
+ }
+ }
+
+ /**
+ * Execute SQL.
+ *
+ * @param inputGroups input groups
+ * @param sqlStatementContext SQL statement context
+ * @return true if the statement is DQL, false if it is DML
+ * @throws SQLException SQL exception
+ */
+ public abstract boolean execute(Collection<InputGroup<StatementExecuteUnit>> inputGroups, SQLStatementContext<?> sqlStatementContext) throws SQLException;
+
+ /**
+ * Execute query.
+ *
+ * @param inputGroups input groups
+ * @return result set list
+ * @throws SQLException SQL exception
+ */
+ public abstract List<QueryResult> executeQuery(Collection<InputGroup<StatementExecuteUnit>> inputGroups) throws SQLException;
+
+ /**
+ * Execute update.
+ *
+ * @param inputGroups input groups
+ * @param sqlStatementContext SQL statement context
+ * @return effected records count
+ * @throws SQLException SQL exception
+ */
+ public abstract int executeUpdate(Collection<InputGroup<StatementExecuteUnit>> inputGroups, SQLStatementContext<?> sqlStatementContext) throws SQLException;
+}
diff --git a/shardingsphere-jdbc/shardingsphere-jdbc-core/src/main/java/org/apache/shardingsphere/driver/executor/PreparedStatementExecutor.java b/shardingsphere-jdbc/shardingsphere-jdbc-core/src/main/java/org/apache/shardingsphere/driver/executor/PreparedStatementExecutor.java
index aecacc9..81e433a 100644
--- a/shardingsphere-jdbc/shardingsphere-jdbc-core/src/main/java/org/apache/shardingsphere/driver/executor/PreparedStatementExecutor.java
+++ b/shardingsphere-jdbc/shardingsphere-jdbc-core/src/main/java/org/apache/shardingsphere/driver/executor/PreparedStatementExecutor.java
@@ -17,10 +17,7 @@
package org.apache.shardingsphere.driver.executor;
-import lombok.RequiredArgsConstructor;
-import org.apache.shardingsphere.infra.context.SchemaContext;
import org.apache.shardingsphere.infra.context.SchemaContexts;
-import org.apache.shardingsphere.infra.database.DefaultSchema;
import org.apache.shardingsphere.infra.executor.kernel.InputGroup;
import org.apache.shardingsphere.infra.executor.sql.ConnectionMode;
import org.apache.shardingsphere.infra.executor.sql.QueryResult;
@@ -31,15 +28,7 @@ import org.apache.shardingsphere.infra.executor.sql.resourced.jdbc.executor.SQLE
import org.apache.shardingsphere.infra.executor.sql.resourced.jdbc.executor.impl.DefaultSQLExecutorCallback;
import org.apache.shardingsphere.infra.executor.sql.resourced.jdbc.queryresult.MemoryQueryResult;
import org.apache.shardingsphere.infra.executor.sql.resourced.jdbc.queryresult.StreamQueryResult;
-import org.apache.shardingsphere.infra.metadata.refresh.MetaDataRefreshStrategy;
-import org.apache.shardingsphere.infra.metadata.refresh.MetaDataRefreshStrategyFactory;
-import org.apache.shardingsphere.infra.metadata.schema.RuleSchemaMetaData;
-import org.apache.shardingsphere.infra.metadata.schema.RuleSchemaMetaDataLoader;
-import org.apache.shardingsphere.infra.metadata.schema.spi.RuleMetaDataNotifier;
import org.apache.shardingsphere.infra.rule.DataNodeRoutedRule;
-import org.apache.shardingsphere.infra.rule.ShardingSphereRule;
-import org.apache.shardingsphere.infra.spi.ShardingSphereServiceLoader;
-import org.apache.shardingsphere.infra.spi.type.TypedSPIRegistry;
import org.apache.shardingsphere.sql.parser.binder.statement.SQLStatementContext;
import javax.sql.DataSource;
@@ -50,40 +39,26 @@ import java.sql.Statement;
import java.util.Collection;
import java.util.List;
import java.util.Map;
-import java.util.Optional;
import java.util.stream.Collectors;
/**
* Prepared statement executor.
*/
-@RequiredArgsConstructor
-public final class PreparedStatementExecutor {
+public final class PreparedStatementExecutor extends AbstractStatementExecutor {
- static {
- ShardingSphereServiceLoader.register(RuleMetaDataNotifier.class);
+ public PreparedStatementExecutor(final Map<String, DataSource> dataSourceMap, final SchemaContexts schemaContexts, final SQLExecutor sqlExecutor) {
+ super(dataSourceMap, schemaContexts, sqlExecutor);
}
- private final Map<String, DataSource> dataSourceMap;
-
- private final SchemaContexts schemaContexts;
-
- private final SQLExecutor sqlExecutor;
-
- /**
- * Execute query.
- *
- * @param inputGroups input groups
- * @return result set list
- * @throws SQLException SQL exception
- */
+ @Override
public List<QueryResult> executeQuery(final Collection<InputGroup<StatementExecuteUnit>> inputGroups) throws SQLException {
boolean isExceptionThrown = ExecutorExceptionHandler.isExceptionThrown();
SQLExecutorCallback<QueryResult> sqlExecutorCallback = createDefaultSQLExecutorCallbackWithQueryResult(isExceptionThrown);
- return sqlExecutor.execute(inputGroups, sqlExecutorCallback);
+ return getSqlExecutor().execute(inputGroups, sqlExecutorCallback);
}
private DefaultSQLExecutorCallback<QueryResult> createDefaultSQLExecutorCallbackWithQueryResult(final boolean isExceptionThrown) {
- return new DefaultSQLExecutorCallback<QueryResult>(schemaContexts.getDatabaseType(), isExceptionThrown) {
+ return new DefaultSQLExecutorCallback<QueryResult>(getSchemaContexts().getDatabaseType(), isExceptionThrown) {
@Override
protected QueryResult executeSQL(final String sql, final Statement statement, final ConnectionMode connectionMode) throws SQLException {
@@ -98,26 +73,19 @@ public final class PreparedStatementExecutor {
};
}
- /**
- * Execute update.
- *
- * @param inputGroups input groups
- * @param sqlStatementContext SQL statement context
- * @return effected records count
- * @throws SQLException SQL exception
- */
+ @Override
public int executeUpdate(final Collection<InputGroup<StatementExecuteUnit>> inputGroups, final SQLStatementContext<?> sqlStatementContext) throws SQLException {
boolean isExceptionThrown = ExecutorExceptionHandler.isExceptionThrown();
SQLExecutorCallback<Integer> sqlExecutorCallback = createDefaultSQLExecutorCallbackWithInteger(isExceptionThrown);
- List<Integer> results = sqlExecutor.execute(inputGroups, sqlExecutorCallback);
- refreshTableMetaData(schemaContexts.getDefaultSchemaContext(), sqlStatementContext);
+ List<Integer> results = getSqlExecutor().execute(inputGroups, sqlExecutorCallback);
+ refreshTableMetaData(getSchemaContexts().getDefaultSchemaContext(), sqlStatementContext);
return isNeedAccumulate(
- schemaContexts.getDefaultSchemaContext().getSchema().getRules().stream().filter(rule -> rule instanceof DataNodeRoutedRule).collect(Collectors.toList()), sqlStatementContext)
+ getSchemaContexts().getDefaultSchemaContext().getSchema().getRules().stream().filter(rule -> rule instanceof DataNodeRoutedRule).collect(Collectors.toList()), sqlStatementContext)
? accumulate(results) : results.get(0);
}
private DefaultSQLExecutorCallback<Integer> createDefaultSQLExecutorCallbackWithInteger(final boolean isExceptionThrown) {
- return new DefaultSQLExecutorCallback<Integer>(schemaContexts.getDatabaseType(), isExceptionThrown) {
+ return new DefaultSQLExecutorCallback<Integer>(getSchemaContexts().getDatabaseType(), isExceptionThrown) {
@Override
protected Integer executeSQL(final String sql, final Statement statement, final ConnectionMode connectionMode) throws SQLException {
@@ -126,31 +94,12 @@ public final class PreparedStatementExecutor {
};
}
- private boolean isNeedAccumulate(final Collection<ShardingSphereRule> rules, final SQLStatementContext<?> sqlStatementContext) {
- return rules.stream().anyMatch(each -> ((DataNodeRoutedRule) each).isNeedAccumulate(sqlStatementContext.getTablesContext().getTableNames()));
- }
-
- private int accumulate(final List<Integer> results) {
- int result = 0;
- for (Integer each : results) {
- result += null == each ? 0 : each;
- }
- return result;
- }
-
- /**
- * Execute SQL.
- *
- * @param inputGroups input groups
- * @param sqlStatementContext SQL statement context
- * @return return true if is DQL, false if is DML
- * @throws SQLException SQL exception
- */
+ @Override
public boolean execute(final Collection<InputGroup<StatementExecuteUnit>> inputGroups, final SQLStatementContext<?> sqlStatementContext) throws SQLException {
boolean isExceptionThrown = ExecutorExceptionHandler.isExceptionThrown();
SQLExecutorCallback<Boolean> sqlExecutorCallback = createDefaultSQLExecutorCallbackWithBoolean(isExceptionThrown);
- List<Boolean> result = sqlExecutor.execute(inputGroups, sqlExecutorCallback);
- refreshTableMetaData(schemaContexts.getDefaultSchemaContext(), sqlStatementContext);
+ List<Boolean> result = getSqlExecutor().execute(inputGroups, sqlExecutorCallback);
+ refreshTableMetaData(getSchemaContexts().getDefaultSchemaContext(), sqlStatementContext);
if (null == result || result.isEmpty() || null == result.get(0)) {
return false;
}
@@ -158,7 +107,7 @@ public final class PreparedStatementExecutor {
}
private DefaultSQLExecutorCallback<Boolean> createDefaultSQLExecutorCallbackWithBoolean(final boolean isExceptionThrown) {
- return new DefaultSQLExecutorCallback<Boolean>(schemaContexts.getDatabaseType(), isExceptionThrown) {
+ return new DefaultSQLExecutorCallback<Boolean>(getSchemaContexts().getDatabaseType(), isExceptionThrown) {
@Override
protected Boolean executeSQL(final String sql, final Statement statement, final ConnectionMode connectionMode) throws SQLException {
@@ -166,26 +115,4 @@ public final class PreparedStatementExecutor {
}
};
}
-
- @SuppressWarnings("unchecked")
- private void refreshTableMetaData(final SchemaContext schemaContext, final SQLStatementContext<?> sqlStatementContext) throws SQLException {
- if (null == sqlStatementContext) {
- return;
- }
- Optional<MetaDataRefreshStrategy> refreshStrategy = MetaDataRefreshStrategyFactory.newInstance(sqlStatementContext);
- if (refreshStrategy.isPresent()) {
- RuleSchemaMetaDataLoader metaDataLoader = new RuleSchemaMetaDataLoader(schemaContext.getSchema().getRules());
- refreshStrategy.get().refreshMetaData(schemaContext.getSchema().getMetaData(), schemaContexts.getDatabaseType(),
- dataSourceMap, sqlStatementContext, tableName -> metaDataLoader.load(schemaContexts.getDatabaseType(),
- dataSourceMap, tableName, schemaContexts.getProps()));
- notifyPersistRuleMetaData(DefaultSchema.LOGIC_NAME, schemaContext.getSchema().getMetaData().getRuleSchemaMetaData());
- }
- }
-
- private void notifyPersistRuleMetaData(final String schemaName, final RuleSchemaMetaData metaData) {
- RuleMetaDataNotifier notifier = TypedSPIRegistry.getRegisteredService(RuleMetaDataNotifier.class);
- if (null != notifier) {
- notifier.notify(schemaName, metaData);
- }
- }
}
diff --git a/shardingsphere-jdbc/shardingsphere-jdbc-core/src/main/java/org/apache/shardingsphere/driver/executor/StatementExecutor.java b/shardingsphere-jdbc/shardingsphere-jdbc-core/src/main/java/org/apache/shardingsphere/driver/executor/StatementExecutor.java
index 4e82e00..5315225 100644
--- a/shardingsphere-jdbc/shardingsphere-jdbc-core/src/main/java/org/apache/shardingsphere/driver/executor/StatementExecutor.java
+++ b/shardingsphere-jdbc/shardingsphere-jdbc-core/src/main/java/org/apache/shardingsphere/driver/executor/StatementExecutor.java
@@ -17,10 +17,7 @@
package org.apache.shardingsphere.driver.executor;
-import lombok.RequiredArgsConstructor;
-import org.apache.shardingsphere.infra.context.SchemaContext;
import org.apache.shardingsphere.infra.context.SchemaContexts;
-import org.apache.shardingsphere.infra.database.DefaultSchema;
import org.apache.shardingsphere.infra.executor.kernel.InputGroup;
import org.apache.shardingsphere.infra.executor.sql.ConnectionMode;
import org.apache.shardingsphere.infra.executor.sql.QueryResult;
@@ -31,15 +28,7 @@ import org.apache.shardingsphere.infra.executor.sql.resourced.jdbc.executor.SQLE
import org.apache.shardingsphere.infra.executor.sql.resourced.jdbc.executor.impl.DefaultSQLExecutorCallback;
import org.apache.shardingsphere.infra.executor.sql.resourced.jdbc.queryresult.MemoryQueryResult;
import org.apache.shardingsphere.infra.executor.sql.resourced.jdbc.queryresult.StreamQueryResult;
-import org.apache.shardingsphere.infra.metadata.refresh.MetaDataRefreshStrategy;
-import org.apache.shardingsphere.infra.metadata.refresh.MetaDataRefreshStrategyFactory;
-import org.apache.shardingsphere.infra.metadata.schema.RuleSchemaMetaData;
-import org.apache.shardingsphere.infra.metadata.schema.RuleSchemaMetaDataLoader;
-import org.apache.shardingsphere.infra.metadata.schema.spi.RuleMetaDataNotifier;
import org.apache.shardingsphere.infra.rule.DataNodeRoutedRule;
-import org.apache.shardingsphere.infra.rule.ShardingSphereRule;
-import org.apache.shardingsphere.infra.spi.ShardingSphereServiceLoader;
-import org.apache.shardingsphere.infra.spi.type.TypedSPIRegistry;
import org.apache.shardingsphere.sql.parser.binder.statement.SQLStatementContext;
import javax.sql.DataSource;
@@ -49,35 +38,21 @@ import java.sql.Statement;
import java.util.Collection;
import java.util.List;
import java.util.Map;
-import java.util.Optional;
import java.util.stream.Collectors;
/**
* Statement executor.
*/
-@RequiredArgsConstructor
-public final class StatementExecutor {
+public final class StatementExecutor extends AbstractStatementExecutor {
- static {
- ShardingSphereServiceLoader.register(RuleMetaDataNotifier.class);
+ public StatementExecutor(final Map<String, DataSource> dataSourceMap, final SchemaContexts schemaContexts, final SQLExecutor sqlExecutor) {
+ super(dataSourceMap, schemaContexts, sqlExecutor);
}
- private final Map<String, DataSource> dataSourceMap;
-
- private final SchemaContexts schemaContexts;
-
- private final SQLExecutor sqlExecutor;
-
- /**
- * Execute query.
- *
- * @param inputGroups input groups
- * @return result set list
- * @throws SQLException SQL exception
- */
+ @Override
public List<QueryResult> executeQuery(final Collection<InputGroup<StatementExecuteUnit>> inputGroups) throws SQLException {
boolean isExceptionThrown = ExecutorExceptionHandler.isExceptionThrown();
- SQLExecutorCallback<QueryResult> sqlExecutorCallback = new DefaultSQLExecutorCallback<QueryResult>(schemaContexts.getDatabaseType(), isExceptionThrown) {
+ SQLExecutorCallback<QueryResult> sqlExecutorCallback = new DefaultSQLExecutorCallback<QueryResult>(getSchemaContexts().getDatabaseType(), isExceptionThrown) {
@Override
protected QueryResult executeSQL(final String sql, final Statement statement, final ConnectionMode connectionMode) throws SQLException {
@@ -89,18 +64,11 @@ public final class StatementExecutor {
return ConnectionMode.MEMORY_STRICTLY == connectionMode ? new StreamQueryResult(resultSet) : new MemoryQueryResult(resultSet);
}
};
- return sqlExecutor.execute(inputGroups, sqlExecutorCallback);
+ return getSqlExecutor().execute(inputGroups, sqlExecutorCallback);
}
- /**
- * Execute update.
- *
- * @param inputGroups input groups
- * @param sqlStatementContext SQL statement context
- * @return effected records count
- * @throws SQLException SQL exception
- */
- public int executeUpdate(final Collection<InputGroup<StatementExecuteUnit>> inputGroups, final SQLStatementContext sqlStatementContext) throws SQLException {
+ @Override
+ public int executeUpdate(final Collection<InputGroup<StatementExecuteUnit>> inputGroups, final SQLStatementContext<?> sqlStatementContext) throws SQLException {
return executeUpdate(inputGroups, Statement::executeUpdate, sqlStatementContext);
}
@@ -113,7 +81,7 @@ public final class StatementExecutor {
* @return effected records count
* @throws SQLException SQL exception
*/
- public int executeUpdate(final Collection<InputGroup<StatementExecuteUnit>> inputGroups, final SQLStatementContext sqlStatementContext, final int autoGeneratedKeys) throws SQLException {
+ public int executeUpdate(final Collection<InputGroup<StatementExecuteUnit>> inputGroups, final SQLStatementContext<?> sqlStatementContext, final int autoGeneratedKeys) throws SQLException {
return executeUpdate(inputGroups, (statement, sql) -> statement.executeUpdate(sql, autoGeneratedKeys), sqlStatementContext);
}
@@ -126,7 +94,7 @@ public final class StatementExecutor {
* @return effected records count
* @throws SQLException SQL exception
*/
- public int executeUpdate(final Collection<InputGroup<StatementExecuteUnit>> inputGroups, final SQLStatementContext sqlStatementContext, final int[] columnIndexes) throws SQLException {
+ public int executeUpdate(final Collection<InputGroup<StatementExecuteUnit>> inputGroups, final SQLStatementContext<?> sqlStatementContext, final int[] columnIndexes) throws SQLException {
return executeUpdate(inputGroups, (statement, sql) -> statement.executeUpdate(sql, columnIndexes), sqlStatementContext);
}
@@ -139,50 +107,31 @@ public final class StatementExecutor {
* @return effected records count
* @throws SQLException SQL exception
*/
- public int executeUpdate(final Collection<InputGroup<StatementExecuteUnit>> inputGroups, final SQLStatementContext sqlStatementContext, final String[] columnNames) throws SQLException {
+ public int executeUpdate(final Collection<InputGroup<StatementExecuteUnit>> inputGroups, final SQLStatementContext<?> sqlStatementContext, final String[] columnNames) throws SQLException {
return executeUpdate(inputGroups, (statement, sql) -> statement.executeUpdate(sql, columnNames), sqlStatementContext);
}
@SuppressWarnings("unchecked")
- private int executeUpdate(final Collection<InputGroup<StatementExecuteUnit>> inputGroups, final Updater updater, final SQLStatementContext sqlStatementContext) throws SQLException {
+ private int executeUpdate(final Collection<InputGroup<StatementExecuteUnit>> inputGroups, final Updater updater, final SQLStatementContext<?> sqlStatementContext) throws SQLException {
boolean isExceptionThrown = ExecutorExceptionHandler.isExceptionThrown();
- SQLExecutorCallback sqlExecutorCallback = new DefaultSQLExecutorCallback<Integer>(schemaContexts.getDatabaseType(), isExceptionThrown) {
+ SQLExecutorCallback sqlExecutorCallback = new DefaultSQLExecutorCallback<Integer>(getSchemaContexts().getDatabaseType(), isExceptionThrown) {
@Override
protected Integer executeSQL(final String sql, final Statement statement, final ConnectionMode connectionMode) throws SQLException {
return updater.executeUpdate(statement, sql);
}
};
- List<Integer> results = sqlExecutor.execute(inputGroups, sqlExecutorCallback);
- refreshTableMetaData(schemaContexts.getDefaultSchemaContext(), sqlStatementContext);
+ List<Integer> results = getSqlExecutor().execute(inputGroups, sqlExecutorCallback);
+ refreshTableMetaData(getSchemaContexts().getDefaultSchemaContext(), sqlStatementContext);
if (isNeedAccumulate(
- schemaContexts.getDefaultSchemaContext().getSchema().getRules().stream().filter(rule -> rule instanceof DataNodeRoutedRule).collect(Collectors.toList()), sqlStatementContext)) {
+ getSchemaContexts().getDefaultSchemaContext().getSchema().getRules().stream().filter(rule -> rule instanceof DataNodeRoutedRule).collect(Collectors.toList()), sqlStatementContext)) {
return accumulate(results);
}
return null == results.get(0) ? 0 : results.get(0);
}
- private boolean isNeedAccumulate(final Collection<ShardingSphereRule> rules, final SQLStatementContext sqlStatementContext) {
- return rules.stream().anyMatch(each -> ((DataNodeRoutedRule) each).isNeedAccumulate(sqlStatementContext.getTablesContext().getTableNames()));
- }
-
- private int accumulate(final List<Integer> results) {
- int result = 0;
- for (Integer each : results) {
- result += null == each ? 0 : each;
- }
- return result;
- }
-
- /**
- * Execute SQL.
- *
- * @param inputGroups input groups
- * @param sqlStatementContext SQL statement context
- * @return return true if is DQL, false if is DML
- * @throws SQLException SQL exception
- */
- public boolean execute(final Collection<InputGroup<StatementExecuteUnit>> inputGroups, final SQLStatementContext sqlStatementContext) throws SQLException {
+ @Override
+ public boolean execute(final Collection<InputGroup<StatementExecuteUnit>> inputGroups, final SQLStatementContext<?> sqlStatementContext) throws SQLException {
return execute(inputGroups, Statement::execute, sqlStatementContext);
}
@@ -195,7 +144,7 @@ public final class StatementExecutor {
* @return return true if is DQL, false if is DML
* @throws SQLException SQL exception
*/
- public boolean execute(final Collection<InputGroup<StatementExecuteUnit>> inputGroups, final SQLStatementContext sqlStatementContext, final int autoGeneratedKeys) throws SQLException {
+ public boolean execute(final Collection<InputGroup<StatementExecuteUnit>> inputGroups, final SQLStatementContext<?> sqlStatementContext, final int autoGeneratedKeys) throws SQLException {
return execute(inputGroups, (statement, sql) -> statement.execute(sql, autoGeneratedKeys), sqlStatementContext);
}
@@ -208,7 +157,7 @@ public final class StatementExecutor {
* @return return true if is DQL, false if is DML
* @throws SQLException SQL exception
*/
- public boolean execute(final Collection<InputGroup<StatementExecuteUnit>> inputGroups, final SQLStatementContext sqlStatementContext, final int[] columnIndexes) throws SQLException {
+ public boolean execute(final Collection<InputGroup<StatementExecuteUnit>> inputGroups, final SQLStatementContext<?> sqlStatementContext, final int[] columnIndexes) throws SQLException {
return execute(inputGroups, (statement, sql) -> statement.execute(sql, columnIndexes), sqlStatementContext);
}
@@ -221,49 +170,28 @@ public final class StatementExecutor {
* @return return true if is DQL, false if is DML
* @throws SQLException SQL exception
*/
- public boolean execute(final Collection<InputGroup<StatementExecuteUnit>> inputGroups, final SQLStatementContext sqlStatementContext, final String[] columnNames) throws SQLException {
+ public boolean execute(final Collection<InputGroup<StatementExecuteUnit>> inputGroups, final SQLStatementContext<?> sqlStatementContext, final String[] columnNames) throws SQLException {
return execute(inputGroups, (statement, sql) -> statement.execute(sql, columnNames), sqlStatementContext);
}
@SuppressWarnings("unchecked")
- private boolean execute(final Collection<InputGroup<StatementExecuteUnit>> inputGroups, final Executor executor, final SQLStatementContext sqlStatementContext) throws SQLException {
+ private boolean execute(final Collection<InputGroup<StatementExecuteUnit>> inputGroups, final Executor executor, final SQLStatementContext<?> sqlStatementContext) throws SQLException {
boolean isExceptionThrown = ExecutorExceptionHandler.isExceptionThrown();
- SQLExecutorCallback sqlExecutorCallback = new DefaultSQLExecutorCallback<Boolean>(schemaContexts.getDatabaseType(), isExceptionThrown) {
+ SQLExecutorCallback sqlExecutorCallback = new DefaultSQLExecutorCallback<Boolean>(getSchemaContexts().getDatabaseType(), isExceptionThrown) {
@Override
protected Boolean executeSQL(final String sql, final Statement statement, final ConnectionMode connectionMode) throws SQLException {
return executor.execute(statement, sql);
}
};
- List<Boolean> result = sqlExecutor.execute(inputGroups, sqlExecutorCallback);
- refreshTableMetaData(schemaContexts.getDefaultSchemaContext(), sqlStatementContext);
+ List<Boolean> result = getSqlExecutor().execute(inputGroups, sqlExecutorCallback);
+ refreshTableMetaData(getSchemaContexts().getDefaultSchemaContext(), sqlStatementContext);
if (null == result || result.isEmpty() || null == result.get(0)) {
return false;
}
return result.get(0);
}
- @SuppressWarnings("unchecked")
- private void refreshTableMetaData(final SchemaContext schemaContext, final SQLStatementContext sqlStatementContext) throws SQLException {
- if (null == sqlStatementContext) {
- return;
- }
- Optional<MetaDataRefreshStrategy> refreshStrategy = MetaDataRefreshStrategyFactory.newInstance(sqlStatementContext);
- if (refreshStrategy.isPresent()) {
- RuleSchemaMetaDataLoader metaDataLoader = new RuleSchemaMetaDataLoader(schemaContext.getSchema().getRules());
- refreshStrategy.get().refreshMetaData(schemaContext.getSchema().getMetaData(), schemaContexts.getDatabaseType(), dataSourceMap, sqlStatementContext,
- tableName -> metaDataLoader.load(schemaContexts.getDatabaseType(), dataSourceMap, tableName, schemaContexts.getProps()));
- notifyPersistRuleMetaData(DefaultSchema.LOGIC_NAME, schemaContext.getSchema().getMetaData().getRuleSchemaMetaData());
- }
- }
-
- private void notifyPersistRuleMetaData(final String schemaName, final RuleSchemaMetaData metaData) {
- RuleMetaDataNotifier notifier = TypedSPIRegistry.getRegisteredService(RuleMetaDataNotifier.class);
- if (null != notifier) {
- notifier.notify(schemaName, metaData);
- }
- }
-
private interface Updater {
int executeUpdate(Statement statement, String sql) throws SQLException;