You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@shardingsphere.apache.org by zh...@apache.org on 2020/09/19 14:12:51 UTC
[shardingsphere] branch master updated: Add ResponsePacketBuilder
(#7519)
This is an automated email from the ASF dual-hosted git repository.
zhangyonglun pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new 019f7b1 Add ResponsePacketBuilder (#7519)
019f7b1 is described below
commit 019f7b1696de8be435a6630c7a099759778f5c8b
Author: Liang Zhang <te...@163.com>
AuthorDate: Sat Sep 19 22:12:34 2020 +0800
Add ResponsePacketBuilder (#7519)
---
.../execute/MySQLComStmtExecuteExecutor.java | 32 ++------
.../text/query/MySQLComQueryPacketExecutor.java | 52 ++-----------
.../command/query/util/ResponsePacketBuilder.java | 89 ++++++++++++++++++++++
3 files changed, 101 insertions(+), 72 deletions(-)
diff --git a/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-mysql/src/main/java/org/apache/shardingsphere/proxy/frontend/mysql/command/query/binary/execute/MySQLComStmtExecuteExecutor.java b/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-mysql/src/main/java/org/apache/shardingsphere/proxy/frontend/mysql/command/query/binary/execute/MySQLComStmtExecuteExecutor.java
index 19777c7..77493cf 100644
--- a/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-mysql/src/main/java/org/apache/shardingsphere/proxy/frontend/mysql/command/query/binary/execute/MySQLComStmtExecuteExecutor.java
+++ b/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-mysql/src/main/java/org/apache/shardingsphere/proxy/frontend/mysql/command/query/binary/execute/MySQLComStmtExecuteExecutor.java
@@ -20,14 +20,9 @@ package org.apache.shardingsphere.proxy.frontend.mysql.command.query.binary.exec
import lombok.Getter;
import org.apache.shardingsphere.db.protocol.mysql.constant.MySQLColumnType;
import org.apache.shardingsphere.db.protocol.mysql.packet.MySQLPacket;
-import org.apache.shardingsphere.db.protocol.mysql.packet.command.query.MySQLColumnDefinition41Packet;
-import org.apache.shardingsphere.db.protocol.mysql.packet.command.query.MySQLFieldCountPacket;
import org.apache.shardingsphere.db.protocol.mysql.packet.command.query.binary.execute.MySQLBinaryResultSetRowPacket;
import org.apache.shardingsphere.db.protocol.mysql.packet.command.query.binary.execute.MySQLComStmtExecutePacket;
-import org.apache.shardingsphere.db.protocol.mysql.packet.generic.MySQLEofPacket;
-import org.apache.shardingsphere.db.protocol.mysql.packet.generic.MySQLOKPacket;
import org.apache.shardingsphere.db.protocol.packet.DatabasePacket;
-import org.apache.shardingsphere.infra.executor.sql.raw.execute.result.query.QueryHeader;
import org.apache.shardingsphere.proxy.backend.communication.DatabaseCommunicationEngine;
import org.apache.shardingsphere.proxy.backend.communication.DatabaseCommunicationEngineFactory;
import org.apache.shardingsphere.proxy.backend.communication.jdbc.connection.BackendConnection;
@@ -39,14 +34,13 @@ import org.apache.shardingsphere.proxy.backend.response.query.QueryResponse;
import org.apache.shardingsphere.proxy.backend.response.update.UpdateResponse;
import org.apache.shardingsphere.proxy.frontend.command.executor.QueryCommandExecutor;
import org.apache.shardingsphere.proxy.frontend.command.executor.ResponseType;
+import org.apache.shardingsphere.proxy.frontend.mysql.command.query.util.ResponsePacketBuilder;
import org.apache.shardingsphere.rdl.parser.engine.ShardingSphereSQLParserEngine;
import org.apache.shardingsphere.sql.parser.sql.common.statement.SQLStatement;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.Collection;
-import java.util.Collections;
-import java.util.LinkedList;
import java.util.List;
/**
@@ -76,30 +70,16 @@ public final class MySQLComStmtExecuteExecutor implements QueryCommandExecutor {
return backendResponse instanceof QueryResponse ? processQuery((QueryResponse) backendResponse) : processUpdate((UpdateResponse) backendResponse);
}
- private Collection<DatabasePacket<?>> processQuery(final QueryResponse backendResponse) {
+ private Collection<DatabasePacket<?>> processQuery(final QueryResponse queryResponse) {
responseType = ResponseType.QUERY;
- return createQueryPackets(backendResponse);
- }
-
- private Collection<DatabasePacket<?>> createQueryPackets(final QueryResponse backendResponse) {
- Collection<DatabasePacket<?>> result = new LinkedList<>();
- List<QueryHeader> queryHeader = backendResponse.getQueryHeaders();
- result.add(new MySQLFieldCountPacket(++currentSequenceId, queryHeader.size()));
- for (QueryHeader each : queryHeader) {
- result.add(new MySQLColumnDefinition41Packet(++currentSequenceId, each.getSchema(), each.getTable(), each.getTable(),
- each.getColumnLabel(), each.getColumnName(), each.getColumnLength(), MySQLColumnType.valueOfJDBCType(each.getColumnType()), each.getDecimals()));
- }
- result.add(new MySQLEofPacket(++currentSequenceId));
+ Collection<DatabasePacket<?>> result = ResponsePacketBuilder.buildQueryResponsePackets(queryResponse);
+ currentSequenceId = result.size() + 1;
return result;
}
- private Collection<DatabasePacket<?>> processUpdate(final UpdateResponse backendResponse) {
+ private Collection<DatabasePacket<?>> processUpdate(final UpdateResponse updateResponse) {
responseType = ResponseType.UPDATE;
- return createUpdatePackets(backendResponse);
- }
-
- private Collection<DatabasePacket<?>> createUpdatePackets(final UpdateResponse updateResponse) {
- return Collections.singletonList(new MySQLOKPacket(1, updateResponse.getUpdateCount(), updateResponse.getLastInsertId()));
+ return ResponsePacketBuilder.buildUpdateResponsePackets(updateResponse);
}
@Override
diff --git a/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-mysql/src/main/java/org/apache/shardingsphere/proxy/frontend/mysql/command/query/text/query/MySQLComQueryPacketExecutor.java b/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-mysql/src/main/java/org/apache/shardingsphere/proxy/frontend/mysql/command/query/text/query/MySQLComQueryPacketExecutor.java
index 89a1ad4..2fe215a 100644
--- a/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-mysql/src/main/java/org/apache/shardingsphere/proxy/frontend/mysql/command/query/text/query/MySQLComQueryPacketExecutor.java
+++ b/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-mysql/src/main/java/org/apache/shardingsphere/proxy/frontend/mysql/command/query/text/query/MySQLComQueryPacketExecutor.java
@@ -18,18 +18,11 @@
package org.apache.shardingsphere.proxy.frontend.mysql.command.query.text.query;
import lombok.Getter;
-import org.apache.shardingsphere.db.protocol.mysql.constant.MySQLColumnType;
import org.apache.shardingsphere.db.protocol.mysql.packet.MySQLPacket;
-import org.apache.shardingsphere.db.protocol.mysql.packet.command.query.MySQLColumnDefinition41Packet;
-import org.apache.shardingsphere.db.protocol.mysql.packet.command.query.MySQLColumnFieldDetailFlag;
-import org.apache.shardingsphere.db.protocol.mysql.packet.command.query.MySQLFieldCountPacket;
import org.apache.shardingsphere.db.protocol.mysql.packet.command.query.text.MySQLTextResultSetRowPacket;
import org.apache.shardingsphere.db.protocol.mysql.packet.command.query.text.query.MySQLComQueryPacket;
-import org.apache.shardingsphere.db.protocol.mysql.packet.generic.MySQLEofPacket;
-import org.apache.shardingsphere.db.protocol.mysql.packet.generic.MySQLOKPacket;
import org.apache.shardingsphere.db.protocol.packet.DatabasePacket;
import org.apache.shardingsphere.infra.database.type.DatabaseTypes;
-import org.apache.shardingsphere.infra.executor.sql.raw.execute.result.query.QueryHeader;
import org.apache.shardingsphere.proxy.backend.communication.jdbc.connection.BackendConnection;
import org.apache.shardingsphere.proxy.backend.context.ProxyContext;
import org.apache.shardingsphere.proxy.backend.exception.CircuitBreakException;
@@ -40,12 +33,10 @@ import org.apache.shardingsphere.proxy.backend.text.TextProtocolBackendHandler;
import org.apache.shardingsphere.proxy.backend.text.TextProtocolBackendHandlerFactory;
import org.apache.shardingsphere.proxy.frontend.command.executor.QueryCommandExecutor;
import org.apache.shardingsphere.proxy.frontend.command.executor.ResponseType;
+import org.apache.shardingsphere.proxy.frontend.mysql.command.query.util.ResponsePacketBuilder;
import java.sql.SQLException;
import java.util.Collection;
-import java.util.Collections;
-import java.util.LinkedList;
-import java.util.List;
/**
* COM_QUERY command packet executor for MySQL.
@@ -72,47 +63,16 @@ public final class MySQLComQueryPacketExecutor implements QueryCommandExecutor {
return backendResponse instanceof QueryResponse ? processQuery((QueryResponse) backendResponse) : processUpdate((UpdateResponse) backendResponse);
}
- private Collection<DatabasePacket<?>> processQuery(final QueryResponse backendResponse) {
+ private Collection<DatabasePacket<?>> processQuery(final QueryResponse queryResponse) {
responseType = ResponseType.QUERY;
- return createQueryPackets(backendResponse);
- }
-
- private Collection<DatabasePacket<?>> createQueryPackets(final QueryResponse backendResponse) {
- Collection<DatabasePacket<?>> result = new LinkedList<>();
- List<QueryHeader> queryHeader = backendResponse.getQueryHeaders();
- result.add(new MySQLFieldCountPacket(++currentSequenceId, queryHeader.size()));
- for (QueryHeader each : queryHeader) {
- result.add(new MySQLColumnDefinition41Packet(++currentSequenceId, getColumnFieldDetailFlag(each), each.getSchema(), each.getTable(), each.getTable(),
- each.getColumnLabel(), each.getColumnName(), each.getColumnLength(), MySQLColumnType.valueOfJDBCType(each.getColumnType()), each.getDecimals()));
- }
- result.add(new MySQLEofPacket(++currentSequenceId));
- return result;
- }
-
- private int getColumnFieldDetailFlag(final QueryHeader header) {
- int result = 0;
- if (header.isPrimaryKey()) {
- result += MySQLColumnFieldDetailFlag.PRIMARY_KEY.getValue();
- }
- if (header.isNotNull()) {
- result += MySQLColumnFieldDetailFlag.NOT_NULL.getValue();
- }
- if (!header.isSigned()) {
- result += MySQLColumnFieldDetailFlag.UNSIGNED.getValue();
- }
- if (header.isAutoIncrement()) {
- result += MySQLColumnFieldDetailFlag.AUTO_INCREMENT.getValue();
- }
+ Collection<DatabasePacket<?>> result = ResponsePacketBuilder.buildQueryResponsePackets(queryResponse);
+ currentSequenceId = result.size() + 1;
return result;
}
- private Collection<DatabasePacket<?>> processUpdate(final UpdateResponse backendResponse) {
+ private Collection<DatabasePacket<?>> processUpdate(final UpdateResponse updateResponse) {
responseType = ResponseType.UPDATE;
- return createUpdatePackets(backendResponse);
- }
-
- private Collection<DatabasePacket<?>> createUpdatePackets(final UpdateResponse updateResponse) {
- return Collections.singletonList(new MySQLOKPacket(1, updateResponse.getUpdateCount(), updateResponse.getLastInsertId()));
+ return ResponsePacketBuilder.buildUpdateResponsePackets(updateResponse);
}
@Override
diff --git a/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-mysql/src/main/java/org/apache/shardingsphere/proxy/frontend/mysql/command/query/util/ResponsePacketBuilder.java b/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-mysql/src/main/java/org/apache/shardingsphere/proxy/frontend/mysql/command/query/util/ResponsePacketBuilder.java
new file mode 100644
index 0000000..86b29da
--- /dev/null
+++ b/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-mysql/src/main/java/org/apache/shardingsphere/proxy/frontend/mysql/command/query/util/ResponsePacketBuilder.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.proxy.frontend.mysql.command.query.util;
+
+import lombok.AccessLevel;
+import lombok.NoArgsConstructor;
+import org.apache.shardingsphere.db.protocol.mysql.constant.MySQLColumnType;
+import org.apache.shardingsphere.db.protocol.mysql.packet.command.query.MySQLColumnDefinition41Packet;
+import org.apache.shardingsphere.db.protocol.mysql.packet.command.query.MySQLColumnFieldDetailFlag;
+import org.apache.shardingsphere.db.protocol.mysql.packet.command.query.MySQLFieldCountPacket;
+import org.apache.shardingsphere.db.protocol.mysql.packet.generic.MySQLEofPacket;
+import org.apache.shardingsphere.db.protocol.mysql.packet.generic.MySQLOKPacket;
+import org.apache.shardingsphere.db.protocol.packet.DatabasePacket;
+import org.apache.shardingsphere.infra.executor.sql.raw.execute.result.query.QueryHeader;
+import org.apache.shardingsphere.proxy.backend.response.query.QueryResponse;
+import org.apache.shardingsphere.proxy.backend.response.update.UpdateResponse;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.LinkedList;
+import java.util.List;
+
+/**
+ * Response packet builder.
+ */
+@NoArgsConstructor(access = AccessLevel.PRIVATE)
+public final class ResponsePacketBuilder {
+
+ /**
+ * Build query response packets.
+ *
+ * @param queryResponse query response
+ * @return query response packets
+ */
+ public static Collection<DatabasePacket<?>> buildQueryResponsePackets(final QueryResponse queryResponse) {
+ Collection<DatabasePacket<?>> result = new LinkedList<>();
+ int sequenceId = 0;
+ List<QueryHeader> queryHeader = queryResponse.getQueryHeaders();
+ result.add(new MySQLFieldCountPacket(++sequenceId, queryHeader.size()));
+ for (QueryHeader each : queryHeader) {
+ result.add(new MySQLColumnDefinition41Packet(++sequenceId, getColumnFieldDetailFlag(each), each.getSchema(), each.getTable(), each.getTable(),
+ each.getColumnLabel(), each.getColumnName(), each.getColumnLength(), MySQLColumnType.valueOfJDBCType(each.getColumnType()), each.getDecimals()));
+ }
+ result.add(new MySQLEofPacket(++sequenceId));
+ return result;
+ }
+
+ private static int getColumnFieldDetailFlag(final QueryHeader header) {
+ int result = 0;
+ if (header.isPrimaryKey()) {
+ result += MySQLColumnFieldDetailFlag.PRIMARY_KEY.getValue();
+ }
+ if (header.isNotNull()) {
+ result += MySQLColumnFieldDetailFlag.NOT_NULL.getValue();
+ }
+ if (!header.isSigned()) {
+ result += MySQLColumnFieldDetailFlag.UNSIGNED.getValue();
+ }
+ if (header.isAutoIncrement()) {
+ result += MySQLColumnFieldDetailFlag.AUTO_INCREMENT.getValue();
+ }
+ return result;
+ }
+
+ /**
+ * Build update response packets.
+ *
+ * @param updateResponse update response
+ * @return update response packets
+ */
+ public static Collection<DatabasePacket<?>> buildUpdateResponsePackets(final UpdateResponse updateResponse) {
+ return Collections.singletonList(new MySQLOKPacket(1, updateResponse.getUpdateCount(), updateResponse.getLastInsertId()));
+ }
+}