You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@shardingsphere.apache.org by xi...@apache.org on 2020/12/24 11:36:18 UTC
[shardingsphere] branch master updated: Refactor try ddl lock for proxy (#8758)
This is an automated email from the ASF dual-hosted git repository.
xiaoyu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new 312acff Refactor try ddl lock for proxy (#8758)
312acff is described below
commit 312acff995d448ccbd9b0eaef77a58883b175e5b
Author: Haoran Meng <me...@gmail.com>
AuthorDate: Thu Dec 24 19:35:48 2020 +0800
Refactor try ddl lock for proxy (#8758)
---
.../communication/DatabaseCommunicationEngine.java | 42 +++++++++++++---------
1 file changed, 26 insertions(+), 16 deletions(-)
diff --git a/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/communication/DatabaseCommunicationEngine.java b/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/communication/DatabaseCommunicationEngine.java
index a8ea366..2589cc1 100644
--- a/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/communication/DatabaseCommunicationEngine.java
+++ b/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/communication/DatabaseCommunicationEngine.java
@@ -109,22 +109,33 @@ public final class DatabaseCommunicationEngine {
if (executionContext.getExecutionUnits().isEmpty()) {
return new UpdateResponseHeader(executionContext.getSqlStatementContext().getSqlStatement());
}
- lockForDDL(executionContext, ProxyContext.getInstance().getMetaDataContexts().getProps().<Long>getValue(ConfigurationPropertyKey.LOCK_WAIT_TIMEOUT_MILLISECONDS));
- proxySQLExecutor.checkExecutePrerequisites(executionContext);
- Collection<ExecuteResult> executeResults = proxySQLExecutor.execute(executionContext);
+ boolean locked = false;
+ Collection<ExecuteResult> executeResults;
+ try {
+ locked = tryLock(executionContext, ProxyContext.getInstance().getMetaDataContexts().getProps().<Long>getValue(ConfigurationPropertyKey.LOCK_WAIT_TIMEOUT_MILLISECONDS));
+ proxySQLExecutor.checkExecutePrerequisites(executionContext);
+ executeResults = proxySQLExecutor.execute(executionContext);
+ refreshSchema(executionContext);
+ } finally {
+ if (locked) {
+ releaseLock();
+ }
+ }
ExecuteResult executeResultSample = executeResults.iterator().next();
return executeResultSample instanceof QueryResult
? processExecuteQuery(executionContext, executeResults.stream().map(each -> (QueryResult) each).collect(Collectors.toList()), (QueryResult) executeResultSample)
: processExecuteUpdate(executionContext, executeResults.stream().map(each -> (UpdateResult) each).collect(Collectors.toList()));
}
- private void lockForDDL(final ExecutionContext executionContext, final Long lockTimeoutMilliseconds) {
+ private boolean tryLock(final ExecutionContext executionContext, final Long lockTimeoutMilliseconds) {
if (needLock(executionContext)) {
if (!LockContext.getLockStrategy().tryLock(lockTimeoutMilliseconds, TimeUnit.MILLISECONDS)) {
throw new LockWaitTimeoutException(lockTimeoutMilliseconds);
}
checkLock(lockTimeoutMilliseconds);
+ return true;
}
+ return false;
}
private boolean needLock(final ExecutionContext executionContext) {
@@ -137,6 +148,10 @@ public final class DatabaseCommunicationEngine {
}
}
+ private void releaseLock() {
+ LockContext.getLockStrategy().releaseLock();
+ }
+
private QueryResponseHeader processExecuteQuery(final ExecutionContext executionContext, final List<QueryResult> queryResults, final QueryResult queryResultSample) throws SQLException {
queryHeaders = createQueryHeaders(executionContext, queryResultSample);
mergedResult = mergeQuery(executionContext.getSqlStatementContext(), queryResults);
@@ -171,7 +186,6 @@ public final class DatabaseCommunicationEngine {
private UpdateResponseHeader processExecuteUpdate(final ExecutionContext executionContext, final Collection<UpdateResult> updateResults) throws SQLException {
UpdateResponseHeader result = new UpdateResponseHeader(executionContext.getSqlStatementContext().getSqlStatement(), updateResults);
- refreshSchema(executionContext);
mergeUpdateCount(executionContext.getSqlStatementContext(), result);
return result;
}
@@ -181,17 +195,13 @@ public final class DatabaseCommunicationEngine {
SQLStatement sqlStatement = executionContext.getSqlStatementContext().getSqlStatement();
Optional<SchemaRefresher> schemaRefresher = SchemaRefresherFactory.newInstance(sqlStatement);
if (schemaRefresher.isPresent()) {
- try {
- Collection<String> routeDataSourceNames = executionContext.getRouteContext().getRouteUnits().stream()
- .map(each -> each.getDataSourceMapper().getLogicName()).collect(Collectors.toList());
- SchemaBuilderMaterials materials = new SchemaBuilderMaterials(
- ProxyContext.getInstance().getMetaDataContexts().getMetaData(metaData.getName()).getResource().getDatabaseType(),
- metaData.getResource().getDataSources(), metaData.getRuleMetaData().getRules(), ProxyContext.getInstance().getMetaDataContexts().getProps());
- schemaRefresher.get().refresh(metaData.getSchema(), routeDataSourceNames, sqlStatement, materials);
- notifySchemaChanged(metaData.getName(), metaData.getSchema());
- } finally {
- LockContext.getLockStrategy().releaseLock();
- }
+ Collection<String> routeDataSourceNames = executionContext.getRouteContext().getRouteUnits().stream()
+ .map(each -> each.getDataSourceMapper().getLogicName()).collect(Collectors.toList());
+ SchemaBuilderMaterials materials = new SchemaBuilderMaterials(
+ ProxyContext.getInstance().getMetaDataContexts().getMetaData(metaData.getName()).getResource().getDatabaseType(),
+ metaData.getResource().getDataSources(), metaData.getRuleMetaData().getRules(), ProxyContext.getInstance().getMetaDataContexts().getProps());
+ schemaRefresher.get().refresh(metaData.getSchema(), routeDataSourceNames, sqlStatement, materials);
+ notifySchemaChanged(metaData.getName(), metaData.getSchema());
}
}