You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@shardingsphere.apache.org by zh...@apache.org on 2020/09/19 14:12:51 UTC

[shardingsphere] branch master updated: Add ResponsePacketBuilder (#7519)

This is an automated email from the ASF dual-hosted git repository.

zhangyonglun pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new 019f7b1  Add ResponsePacketBuilder (#7519)
019f7b1 is described below

commit 019f7b1696de8be435a6630c7a099759778f5c8b
Author: Liang Zhang <te...@163.com>
AuthorDate: Sat Sep 19 22:12:34 2020 +0800

    Add ResponsePacketBuilder (#7519)
---
 .../execute/MySQLComStmtExecuteExecutor.java       | 32 ++------
 .../text/query/MySQLComQueryPacketExecutor.java    | 52 ++-----------
 .../command/query/util/ResponsePacketBuilder.java  | 89 ++++++++++++++++++++++
 3 files changed, 101 insertions(+), 72 deletions(-)

diff --git a/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-mysql/src/main/java/org/apache/shardingsphere/proxy/frontend/mysql/command/query/binary/execute/MySQLComStmtExecuteExecutor.java b/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-mysql/src/main/java/org/apache/shardingsphere/proxy/frontend/mysql/command/query/binary/execute/MySQLComStmtExecuteExecutor.java
index 19777c7..77493cf 100644
--- a/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-mysql/src/main/java/org/apache/shardingsphere/proxy/frontend/mysql/command/query/binary/execute/MySQLComStmtExecuteExecutor.java
+++ b/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-mysql/src/main/java/org/apache/shardingsphere/proxy/frontend/mysql/command/query/binary/execute/MySQLComStmtExecuteExecutor.java
@@ -20,14 +20,9 @@ package org.apache.shardingsphere.proxy.frontend.mysql.command.query.binary.exec
 import lombok.Getter;
 import org.apache.shardingsphere.db.protocol.mysql.constant.MySQLColumnType;
 import org.apache.shardingsphere.db.protocol.mysql.packet.MySQLPacket;
-import org.apache.shardingsphere.db.protocol.mysql.packet.command.query.MySQLColumnDefinition41Packet;
-import org.apache.shardingsphere.db.protocol.mysql.packet.command.query.MySQLFieldCountPacket;
 import org.apache.shardingsphere.db.protocol.mysql.packet.command.query.binary.execute.MySQLBinaryResultSetRowPacket;
 import org.apache.shardingsphere.db.protocol.mysql.packet.command.query.binary.execute.MySQLComStmtExecutePacket;
-import org.apache.shardingsphere.db.protocol.mysql.packet.generic.MySQLEofPacket;
-import org.apache.shardingsphere.db.protocol.mysql.packet.generic.MySQLOKPacket;
 import org.apache.shardingsphere.db.protocol.packet.DatabasePacket;
-import org.apache.shardingsphere.infra.executor.sql.raw.execute.result.query.QueryHeader;
 import org.apache.shardingsphere.proxy.backend.communication.DatabaseCommunicationEngine;
 import org.apache.shardingsphere.proxy.backend.communication.DatabaseCommunicationEngineFactory;
 import org.apache.shardingsphere.proxy.backend.communication.jdbc.connection.BackendConnection;
@@ -39,14 +34,13 @@ import org.apache.shardingsphere.proxy.backend.response.query.QueryResponse;
 import org.apache.shardingsphere.proxy.backend.response.update.UpdateResponse;
 import org.apache.shardingsphere.proxy.frontend.command.executor.QueryCommandExecutor;
 import org.apache.shardingsphere.proxy.frontend.command.executor.ResponseType;
+import org.apache.shardingsphere.proxy.frontend.mysql.command.query.util.ResponsePacketBuilder;
 import org.apache.shardingsphere.rdl.parser.engine.ShardingSphereSQLParserEngine;
 import org.apache.shardingsphere.sql.parser.sql.common.statement.SQLStatement;
 
 import java.sql.SQLException;
 import java.util.ArrayList;
 import java.util.Collection;
-import java.util.Collections;
-import java.util.LinkedList;
 import java.util.List;
 
 /**
@@ -76,30 +70,16 @@ public final class MySQLComStmtExecuteExecutor implements QueryCommandExecutor {
         return backendResponse instanceof QueryResponse ? processQuery((QueryResponse) backendResponse) : processUpdate((UpdateResponse) backendResponse);
     }
     
-    private Collection<DatabasePacket<?>> processQuery(final QueryResponse backendResponse) {
+    private Collection<DatabasePacket<?>> processQuery(final QueryResponse queryResponse) {
         responseType = ResponseType.QUERY;
-        return createQueryPackets(backendResponse);
-    }
-    
-    private Collection<DatabasePacket<?>> createQueryPackets(final QueryResponse backendResponse) {
-        Collection<DatabasePacket<?>> result = new LinkedList<>();
-        List<QueryHeader> queryHeader = backendResponse.getQueryHeaders();
-        result.add(new MySQLFieldCountPacket(++currentSequenceId, queryHeader.size()));
-        for (QueryHeader each : queryHeader) {
-            result.add(new MySQLColumnDefinition41Packet(++currentSequenceId, each.getSchema(), each.getTable(), each.getTable(),
-                    each.getColumnLabel(), each.getColumnName(), each.getColumnLength(), MySQLColumnType.valueOfJDBCType(each.getColumnType()), each.getDecimals()));
-        }
-        result.add(new MySQLEofPacket(++currentSequenceId));
+        Collection<DatabasePacket<?>> result = ResponsePacketBuilder.buildQueryResponsePackets(queryResponse);
+        currentSequenceId = result.size() + 1;
         return result;
     }
     
-    private Collection<DatabasePacket<?>> processUpdate(final UpdateResponse backendResponse) {
+    private Collection<DatabasePacket<?>> processUpdate(final UpdateResponse updateResponse) {
         responseType = ResponseType.UPDATE;
-        return createUpdatePackets(backendResponse);
-    }
-    
-    private Collection<DatabasePacket<?>> createUpdatePackets(final UpdateResponse updateResponse) {
-        return Collections.singletonList(new MySQLOKPacket(1, updateResponse.getUpdateCount(), updateResponse.getLastInsertId()));
+        return ResponsePacketBuilder.buildUpdateResponsePackets(updateResponse);
     }
     
     @Override
diff --git a/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-mysql/src/main/java/org/apache/shardingsphere/proxy/frontend/mysql/command/query/text/query/MySQLComQueryPacketExecutor.java b/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-mysql/src/main/java/org/apache/shardingsphere/proxy/frontend/mysql/command/query/text/query/MySQLComQueryPacketExecutor.java
index 89a1ad4..2fe215a 100644
--- a/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-mysql/src/main/java/org/apache/shardingsphere/proxy/frontend/mysql/command/query/text/query/MySQLComQueryPacketExecutor.java
+++ b/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-mysql/src/main/java/org/apache/shardingsphere/proxy/frontend/mysql/command/query/text/query/MySQLComQueryPacketExecutor.java
@@ -18,18 +18,11 @@
 package org.apache.shardingsphere.proxy.frontend.mysql.command.query.text.query;
 
 import lombok.Getter;
-import org.apache.shardingsphere.db.protocol.mysql.constant.MySQLColumnType;
 import org.apache.shardingsphere.db.protocol.mysql.packet.MySQLPacket;
-import org.apache.shardingsphere.db.protocol.mysql.packet.command.query.MySQLColumnDefinition41Packet;
-import org.apache.shardingsphere.db.protocol.mysql.packet.command.query.MySQLColumnFieldDetailFlag;
-import org.apache.shardingsphere.db.protocol.mysql.packet.command.query.MySQLFieldCountPacket;
 import org.apache.shardingsphere.db.protocol.mysql.packet.command.query.text.MySQLTextResultSetRowPacket;
 import org.apache.shardingsphere.db.protocol.mysql.packet.command.query.text.query.MySQLComQueryPacket;
-import org.apache.shardingsphere.db.protocol.mysql.packet.generic.MySQLEofPacket;
-import org.apache.shardingsphere.db.protocol.mysql.packet.generic.MySQLOKPacket;
 import org.apache.shardingsphere.db.protocol.packet.DatabasePacket;
 import org.apache.shardingsphere.infra.database.type.DatabaseTypes;
-import org.apache.shardingsphere.infra.executor.sql.raw.execute.result.query.QueryHeader;
 import org.apache.shardingsphere.proxy.backend.communication.jdbc.connection.BackendConnection;
 import org.apache.shardingsphere.proxy.backend.context.ProxyContext;
 import org.apache.shardingsphere.proxy.backend.exception.CircuitBreakException;
@@ -40,12 +33,10 @@ import org.apache.shardingsphere.proxy.backend.text.TextProtocolBackendHandler;
 import org.apache.shardingsphere.proxy.backend.text.TextProtocolBackendHandlerFactory;
 import org.apache.shardingsphere.proxy.frontend.command.executor.QueryCommandExecutor;
 import org.apache.shardingsphere.proxy.frontend.command.executor.ResponseType;
+import org.apache.shardingsphere.proxy.frontend.mysql.command.query.util.ResponsePacketBuilder;
 
 import java.sql.SQLException;
 import java.util.Collection;
-import java.util.Collections;
-import java.util.LinkedList;
-import java.util.List;
 
 /**
  * COM_QUERY command packet executor for MySQL.
@@ -72,47 +63,16 @@ public final class MySQLComQueryPacketExecutor implements QueryCommandExecutor {
         return backendResponse instanceof QueryResponse ? processQuery((QueryResponse) backendResponse) : processUpdate((UpdateResponse) backendResponse);
     }
     
-    private Collection<DatabasePacket<?>> processQuery(final QueryResponse backendResponse) {
+    private Collection<DatabasePacket<?>> processQuery(final QueryResponse queryResponse) {
         responseType = ResponseType.QUERY;
-        return createQueryPackets(backendResponse);
-    }
-    
-    private Collection<DatabasePacket<?>> createQueryPackets(final QueryResponse backendResponse) {
-        Collection<DatabasePacket<?>> result = new LinkedList<>();
-        List<QueryHeader> queryHeader = backendResponse.getQueryHeaders();
-        result.add(new MySQLFieldCountPacket(++currentSequenceId, queryHeader.size()));
-        for (QueryHeader each : queryHeader) {
-            result.add(new MySQLColumnDefinition41Packet(++currentSequenceId, getColumnFieldDetailFlag(each), each.getSchema(), each.getTable(), each.getTable(), 
-                    each.getColumnLabel(), each.getColumnName(), each.getColumnLength(), MySQLColumnType.valueOfJDBCType(each.getColumnType()), each.getDecimals()));
-        }
-        result.add(new MySQLEofPacket(++currentSequenceId));
-        return result;
-    }
-    
-    private int getColumnFieldDetailFlag(final QueryHeader header) {
-        int result = 0;
-        if (header.isPrimaryKey()) {
-            result += MySQLColumnFieldDetailFlag.PRIMARY_KEY.getValue();
-        }
-        if (header.isNotNull()) {
-            result += MySQLColumnFieldDetailFlag.NOT_NULL.getValue();
-        }
-        if (!header.isSigned()) {
-            result += MySQLColumnFieldDetailFlag.UNSIGNED.getValue();
-        }
-        if (header.isAutoIncrement()) {
-            result += MySQLColumnFieldDetailFlag.AUTO_INCREMENT.getValue();
-        }
+        Collection<DatabasePacket<?>> result = ResponsePacketBuilder.buildQueryResponsePackets(queryResponse);
+        currentSequenceId = result.size() + 1;
         return result;
     }
     
-    private Collection<DatabasePacket<?>> processUpdate(final UpdateResponse backendResponse) {
+    private Collection<DatabasePacket<?>> processUpdate(final UpdateResponse updateResponse) {
         responseType = ResponseType.UPDATE;
-        return createUpdatePackets(backendResponse);
-    }
-    
-    private Collection<DatabasePacket<?>> createUpdatePackets(final UpdateResponse updateResponse) {
-        return Collections.singletonList(new MySQLOKPacket(1, updateResponse.getUpdateCount(), updateResponse.getLastInsertId()));
+        return ResponsePacketBuilder.buildUpdateResponsePackets(updateResponse);
     }
     
     @Override
diff --git a/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-mysql/src/main/java/org/apache/shardingsphere/proxy/frontend/mysql/command/query/util/ResponsePacketBuilder.java b/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-mysql/src/main/java/org/apache/shardingsphere/proxy/frontend/mysql/command/query/util/ResponsePacketBuilder.java
new file mode 100644
index 0000000..86b29da
--- /dev/null
+++ b/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-mysql/src/main/java/org/apache/shardingsphere/proxy/frontend/mysql/command/query/util/ResponsePacketBuilder.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.proxy.frontend.mysql.command.query.util;
+
+import lombok.AccessLevel;
+import lombok.NoArgsConstructor;
+import org.apache.shardingsphere.db.protocol.mysql.constant.MySQLColumnType;
+import org.apache.shardingsphere.db.protocol.mysql.packet.command.query.MySQLColumnDefinition41Packet;
+import org.apache.shardingsphere.db.protocol.mysql.packet.command.query.MySQLColumnFieldDetailFlag;
+import org.apache.shardingsphere.db.protocol.mysql.packet.command.query.MySQLFieldCountPacket;
+import org.apache.shardingsphere.db.protocol.mysql.packet.generic.MySQLEofPacket;
+import org.apache.shardingsphere.db.protocol.mysql.packet.generic.MySQLOKPacket;
+import org.apache.shardingsphere.db.protocol.packet.DatabasePacket;
+import org.apache.shardingsphere.infra.executor.sql.raw.execute.result.query.QueryHeader;
+import org.apache.shardingsphere.proxy.backend.response.query.QueryResponse;
+import org.apache.shardingsphere.proxy.backend.response.update.UpdateResponse;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.LinkedList;
+import java.util.List;
+
+/**
+ * Response packet builder.
+ */
+@NoArgsConstructor(access = AccessLevel.PRIVATE)
+public final class ResponsePacketBuilder {
+    
+    /**
+     * Build query response packets.
+     * 
+     * @param queryResponse query response
+     * @return query response packets
+     */
+    public static Collection<DatabasePacket<?>> buildQueryResponsePackets(final QueryResponse queryResponse) {
+        Collection<DatabasePacket<?>> result = new LinkedList<>();
+        int sequenceId = 0;
+        List<QueryHeader> queryHeader = queryResponse.getQueryHeaders();
+        result.add(new MySQLFieldCountPacket(++sequenceId, queryHeader.size()));
+        for (QueryHeader each : queryHeader) {
+            result.add(new MySQLColumnDefinition41Packet(++sequenceId, getColumnFieldDetailFlag(each), each.getSchema(), each.getTable(), each.getTable(),
+                    each.getColumnLabel(), each.getColumnName(), each.getColumnLength(), MySQLColumnType.valueOfJDBCType(each.getColumnType()), each.getDecimals()));
+        }
+        result.add(new MySQLEofPacket(++sequenceId));
+        return result;
+    }
+    
+    private static int getColumnFieldDetailFlag(final QueryHeader header) {
+        int result = 0;
+        if (header.isPrimaryKey()) {
+            result += MySQLColumnFieldDetailFlag.PRIMARY_KEY.getValue();
+        }
+        if (header.isNotNull()) {
+            result += MySQLColumnFieldDetailFlag.NOT_NULL.getValue();
+        }
+        if (!header.isSigned()) {
+            result += MySQLColumnFieldDetailFlag.UNSIGNED.getValue();
+        }
+        if (header.isAutoIncrement()) {
+            result += MySQLColumnFieldDetailFlag.AUTO_INCREMENT.getValue();
+        }
+        return result;
+    }
+    
+    /**
+     * Build update response packets.
+     * 
+     * @param updateResponse update response
+     * @return update response packets
+     */
+    public static Collection<DatabasePacket<?>> buildUpdateResponsePackets(final UpdateResponse updateResponse) {
+        return Collections.singletonList(new MySQLOKPacket(1, updateResponse.getUpdateCount(), updateResponse.getLastInsertId()));
+    }
+}