You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@brpc.apache.org by GitBox <gi...@apache.org> on 2023/01/17 09:29:45 UTC

[GitHub] [brpc] yanglimingcn opened a new pull request, #2093: Add Mysql Protocol

yanglimingcn opened a new pull request, #2093:
URL: https://github.com/apache/brpc/pull/2093

   Add MySQL protocol support, including the text protocol, transactions, and prepared statements.
   
   issue #209
   
   ### What problem does this PR solve?
   
   Issue Number:
   
   Problem Summary:
   
   ### What is changed and the side effects?
   
   Changed:
   
   Side effects:
   - Performance effects(性能影响):
   
   - Breaking backward compatibility(向后兼容性): 
   
   ---
   ### Check List:
   - Please make sure your changes are compilable(请确保你的更改可以通过编译).
   - When providing us with a new feature, it is best to add related tests(如果你向我们增加一个新的功能, 请添加相关测试).
   - Please follow [Contributor Covenant Code of Conduct](../../master/CODE_OF_CONDUCT.md).(请遵循贡献者准则).
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscribe@brpc.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: dev-unsubscribe@brpc.apache.org
For additional commands, e-mail: dev-help@brpc.apache.org


[GitHub] [brpc] wwbmmm commented on a diff in pull request #2093: Add Mysql Protocol

Posted by "wwbmmm (via GitHub)" <gi...@apache.org>.
wwbmmm commented on code in PR #2093:
URL: https://github.com/apache/brpc/pull/2093#discussion_r1084936553


##########
src/brpc/mysql_common.h:
##########
@@ -0,0 +1,419 @@
+// Copyright (c) 2019 Baidu, Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+// Authors: Yang,Liming (yangliming01@baidu.com)
+
+#ifndef BRPC_MYSQL_COMMON_H
+#define BRPC_MYSQL_COMMON_H
+
+#include <sstream>
+#include <map>
+#include "butil/logging.h"  // LOG()
+
+namespace brpc {
+// Msql Collation
+extern const char* MysqlDefaultCollation;
+extern const char* MysqlBinaryCollation;
+const std::map<std::string, uint8_t> MysqlCollations = {
+    {"big5_chinese_ci", 1},
+    {"latin2_czech_cs", 2},
+    {"dec8_swedish_ci", 3},
+    {"cp850_general_ci", 4},
+    {"latin1_german1_ci", 5},
+    {"hp8_english_ci", 6},
+    {"koi8r_general_ci", 7},
+    {"latin1_swedish_ci", 8},
+    {"latin2_general_ci", 9},
+    {"swe7_swedish_ci", 10},
+    {"ascii_general_ci", 11},
+    {"ujis_japanese_ci", 12},
+    {"sjis_japanese_ci", 13},
+    {"cp1251_bulgarian_ci", 14},
+    {"latin1_danish_ci", 15},
+    {"hebrew_general_ci", 16},
+    {"tis620_thai_ci", 18},
+    {"euckr_korean_ci", 19},
+    {"latin7_estonian_cs", 20},
+    {"latin2_hungarian_ci", 21},
+    {"koi8u_general_ci", 22},
+    {"cp1251_ukrainian_ci", 23},
+    {"gb2312_chinese_ci", 24},
+    {"greek_general_ci", 25},
+    {"cp1250_general_ci", 26},
+    {"latin2_croatian_ci", 27},
+    {"gbk_chinese_ci", 28},
+    {"cp1257_lithuanian_ci", 29},
+    {"latin5_turkish_ci", 30},
+    {"latin1_german2_ci", 31},
+    {"armscii8_general_ci", 32},
+    {"utf8_general_ci", 33},
+    {"cp1250_czech_cs", 34},
+    //{"ucs2_general_ci", 35},
+    {"cp866_general_ci", 36},
+    {"keybcs2_general_ci", 37},
+    {"macce_general_ci", 38},
+    {"macroman_general_ci", 39},
+    {"cp852_general_ci", 40},
+    {"latin7_general_ci", 41},
+    {"latin7_general_cs", 42},
+    {"macce_bin", 43},
+    {"cp1250_croatian_ci", 44},
+    {"utf8mb4_general_ci", 45},
+    {"utf8mb4_bin", 46},
+    {"latin1_bin", 47},
+    {"latin1_general_ci", 48},
+    {"latin1_general_cs", 49},
+    {"cp1251_bin", 50},
+    {"cp1251_general_ci", 51},
+    {"cp1251_general_cs", 52},
+    {"macroman_bin", 53},
+    //{"utf16_general_ci", 54},
+    //{"utf16_bin", 55},
+    //{"utf16le_general_ci", 56},
+    {"cp1256_general_ci", 57},
+    {"cp1257_bin", 58},
+    {"cp1257_general_ci", 59},
+    //{"utf32_general_ci", 60},
+    //{"utf32_bin", 61},
+    //{"utf16le_bin", 62},
+    {"binary", 63},
+    {"armscii8_bin", 64},
+    {"ascii_bin", 65},
+    {"cp1250_bin", 66},
+    {"cp1256_bin", 67},
+    {"cp866_bin", 68},
+    {"dec8_bin", 69},
+    {"greek_bin", 70},
+    {"hebrew_bin", 71},
+    {"hp8_bin", 72},
+    {"keybcs2_bin", 73},
+    {"koi8r_bin", 74},
+    {"koi8u_bin", 75},
+    {"utf8_tolower_ci", 76},
+    {"latin2_bin", 77},
+    {"latin5_bin", 78},
+    {"latin7_bin", 79},
+    {"cp850_bin", 80},
+    {"cp852_bin", 81},
+    {"swe7_bin", 82},
+    {"utf8_bin", 83},
+    {"big5_bin", 84},
+    {"euckr_bin", 85},
+    {"gb2312_bin", 86},
+    {"gbk_bin", 87},
+    {"sjis_bin", 88},
+    {"tis620_bin", 89},
+    //"{ucs2_bin", 90},
+    {"ujis_bin", 91},
+    {"geostd8_general_ci", 92},
+    {"geostd8_bin", 93},
+    {"latin1_spanish_ci", 94},
+    {"cp932_japanese_ci", 95},
+    {"cp932_bin", 96},
+    {"eucjpms_japanese_ci", 97},
+    {"eucjpms_bin", 98},
+    {"cp1250_polish_ci", 99},
+    // {"utf16_unicode_ci", 101},
+    // {"utf16_icelandic_ci", 102},
+    // {"utf16_latvian_ci", 103},
+    // {"utf16_romanian_ci", 104},
+    // {"utf16_slovenian_ci", 105},
+    // {"utf16_polish_ci", 106},
+    // {"utf16_estonian_ci", 107},
+    // {"utf16_spanish_ci", 108},
+    // {"utf16_swedish_ci", 109},
+    // {"utf16_turkish_ci", 110},
+    // {"utf16_czech_ci", 111},
+    // {"utf16_danish_ci", 112},
+    // {"utf16_lithuanian_ci", 113},
+    // {"utf16_slovak_ci", 114},
+    // {"utf16_spanish2_ci", 115},
+    // {"utf16_roman_ci", 116},
+    // {"utf16_persian_ci", 117},
+    // {"utf16_esperanto_ci", 118},
+    // {"utf16_hungarian_ci", 119},
+    // {"utf16_sinhala_ci", 120},
+    // {"utf16_german2_ci", 121},
+    // {"utf16_croatian_ci", 122},
+    // {"utf16_unicode_520_ci", 123},
+    // {"utf16_vietnamese_ci", 124},
+    // {"ucs2_unicode_ci", 128},
+    // {"ucs2_icelandic_ci", 129},
+    // {"ucs2_latvian_ci", 130},
+    // {"ucs2_romanian_ci", 131},
+    // {"ucs2_slovenian_ci", 132},
+    // {"ucs2_polish_ci", 133},
+    // {"ucs2_estonian_ci", 134},
+    // {"ucs2_spanish_ci", 135},
+    // {"ucs2_swedish_ci", 136},
+    // {"ucs2_turkish_ci", 137},
+    // {"ucs2_czech_ci", 138},
+    // {"ucs2_danish_ci", 139},
+    // {"ucs2_lithuanian_ci", 140},
+    // {"ucs2_slovak_ci", 141},
+    // {"ucs2_spanish2_ci", 142},
+    // {"ucs2_roman_ci", 143},
+    // {"ucs2_persian_ci", 144},
+    // {"ucs2_esperanto_ci", 145},
+    // {"ucs2_hungarian_ci", 146},
+    // {"ucs2_sinhala_ci", 147},
+    // {"ucs2_german2_ci", 148},
+    // {"ucs2_croatian_ci", 149},
+    // {"ucs2_unicode_520_ci", 150},
+    // {"ucs2_vietnamese_ci", 151},
+    // {"ucs2_general_mysql500_ci", 159},
+    // {"utf32_unicode_ci", 160},
+    // {"utf32_icelandic_ci", 161},
+    // {"utf32_latvian_ci", 162},
+    // {"utf32_romanian_ci", 163},
+    // {"utf32_slovenian_ci", 164},
+    // {"utf32_polish_ci", 165},
+    // {"utf32_estonian_ci", 166},
+    // {"utf32_spanish_ci", 167},
+    // {"utf32_swedish_ci", 168},
+    // {"utf32_turkish_ci", 169},
+    // {"utf32_czech_ci", 170},
+    // {"utf32_danish_ci", 171},
+    // {"utf32_lithuanian_ci", 172},
+    // {"utf32_slovak_ci", 173},
+    // {"utf32_spanish2_ci", 174},
+    // {"utf32_roman_ci", 175},
+    // {"utf32_persian_ci", 176},
+    // {"utf32_esperanto_ci", 177},
+    // {"utf32_hungarian_ci", 178},
+    // {"utf32_sinhala_ci", 179},
+    // {"utf32_german2_ci", 180},
+    // {"utf32_croatian_ci", 181},
+    // {"utf32_unicode_520_ci", 182},
+    // {"utf32_vietnamese_ci", 183},
+    {"utf8_unicode_ci", 192},
+    {"utf8_icelandic_ci", 193},
+    {"utf8_latvian_ci", 194},
+    {"utf8_romanian_ci", 195},
+    {"utf8_slovenian_ci", 196},
+    {"utf8_polish_ci", 197},
+    {"utf8_estonian_ci", 198},
+    {"utf8_spanish_ci", 199},
+    {"utf8_swedish_ci", 200},
+    {"utf8_turkish_ci", 201},
+    {"utf8_czech_ci", 202},
+    {"utf8_danish_ci", 203},
+    {"utf8_lithuanian_ci", 204},
+    {"utf8_slovak_ci", 205},
+    {"utf8_spanish2_ci", 206},
+    {"utf8_roman_ci", 207},
+    {"utf8_persian_ci", 208},
+    {"utf8_esperanto_ci", 209},
+    {"utf8_hungarian_ci", 210},
+    {"utf8_sinhala_ci", 211},
+    {"utf8_german2_ci", 212},
+    {"utf8_croatian_ci", 213},
+    {"utf8_unicode_520_ci", 214},
+    {"utf8_vietnamese_ci", 215},
+    {"utf8_general_mysql500_ci", 223},
+    {"utf8mb4_unicode_ci", 224},
+    {"utf8mb4_icelandic_ci", 225},
+    {"utf8mb4_latvian_ci", 226},
+    {"utf8mb4_romanian_ci", 227},
+    {"utf8mb4_slovenian_ci", 228},
+    {"utf8mb4_polish_ci", 229},
+    {"utf8mb4_estonian_ci", 230},
+    {"utf8mb4_spanish_ci", 231},
+    {"utf8mb4_swedish_ci", 232},
+    {"utf8mb4_turkish_ci", 233},
+    {"utf8mb4_czech_ci", 234},
+    {"utf8mb4_danish_ci", 235},
+    {"utf8mb4_lithuanian_ci", 236},
+    {"utf8mb4_slovak_ci", 237},
+    {"utf8mb4_spanish2_ci", 238},
+    {"utf8mb4_roman_ci", 239},
+    {"utf8mb4_persian_ci", 240},
+    {"utf8mb4_esperanto_ci", 241},
+    {"utf8mb4_hungarian_ci", 242},
+    {"utf8mb4_sinhala_ci", 243},
+    {"utf8mb4_german2_ci", 244},
+    {"utf8mb4_croatian_ci", 245},
+    {"utf8mb4_unicode_520_ci", 246},
+    {"utf8mb4_vietnamese_ci", 247},
+    {"gb18030_chinese_ci", 248},
+    {"gb18030_bin", 249},
+    {"gb18030_unicode_520_ci", 250},
+    {"utf8mb4_0900_ai_ci", 255},
+};
+
+enum MysqlFieldType : uint8_t {
+    MYSQL_FIELD_TYPE_DECIMAL = 0x00,
+    MYSQL_FIELD_TYPE_TINY = 0x01,
+    MYSQL_FIELD_TYPE_SHORT = 0x02,
+    MYSQL_FIELD_TYPE_LONG = 0x03,
+    MYSQL_FIELD_TYPE_FLOAT = 0x04,
+    MYSQL_FIELD_TYPE_DOUBLE = 0x05,
+    MYSQL_FIELD_TYPE_NULL = 0x06,
+    MYSQL_FIELD_TYPE_TIMESTAMP = 0x07,
+    MYSQL_FIELD_TYPE_LONGLONG = 0x08,
+    MYSQL_FIELD_TYPE_INT24 = 0x09,
+    MYSQL_FIELD_TYPE_DATE = 0x0A,
+    MYSQL_FIELD_TYPE_TIME = 0x0B,
+    MYSQL_FIELD_TYPE_DATETIME = 0x0C,
+    MYSQL_FIELD_TYPE_YEAR = 0x0D,
+    MYSQL_FIELD_TYPE_NEWDATE = 0x0E,
+    MYSQL_FIELD_TYPE_VARCHAR = 0x0F,
+    MYSQL_FIELD_TYPE_BIT = 0x10,
+    MYSQL_FIELD_TYPE_JSON = 0xF5,
+    MYSQL_FIELD_TYPE_NEWDECIMAL = 0xF6,
+    MYSQL_FIELD_TYPE_ENUM = 0xF7,
+    MYSQL_FIELD_TYPE_SET = 0xF8,
+    MYSQL_FIELD_TYPE_TINY_BLOB = 0xF9,
+    MYSQL_FIELD_TYPE_MEDIUM_BLOB = 0xFA,
+    MYSQL_FIELD_TYPE_LONG_BLOB = 0xFB,
+    MYSQL_FIELD_TYPE_BLOB = 0xFC,
+    MYSQL_FIELD_TYPE_VAR_STRING = 0xFD,
+    MYSQL_FIELD_TYPE_STRING = 0xFE,
+    MYSQL_FIELD_TYPE_GEOMETRY = 0xFF,
+};
+
+enum MysqlFieldFlag : uint16_t {
+    MYSQL_NOT_NULL_FLAG = 0x0001,
+    MYSQL_PRI_KEY_FLAG = 0x0002,
+    MYSQL_UNIQUE_KEY_FLAG = 0x0004,
+    MYSQL_MULTIPLE_KEY_FLAG = 0x0008,
+    MYSQL_BLOB_FLAG = 0x0010,
+    MYSQL_UNSIGNED_FLAG = 0x0020,
+    MYSQL_ZEROFILL_FLAG = 0x0040,
+    MYSQL_BINARY_FLAG = 0x0080,
+    MYSQL_ENUM_FLAG = 0x0100,
+    MYSQL_AUTO_INCREMENT_FLAG = 0x0200,
+    MYSQL_TIMESTAMP_FLAG = 0x0400,
+    MYSQL_SET_FLAG = 0x0800,
+};
+
+enum MysqlServerStatus : uint16_t {

Review Comment:
   这个哪里用了?



##########
src/brpc/controller.h:
##########
@@ -104,6 +104,14 @@ enum StopStyle {
 
 const int32_t UNSET_MAGIC_NUM = -123456789;
 
+// if controller want to reserve a sock after RPC, set BIND_SOCK_ACTIVE

Review Comment:
   这个ACTIVE不是很好理解,是不是可以叫BIND_SOCK_RESERVE



##########
src/brpc/mysql.h:
##########
@@ -0,0 +1,286 @@
+// Copyright (c) 2019 Baidu, Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+// Authors: Yang,Liming (yangliming01@baidu.com)
+
+#ifndef BRPC_MYSQL_H
+#define BRPC_MYSQL_H
+
+#include <string>
+#include <vector>
+#include <google/protobuf/stubs/common.h>
+
+#include <google/protobuf/generated_message_util.h>
+#include <google/protobuf/repeated_field.h>
+#include <google/protobuf/extension_set.h>
+#include <google/protobuf/generated_message_reflection.h>
+#include "google/protobuf/descriptor.pb.h"
+
+#include "butil/iobuf.h"
+#include "butil/strings/string_piece.h"
+#include "butil/arena.h"
+#include "parse_result.h"
+#include "mysql_command.h"
+#include "mysql_reply.h"
+#include "mysql_transaction.h"
+#include "mysql_statement.h"
+
+namespace brpc {
+// Request to mysql.
+// Notice that you can pipeline multiple commands in one request and sent
+// them to ONE mysql-server together.
+// Example:
+//   MysqlRequest request;
+//   request.Query("select * from table");
+//   MysqlResponse response;
+//   channel.CallMethod(NULL, &controller, &request, &response, NULL/*done*/);
+//   if (!cntl.Failed()) {
+//       LOG(INFO) << response.reply(0);
+//   }
+
+class MysqlStatementStub {

Review Comment:
   这个类是不是放在mysql_statement.h里更合适?



##########
src/brpc/mysql_command.cpp:
##########
@@ -0,0 +1,260 @@
+// Copyright (c) 2015 Baidu, Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+// Authors: Yang,Liming (yangliming01@baidu.com)
+
+#include "butil/sys_byteorder.h"
+#include "butil/logging.h"  // LOG()
+#include "brpc/mysql_command.h"
+#include "brpc/mysql_common.h"
+#include "brpc/mysql.h"
+
+namespace brpc {
+
+namespace {
+const uint32_t max_allowed_packet = 67108864;

Review Comment:
   用全大写命名吧



##########
src/brpc/mysql_command.cpp:
##########
@@ -0,0 +1,260 @@
+// Copyright (c) 2015 Baidu, Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+// Authors: Yang,Liming (yangliming01@baidu.com)
+
+#include "butil/sys_byteorder.h"
+#include "butil/logging.h"  // LOG()
+#include "brpc/mysql_command.h"
+#include "brpc/mysql_common.h"
+#include "brpc/mysql.h"
+
+namespace brpc {
+
+namespace {
+const uint32_t max_allowed_packet = 67108864;
+const uint32_t max_packet_size = 16777215;
+
+template <class H, class F, class D>
+butil::Status MakePacket(butil::IOBuf* outbuf, const H& head, const F& func, const D& data) {
+    long pkg_len = head.size() + data.size();
+    if (pkg_len > max_allowed_packet) {
+        return butil::Status(
+            EINVAL,
+            "[MakePacket] statement size is too big, maxAllowedPacket = %d, pkg_len = %ld",
+            max_allowed_packet,
+            pkg_len);
+    }
+    uint32_t size, header;
+    uint8_t seq = 0;
+    size_t offset = 0;
+    for (; pkg_len > 0; pkg_len -= max_packet_size, ++seq) {
+        if (pkg_len > max_packet_size) {
+            size = max_packet_size;
+        } else {
+            size = pkg_len;
+        }
+        header = butil::ByteSwapToLE32(size);
+        ((uint8_t*)&header)[3] = seq;
+        outbuf->append(&header, 4);
+        if (seq == 0) {
+            const uint32_t old_size = outbuf->size();
+            outbuf->append(head);
+            size -= outbuf->size() - old_size;

Review Comment:
   为什么不直接用size -= head.size()呢?



##########
src/brpc/mysql_command.cpp:
##########
@@ -0,0 +1,260 @@
+// Copyright (c) 2015 Baidu, Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+// Authors: Yang,Liming (yangliming01@baidu.com)
+
+#include "butil/sys_byteorder.h"
+#include "butil/logging.h"  // LOG()
+#include "brpc/mysql_command.h"
+#include "brpc/mysql_common.h"
+#include "brpc/mysql.h"
+
+namespace brpc {
+
+namespace {
+const uint32_t max_allowed_packet = 67108864;
+const uint32_t max_packet_size = 16777215;
+
+template <class H, class F, class D>
+butil::Status MakePacket(butil::IOBuf* outbuf, const H& head, const F& func, const D& data) {
+    long pkg_len = head.size() + data.size();
+    if (pkg_len > max_allowed_packet) {
+        return butil::Status(
+            EINVAL,
+            "[MakePacket] statement size is too big, maxAllowedPacket = %d, pkg_len = %ld",
+            max_allowed_packet,
+            pkg_len);
+    }
+    uint32_t size, header;
+    uint8_t seq = 0;
+    size_t offset = 0;
+    for (; pkg_len > 0; pkg_len -= max_packet_size, ++seq) {
+        if (pkg_len > max_packet_size) {
+            size = max_packet_size;
+        } else {
+            size = pkg_len;
+        }
+        header = butil::ByteSwapToLE32(size);
+        ((uint8_t*)&header)[3] = seq;
+        outbuf->append(&header, 4);
+        if (seq == 0) {
+            const uint32_t old_size = outbuf->size();
+            outbuf->append(head);
+            size -= outbuf->size() - old_size;
+        }
+        func(outbuf, data, size, offset);
+        offset += size;
+    }
+
+    return butil::Status::OK();
+}
+
+}  // namespace
+
+butil::Status MysqlMakeCommand(butil::IOBuf* outbuf,

Review Comment:
   这个叫MysqlMakeCommandPacket会不会更合适



##########
src/brpc/mysql_command.cpp:
##########
@@ -0,0 +1,260 @@
+// Copyright (c) 2015 Baidu, Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+// Authors: Yang,Liming (yangliming01@baidu.com)
+
+#include "butil/sys_byteorder.h"
+#include "butil/logging.h"  // LOG()
+#include "brpc/mysql_command.h"
+#include "brpc/mysql_common.h"
+#include "brpc/mysql.h"
+
+namespace brpc {
+
+namespace {
+const uint32_t max_allowed_packet = 67108864;
+const uint32_t max_packet_size = 16777215;
+
+template <class H, class F, class D>
+butil::Status MakePacket(butil::IOBuf* outbuf, const H& head, const F& func, const D& data) {
+    long pkg_len = head.size() + data.size();
+    if (pkg_len > max_allowed_packet) {
+        return butil::Status(
+            EINVAL,
+            "[MakePacket] statement size is too big, maxAllowedPacket = %d, pkg_len = %ld",
+            max_allowed_packet,
+            pkg_len);
+    }
+    uint32_t size, header;
+    uint8_t seq = 0;
+    size_t offset = 0;
+    for (; pkg_len > 0; pkg_len -= max_packet_size, ++seq) {
+        if (pkg_len > max_packet_size) {
+            size = max_packet_size;
+        } else {
+            size = pkg_len;
+        }
+        header = butil::ByteSwapToLE32(size);
+        ((uint8_t*)&header)[3] = seq;
+        outbuf->append(&header, 4);
+        if (seq == 0) {
+            const uint32_t old_size = outbuf->size();
+            outbuf->append(head);
+            size -= outbuf->size() - old_size;
+        }
+        func(outbuf, data, size, offset);
+        offset += size;
+    }
+
+    return butil::Status::OK();
+}
+
+}  // namespace
+
+butil::Status MysqlMakeCommand(butil::IOBuf* outbuf,
+                               const MysqlCommandType type,
+                               const butil::StringPiece& command) {
+    if (outbuf == NULL || command.size() == 0) {
+        return butil::Status(EINVAL, "[MysqlMakeCommand] Param[outbuf] or [stmt] is NULL");
+    }
+    auto func =
+        [](butil::IOBuf* outbuf, const butil::StringPiece& command, size_t size, size_t offset) {
+            outbuf->append(command.data() + offset, size);
+        };
+    butil::IOBuf head;
+    head.push_back(type);
+    return MakePacket(outbuf, head, func, command);
+}
+
+butil::Status MysqlMakeExecutePacket(butil::IOBuf* outbuf,
+                                     uint32_t stmt_id,
+                                     const butil::IOBuf& edata) {
+    butil::IOBuf head;  // cmd_type + stmt_id + flag + reserved + body_size
+    head.push_back(MYSQL_COM_STMT_EXECUTE);
+    const uint32_t si = butil::ByteSwapToLE32(stmt_id);
+    head.append(&si, 4);
+    head.push_back('\0');
+    head.push_back((char)0x01);
+    head.push_back('\0');
+    head.push_back('\0');
+    head.push_back('\0');
+    auto func = [](butil::IOBuf* outbuf, const butil::IOBuf& data, size_t size, size_t offset) {
+        data.append_to(outbuf, size, offset);
+    };
+    return MakePacket(outbuf, head, func, edata);
+}
+
+butil::Status MysqlMakeExecuteData(MysqlStatementStub* stmt,

Review Comment:
   这个作为MysqlStatementStub的成员函数会不会更合适



##########
src/brpc/mysql_statement_inl.h:
##########
@@ -0,0 +1,58 @@
+// Copyright (c) 2019 Baidu, Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+// Authors: Yang,Liming (yangliming01@baidu.com)
+
+#ifndef BRPC_MYSQL_STATEMENT_INL_H
+#define BRPC_MYSQL_STATEMENT_INL_H
+#include <gflags/gflags.h>
+#include "butil/containers/flat_map.h"  // FlatMap
+#include "butil/containers/doubly_buffered_data.h"
+#include "brpc/socket_id.h"
+
+namespace brpc {
+DECLARE_int32(mysql_statment_map_size);
+
+struct MysqlStatementId {
+    uint32_t stmt_id;  // statement id
+    uint64_t version;  // socket's fd version
+};
+
+typedef butil::FlatMap<SocketId, MysqlStatementId> MysqlStatementKVMap;
+typedef butil::DoublyBufferedData<MysqlStatementKVMap> MysqlStatementDBD;
+
+inline size_t my_init_kv(MysqlStatementKVMap& m) {

Review Comment:
   这个不用放在头文件里吧,放在mysql_statement.cpp里就行了



##########
src/brpc/mysql_reply.cpp:
##########
@@ -0,0 +1,1189 @@
+// Copyright (c) 2019 Baidu, Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+// Authors: Yang,Liming (yangliming01@baidu.com)
+
+#include "brpc/mysql_common.h"
+#include "brpc/mysql_reply.h"
+
+namespace brpc {
+
+#define MY_ALLOC_CHECK(expr)                     \
+    do {                                         \
+        if ((expr) == false) {                   \
+            return PARSE_ERROR_ABSOLUTELY_WRONG; \
+        }                                        \
+    } while (0)
+
+#define MY_PARSE_CHECK(expr)    \
+    do {                        \
+        ParseError rc = (expr); \
+        if (rc != PARSE_OK) {   \
+            return rc;          \
+        }                       \
+    } while (0)
+
+template <class Type>
+inline bool my_alloc_check(butil::Arena* arena, const size_t n, Type*& pointer) {
+    if (pointer == NULL) {
+        pointer = (Type*)arena->allocate(sizeof(Type) * n);
+        if (pointer == NULL) {
+            return false;
+        }
+        for (size_t i = 0; i < n; ++i) {
+            new (pointer + i) Type;
+        }
+    }
+    return true;
+}
+
+template <>
+inline bool my_alloc_check(butil::Arena* arena, const size_t n, char*& pointer) {
+    if (pointer == NULL) {
+        pointer = (char*)arena->allocate(sizeof(char) * n);
+        if (pointer == NULL) {
+            return false;
+        }
+    }
+    return true;
+}
+
+namespace {
+struct MysqlHeader {
+    uint32_t payload_size;
+    uint32_t seq;
+};
+const char* digits01 =
+    "0123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123"
+    "456789";
+const char* digits10 =
+    "0000000000111111111122222222223333333333444444444455555555556666666666777777777788888888889999"
+    "999999";
+}  // namespace
+
+const char* MysqlRspTypeToString(MysqlRspType type) {
+    switch (type) {
+        case MYSQL_RSP_OK:
+            return "ok";
+        case MYSQL_RSP_ERROR:
+            return "error";
+        case MYSQL_RSP_RESULTSET:
+            return "resultset";
+        case MYSQL_RSP_EOF:
+            return "eof";
+        case MYSQL_RSP_AUTH:
+            return "auth";
+        case MYSQL_RSP_PREPARE_OK:
+            return "prepare_ok";
+        default:
+            return "Unknown Response Type";
+    }
+}
+
+// check if the buf is contain a full package
+inline bool is_full_package(const butil::IOBuf& buf) {
+    uint8_t header[4];
+    const uint8_t* p = (const uint8_t*)buf.fetch(header, sizeof(header));
+    if (p == NULL) {
+        return false;
+    }
+    uint32_t payload_size = mysql_uint3korr(p);
+    if (buf.size() < payload_size + 4) {
+        return false;
+    }
+    return true;
+}
+// if is eof package
+inline bool is_an_eof(const butil::IOBuf& buf) {
+    uint8_t tmp[5];
+    const uint8_t* p = (const uint8_t*)buf.fetch(tmp, sizeof(tmp));
+    if (p == NULL) {
+        return false;
+    }
+    uint8_t type = p[4];
+    if (type == MYSQL_RSP_EOF) {
+        return true;
+    } else {
+        return false;
+    }
+}
+// parse header
+inline bool parse_header(butil::IOBuf& buf, MysqlHeader* value) {
+    if (!is_full_package(buf)) {
+        return false;
+    }
+    {
+        uint8_t tmp[3];
+        buf.cutn(tmp, sizeof(tmp));
+        value->payload_size = mysql_uint3korr(tmp);
+    }
+    {
+        uint8_t tmp;
+        buf.cut1((char*)&tmp);
+        value->seq = tmp;
+    }
+    return true;
+}
+// use this carefully, we depending on parse_header for checking IOBuf contain full package
+inline uint64_t parse_encode_length(butil::IOBuf& buf) {
+    if (buf.size() == 0) {
+        return 0;
+    }
+
+    uint64_t value = 0;
+    uint8_t f = 0;
+    buf.cut1((char*)&f);
+    if (f <= 250) {
+        value = f;
+    } else if (f == 251) {
+        value = 0;
+    } else if (f == 252) {
+        uint8_t tmp[2];
+        buf.cutn(tmp, sizeof(tmp));
+        value = mysql_uint2korr(tmp);
+    } else if (f == 253) {
+        uint8_t tmp[3];
+        buf.cutn(tmp, sizeof(tmp));
+        value = mysql_uint3korr(tmp);
+    } else if (f == 254) {
+        uint8_t tmp[8];
+        buf.cutn(tmp, sizeof(tmp));
+        value = mysql_uint8korr(tmp);
+    }
+    return value;
+}
+
+ParseError MysqlReply::ConsumePartialIOBuf(butil::IOBuf& buf,
+                                           butil::Arena* arena,
+                                           bool is_auth,
+                                           MysqlStmtType stmt_type,
+                                           bool* more_results) {
+    *more_results = false;
+    if (!is_full_package(buf)) {
+        return PARSE_ERROR_NOT_ENOUGH_DATA;
+    }
+    uint8_t header[4 + 1];  // use the extra byte to judge message type
+    const uint8_t* p = (const uint8_t*)buf.fetch(header, sizeof(header));
+    uint8_t type = (_type == MYSQL_RSP_UNKNOWN) ? p[4] : (uint8_t)_type;
+    if (is_auth && type != 0x00 && type != 0xFF) {
+        _type = MYSQL_RSP_AUTH;
+        MY_ALLOC_CHECK(my_alloc_check(arena, 1, _data.auth));
+        MY_PARSE_CHECK(_data.auth->Parse(buf, arena));
+        return PARSE_OK;
+    }
+    if (type == 0x00 && (is_auth || stmt_type != MYSQL_NEED_PREPARE)) {
+        _type = MYSQL_RSP_OK;
+        MY_ALLOC_CHECK(my_alloc_check(arena, 1, _data.ok));
+        MY_PARSE_CHECK(_data.ok->Parse(buf, arena));
+        *more_results = _data.ok->status() & MYSQL_SERVER_MORE_RESULTS_EXISTS;
+    } else if ((type == 0x00 && stmt_type == MYSQL_NEED_PREPARE) || type == MYSQL_RSP_PREPARE_OK) {
+        _type = MYSQL_RSP_PREPARE_OK;
+        MY_ALLOC_CHECK(my_alloc_check(arena, 1, _data.prepare_ok));
+        MY_PARSE_CHECK(_data.prepare_ok->Parse(buf, arena));
+    } else if (type == 0xFF) {
+        _type = MYSQL_RSP_ERROR;
+        MY_ALLOC_CHECK(my_alloc_check(arena, 1, _data.error));
+        MY_PARSE_CHECK(_data.error->Parse(buf, arena));
+    } else if (type == 0xFE) {
+        _type = MYSQL_RSP_EOF;
+        MY_ALLOC_CHECK(my_alloc_check(arena, 1, _data.eof));
+        MY_PARSE_CHECK(_data.eof->Parse(buf));
+        *more_results = _data.eof->status() & MYSQL_SERVER_MORE_RESULTS_EXISTS;
+    } else if (type >= 0x01 && type <= 0xFA) {
+        _type = MYSQL_RSP_RESULTSET;
+        MY_ALLOC_CHECK(my_alloc_check(arena, 1, _data.result_set));
+        MY_PARSE_CHECK(_data.result_set->Parse(buf, arena, !(stmt_type == MYSQL_NORMAL_STATEMENT)));
+        *more_results = _data.result_set->_eof2.status() & MYSQL_SERVER_MORE_RESULTS_EXISTS;
+    } else {
+        LOG(ERROR) << "Unknown Response Type "
+                   << "type=" << unsigned(type) << " buf_size=" << buf.size();
+        return PARSE_ERROR_ABSOLUTELY_WRONG;
+    }
+    return PARSE_OK;
+}
+
+void MysqlReply::Print(std::ostream& os) const {  // Writes a "\nname:value" text dump of this reply to os; the layout is chosen by _type.
+    if (_type == MYSQL_RSP_AUTH) {
+        const Auth& auth = *_data.auth;
+        os << "\nprotocol:" << (unsigned)auth._protocol << "\nversion:" << auth._version
+           << "\nthread_id:" << auth._thread_id << "\nsalt:" << auth._salt
+           << "\ncapacity:" << auth._capability << "\nlanguage:" << (unsigned)auth._collation
+           << "\nstatus:" << auth._status << "\nextended_capacity:" << auth._extended_capability
+           << "\nauth_plugin_length:" << auth._auth_plugin_length << "\nsalt2:" << auth._salt2
+           << "\nauth_plugin:" << auth._auth_plugin;
+    } else if (_type == MYSQL_RSP_OK) {
+        const Ok& ok = *_data.ok;
+        os << "\naffect_row:" << ok._affect_row << "\nindex:" << ok._index
+           << "\nstatus:" << ok._status << "\nwarning:" << ok._warning << "\nmessage:" << ok._msg;
+    } else if (_type == MYSQL_RSP_ERROR) {
+        const Error& err = *_data.error;
+        os << "\nerrcode:" << err._errcode << "\nstatus:" << err._status
+           << "\nmessage:" << err._msg;
+    } else if (_type == MYSQL_RSP_RESULTSET) {  // header, every column definition, eof1, every row, eof2
+        const ResultSet& r = *_data.result_set;
+        os << "\nheader.column_count:" << r._header._column_count;
+        for (uint64_t i = 0; i < r._header._column_count; ++i) {
+            os << "\ncolumn[" << i << "].catalog:" << r._columns[i]._catalog << "\ncolumn[" << i
+               << "].database:" << r._columns[i]._database << "\ncolumn[" << i
+               << "].table:" << r._columns[i]._table << "\ncolumn[" << i
+               << "].origin_table:" << r._columns[i]._origin_table << "\ncolumn[" << i
+               << "].name:" << r._columns[i]._name << "\ncolumn[" << i
+               << "].origin_name:" << r._columns[i]._origin_name << "\ncolumn[" << i
+               << "].charset:" << (uint16_t)r._columns[i]._charset << "\ncolumn[" << i
+               << "].length:" << r._columns[i]._length << "\ncolumn[" << i
+               << "].type:" << (unsigned)r._columns[i]._type << "\ncolumn[" << i
+               << "].flag:" << (unsigned)r._columns[i]._flag << "\ncolumn[" << i
+               << "].decimal:" << (unsigned)r._columns[i]._decimal;
+        }
+        os << "\neof1.warning:" << r._eof1._warning;
+        os << "\neof1.status:" << r._eof1._status;
+        int n = 0;
+        for (const Row* row = r._first->_next; row != r._last->_next; row = row->_next) {  // linked-list walk that starts at r._first->_next and stops at r._last->_next
+            os << "\nrow(" << n++ << "):";
+            for (uint64_t j = 0; j < r._header._column_count; ++j) {
+                if (row->field(j).is_nil()) {  // nil fields print NULL and skip the type switch
+                    os << "NULL\t";
+                    continue;
+                }
+                switch (row->field(j)._type) {
+                    case MYSQL_FIELD_TYPE_NULL:
+                        os << "NULL";
+                        break;
+                    case MYSQL_FIELD_TYPE_TINY:
+                        if (r._columns[j]._flag & MYSQL_UNSIGNED_FLAG) {  // the column flag selects the unsigned or signed accessor
+                            os << unsigned(row->field(j).tiny());
+                        } else {
+                            os << signed(row->field(j).stiny());
+                        }
+                        break;
+                    case MYSQL_FIELD_TYPE_SHORT:
+                    case MYSQL_FIELD_TYPE_YEAR:
+                        if (r._columns[j]._flag & MYSQL_UNSIGNED_FLAG) {
+                            os << unsigned(row->field(j).small());
+                        } else {
+                            os << signed(row->field(j).ssmall());
+                        }
+                        break;
+                    case MYSQL_FIELD_TYPE_INT24:
+                    case MYSQL_FIELD_TYPE_LONG:
+                        if (r._columns[j]._flag & MYSQL_UNSIGNED_FLAG) {
+                            os << row->field(j).integer();
+                        } else {
+                            os << row->field(j).sinteger();
+                        }
+                        break;
+                    case MYSQL_FIELD_TYPE_LONGLONG:
+                        if (r._columns[j]._flag & MYSQL_UNSIGNED_FLAG) {
+                            os << row->field(j).bigint();
+                        } else {
+                            os << row->field(j).sbigint();
+                        }
+                        break;
+                    case MYSQL_FIELD_TYPE_FLOAT:
+                        os << row->field(j).float32();
+                        break;
+                    case MYSQL_FIELD_TYPE_DOUBLE:
+                        os << row->field(j).float64();
+                        break;
+                    case MYSQL_FIELD_TYPE_DECIMAL:
+                    case MYSQL_FIELD_TYPE_NEWDECIMAL:
+                    case MYSQL_FIELD_TYPE_VARCHAR:
+                    case MYSQL_FIELD_TYPE_BIT:
+                    case MYSQL_FIELD_TYPE_ENUM:
+                    case MYSQL_FIELD_TYPE_SET:
+                    case MYSQL_FIELD_TYPE_TINY_BLOB:
+                    case MYSQL_FIELD_TYPE_MEDIUM_BLOB:
+                    case MYSQL_FIELD_TYPE_LONG_BLOB:
+                    case MYSQL_FIELD_TYPE_BLOB:
+                    case MYSQL_FIELD_TYPE_VAR_STRING:
+                    case MYSQL_FIELD_TYPE_STRING:
+                    case MYSQL_FIELD_TYPE_GEOMETRY:
+                    case MYSQL_FIELD_TYPE_JSON:
+                    case MYSQL_FIELD_TYPE_TIME:
+                    case MYSQL_FIELD_TYPE_DATE:
+                    case MYSQL_FIELD_TYPE_NEWDATE:
+                    case MYSQL_FIELD_TYPE_TIMESTAMP:
+                    case MYSQL_FIELD_TYPE_DATETIME:
+                        os << row->field(j).string();  // every type listed above is printed through string()
+                        break;
+                    default:
+                        os << "Unknown field type";
+                }
+                os << "\t";
+            }
+        }
+        os << "\neof2.warning:" << r._eof2._warning;
+        os << "\neof2.status:" << r._eof2._status;
+    } else if (_type == MYSQL_RSP_EOF) {
+        const Eof& e = *_data.eof;
+        os << "\nwarning:" << e._warning << "\nstatus:" << e._status;
+    } else if (_type == MYSQL_RSP_PREPARE_OK) {  // header counters, then every param definition, then every column definition
+        const PrepareOk& prep = *_data.prepare_ok;
+        os << "\nstmt_id:" << prep._header._stmt_id
+           << "\ncolumn_count:" << prep._header._column_count
+           << "\nparam_count:" << prep._header._param_count;
+        for (uint16_t i = 0; i < prep._header._param_count; ++i) {
+            os << "\nparam[" << i << "].catalog:" << prep._params[i]._catalog << "\nparam[" << i
+               << "].database:" << prep._params[i]._database << "\nparam[" << i
+               << "].table:" << prep._params[i]._table << "\nparam[" << i
+               << "].origin_table:" << prep._params[i]._origin_table << "\nparam[" << i
+               << "].name:" << prep._params[i]._name << "\nparam[" << i
+               << "].origin_name:" << prep._params[i]._origin_name << "\nparam[" << i
+               << "].charset:" << (uint16_t)prep._params[i]._charset << "\nparam[" << i
+               << "].length:" << prep._params[i]._length << "\nparam[" << i
+               << "].type:" << (unsigned)prep._params[i]._type << "\nparam[" << i
+               << "].flag:" << (unsigned)prep._params[i]._flag << "\nparam[" << i
+               << "].decimal:" << (unsigned)prep._params[i]._decimal;
+        }
+        for (uint16_t i = 0; i < prep._header._column_count; ++i) {
+            os << "\ncolumn[" << i << "].catalog:" << prep._columns[i]._catalog << "\ncolumn[" << i
+               << "].database:" << prep._columns[i]._database << "\ncolumn[" << i
+               << "].table:" << prep._columns[i]._table << "\ncolumn[" << i
+               << "].origin_table:" << prep._columns[i]._origin_table << "\ncolumn[" << i
+               << "].name:" << prep._columns[i]._name << "\ncolumn[" << i
+               << "].origin_name:" << prep._columns[i]._origin_name << "\ncolumn[" << i
+               << "].charset:" << (uint16_t)prep._columns[i]._charset << "\ncolumn[" << i
+               << "].length:" << prep._columns[i]._length << "\ncolumn[" << i
+               << "].type:" << (unsigned)prep._columns[i]._type << "\ncolumn[" << i
+               << "].flag:" << (unsigned)prep._columns[i]._flag << "\ncolumn[" << i
+               << "].decimal:" << (unsigned)prep._columns[i]._decimal;
+        }
+    } else {
+        os << "Unknown response type";
+    }
+}
+
+// Parses the packet that fills the Auth reply (protocol byte, version string,
+// thread id, two salt parts, capability/status words and the auth plugin
+// name). Variable-length strings are copied into `arena`.
+//
+// Returns PARSE_OK when already parsed or on success, and
+// PARSE_ERROR_NOT_ENOUGH_DATA when the packet header cannot be parsed.
+// Whatever is left in `buf` afterwards is discarded.
+ParseError MysqlReply::Auth::Parse(butil::IOBuf& buf, butil::Arena* arena) {
+    if (is_parsed()) {
+        return PARSE_OK;
+    }
+    const std::string delim(1, 0x00);
+    MysqlHeader header;
+    if (!parse_header(buf, &header)) {
+        return PARSE_ERROR_NOT_ENOUGH_DATA;
+    }
+    buf.cut1((char*)&_protocol);
+    {
+        // Version string: everything up to the first 0x00 byte.
+        butil::IOBuf version;
+        buf.cut_until(&version, delim);
+        char* d = NULL;
+        MY_ALLOC_CHECK(my_alloc_check(arena, version.size(), d));
+        version.copy_to(d);
+        _version.set(d, version.size());
+    }
+    {
+        uint8_t tmp[4];
+        buf.cutn(tmp, sizeof(tmp));
+        _thread_id = mysql_uint4korr(tmp);
+    }
+    {
+        // First salt part, also terminated by a 0x00 byte.
+        butil::IOBuf salt;
+        buf.cut_until(&salt, delim);
+        char* d = NULL;
+        MY_ALLOC_CHECK(my_alloc_check(arena, salt.size(), d));
+        salt.copy_to(d);
+        _salt.set(d, salt.size());
+    }
+    {
+        uint8_t tmp[2];
+        buf.cutn(tmp, sizeof(tmp));
+        _capability = mysql_uint2korr(tmp);
+    }
+    buf.cut1((char*)&_collation);
+    {
+        uint8_t tmp[2];
+        buf.cutn(tmp, sizeof(tmp));
+        _status = mysql_uint2korr(tmp);
+    }
+    {
+        uint8_t tmp[2];
+        buf.cutn(tmp, sizeof(tmp));
+        _extended_capability = mysql_uint2korr(tmp);
+    }
+    buf.cut1((char*)&_auth_plugin_length);
+    buf.pop_front(10);  // 10 bytes that this parser does not use
+    {
+        // Second salt part, up to the next 0x00 byte.
+        butil::IOBuf salt2;
+        buf.cut_until(&salt2, delim);
+        char* d = NULL;
+        MY_ALLOC_CHECK(my_alloc_check(arena, salt2.size(), d));
+        salt2.copy_to(d);
+        _salt2.set(d, salt2.size());
+    }
+    {
+        // NOTE(review): the plugin name is read as _auth_plugin_length bytes;
+        // confirm against the protocol that this field really is the length
+        // of the name rather than of the auth data.
+        char* d = NULL;
+        MY_ALLOC_CHECK(my_alloc_check(arena, _auth_plugin_length, d));
+        // Expose only the bytes that were actually copied, so a short packet
+        // cannot leak uninitialized arena memory through _auth_plugin.
+        const size_t copied = buf.cutn(d, _auth_plugin_length);
+        _auth_plugin.set(d, copied);
+    }
+    buf.clear();  // consume all buf
+    set_parsed();
+    return PARSE_OK;
+}
+
+// Reads the result-set header packet: the length-encoded column count and,
+// when the payload is longer than that, one more length-encoded value that is
+// stored in _extra_msg (0 otherwise).
+// Returns PARSE_ERROR_NOT_ENOUGH_DATA if the packet header cannot be parsed.
+ParseError MysqlReply::ResultSetHeader::Parse(butil::IOBuf& buf) {
+    if (!is_parsed()) {
+        MysqlHeader header;
+        if (!parse_header(buf, &header)) {
+            return PARSE_ERROR_NOT_ENOUGH_DATA;
+        }
+        const uint64_t size_before = buf.size();
+        _column_count = parse_encode_length(buf);
+        const uint64_t consumed = size_before - buf.size();
+        // Bytes left in the payload mean a second length-encoded value follows.
+        _extra_msg = (consumed < header.payload_size) ? parse_encode_length(buf) : 0;
+        set_parsed();
+    }
+    return PARSE_OK;
+}
+
+// Parses one column-definition packet. Six length-encoded strings (catalog,
+// database, table, origin table, name, origin name) are copied into `arena`,
+// followed by the fixed-width charset, length, type, flag and decimal fields.
+// Returns PARSE_ERROR_NOT_ENOUGH_DATA if the packet header cannot be parsed.
+ParseError MysqlReply::Column::Parse(butil::IOBuf& buf, butil::Arena* arena) {
+    if (is_parsed()) {
+        return PARSE_OK;
+    }
+    MysqlHeader header;
+    if (!parse_header(buf, &header)) {
+        return PARSE_ERROR_NOT_ENOUGH_DATA;
+    }
+
+    {  // catalog
+        const uint64_t n = parse_encode_length(buf);
+        char* p = NULL;
+        MY_ALLOC_CHECK(my_alloc_check(arena, n, p));
+        buf.cutn(p, n);
+        _catalog.set(p, n);
+    }
+    {  // database
+        const uint64_t n = parse_encode_length(buf);
+        char* p = NULL;
+        MY_ALLOC_CHECK(my_alloc_check(arena, n, p));
+        buf.cutn(p, n);
+        _database.set(p, n);
+    }
+    {  // table
+        const uint64_t n = parse_encode_length(buf);
+        char* p = NULL;
+        MY_ALLOC_CHECK(my_alloc_check(arena, n, p));
+        buf.cutn(p, n);
+        _table.set(p, n);
+    }
+    {  // origin table
+        const uint64_t n = parse_encode_length(buf);
+        char* p = NULL;
+        MY_ALLOC_CHECK(my_alloc_check(arena, n, p));
+        buf.cutn(p, n);
+        _origin_table.set(p, n);
+    }
+    {  // name
+        const uint64_t n = parse_encode_length(buf);
+        char* p = NULL;
+        MY_ALLOC_CHECK(my_alloc_check(arena, n, p));
+        buf.cutn(p, n);
+        _name.set(p, n);
+    }
+    {  // origin name
+        const uint64_t n = parse_encode_length(buf);
+        char* p = NULL;
+        MY_ALLOC_CHECK(my_alloc_check(arena, n, p));
+        buf.cutn(p, n);
+        _origin_name.set(p, n);
+    }
+
+    // One byte is skipped before the fixed-width fields.
+    buf.pop_front(1);
+
+    uint8_t charset_raw[2];
+    buf.cutn(charset_raw, sizeof(charset_raw));
+    _charset = mysql_uint2korr(charset_raw);
+
+    uint8_t length_raw[4];
+    buf.cutn(length_raw, sizeof(length_raw));
+    _length = mysql_uint4korr(length_raw);
+
+    buf.cut1((char*)&_type);
+
+    uint8_t flag_raw[2];
+    buf.cutn(flag_raw, sizeof(flag_raw));
+    _flag = (MysqlFieldFlag)mysql_uint2korr(flag_raw);
+
+    buf.cut1((char*)&_decimal);
+    // Two trailing bytes are skipped.
+    buf.pop_front(2);
+
+    set_parsed();
+    return PARSE_OK;
+}
+
+// Parses an OK packet: skips the leading marker byte, reads the two
+// length-encoded counters (_affect_row, _index), the 2-byte status and warning
+// words, and copies whatever remains of the payload into `arena` as _msg.
+// Returns PARSE_ERROR_NOT_ENOUGH_DATA if the packet header cannot be parsed.
+ParseError MysqlReply::Ok::Parse(butil::IOBuf& buf, butil::Arena* arena) {
+    if (is_parsed()) {
+        return PARSE_OK;
+    }
+    MysqlHeader header;
+    if (!parse_header(buf, &header)) {
+        return PARSE_ERROR_NOT_ENOUGH_DATA;
+    }
+
+    const uint64_t old_size = buf.size();
+    buf.pop_front(1);
+
+    _affect_row = parse_encode_length(buf);
+    _index = parse_encode_length(buf);
+    // Decode the 2-byte words through mysql_uint2korr, as the other parsers in
+    // this file do, instead of copying raw wire bytes into the members
+    // (presumably this keeps the result independent of host byte order).
+    {
+        uint8_t tmp[2] = {0, 0};
+        buf.cutn(tmp, sizeof(tmp));
+        _status = mysql_uint2korr(tmp);
+    }
+    {
+        uint8_t tmp[2] = {0, 0};
+        buf.cutn(tmp, sizeof(tmp));
+        _warning = mysql_uint2korr(tmp);
+    }
+
+    const uint64_t consumed = old_size - buf.size();
+    if (consumed < header.payload_size) {
+        // The message occupies the rest of the payload.
+        const int64_t len = header.payload_size - consumed;
+        char* msg = NULL;
+        MY_ALLOC_CHECK(my_alloc_check(arena, len, msg));
+        buf.cutn(msg, len);
+        _msg.set(msg, len);
+    }
+    set_parsed();
+    return PARSE_OK;
+}
+
+// Parses an EOF packet: skips the leading marker byte, then reads the 2-byte
+// warning word followed by the 2-byte status word.
+// Returns PARSE_ERROR_NOT_ENOUGH_DATA if the packet header cannot be parsed.
+ParseError MysqlReply::Eof::Parse(butil::IOBuf& buf) {
+    if (is_parsed()) {
+        return PARSE_OK;
+    }
+    MysqlHeader header;
+    if (!parse_header(buf, &header)) {
+        return PARSE_ERROR_NOT_ENOUGH_DATA;
+    }
+    buf.pop_front(1);
+    // Decode through mysql_uint2korr, as the other parsers in this file do,
+    // instead of copying raw wire bytes into the members (presumably this
+    // keeps the result independent of host byte order).
+    {
+        uint8_t tmp[2] = {0, 0};
+        buf.cutn(tmp, sizeof(tmp));
+        _warning = mysql_uint2korr(tmp);
+    }
+    {
+        uint8_t tmp[2] = {0, 0};
+        buf.cutn(tmp, sizeof(tmp));
+        _status = mysql_uint2korr(tmp);
+    }
+    set_parsed();
+    return PARSE_OK;
+}
+
+ParseError MysqlReply::Error::Parse(butil::IOBuf& buf, butil::Arena* arena) {
+    if (is_parsed()) {
+        return PARSE_OK;
+    }
+    MysqlHeader header;
+    if (!parse_header(buf, &header)) {
+        return PARSE_ERROR_NOT_ENOUGH_DATA;
+    }
+    buf.pop_front(1);  // 0xFF
+    {
+        uint8_t tmp[2];
+        buf.cutn(tmp, sizeof(tmp));
+        _errcode = mysql_uint2korr(tmp);
+    }
+    buf.pop_front(1);  // '#'
+    // 5 byte server status
+    char* status = NULL;
+    MY_ALLOC_CHECK(my_alloc_check(arena, 5, status));
+    buf.cutn(status, 5);
+    _status.set(status, 5);
+    // error message, Null-Terminated string
+    uint64_t len = header.payload_size - 9;
+    char* msg = NULL;
+    MY_ALLOC_CHECK(my_alloc_check(arena, len, msg));
+    buf.cutn(msg, len);
+    _msg.set(msg, len);
+    // buf.pop_front(1);  // Null
+    set_parsed();
+    return PARSE_OK;
+}
+
+// Parses one result-set row into `fields` (one entry per column).
+//
+// buf          - input; the row packet is consumed from its front.
+// columns      - column definitions, `column_count` entries.
+// fields       - output array, `column_count` entries.
+// binary       - false: text protocol, each field is parsed on its own;
+//                true: binary protocol, a 0x00 header byte and a NULL bitmap
+//                precede the field values.
+// arena        - storage for field data.
+//
+// Returns PARSE_ERROR_NOT_ENOUGH_DATA if the packet header cannot be parsed,
+// PARSE_ERROR_ABSOLUTELY_WRONG if a binary row does not start with 0x00, or
+// the first error reported by a field parser.
+ParseError MysqlReply::Row::Parse(butil::IOBuf& buf,
+                                  const MysqlReply::Column* columns,
+                                  uint64_t column_count,
+                                  MysqlReply::Field* fields,
+                                  bool binary,
+                                  butil::Arena* arena) {
+    if (is_parsed()) {
+        return PARSE_OK;
+    }
+    MysqlHeader header;
+    if (!parse_header(buf, &header)) {
+        return PARSE_ERROR_NOT_ENOUGH_DATA;
+    }
+    if (!binary) {  // mysql text protocol
+        for (uint64_t i = 0; i < column_count; ++i) {
+            MY_PARSE_CHECK(fields[i].Parse(buf, columns + i, arena));
+        }
+    } else {  // mysql binary protocol
+        uint8_t hdr = 0;
+        buf.cut1((char*)&hdr);
+        if (hdr != 0x00) {
+            return PARSE_ERROR_ABSOLUTELY_WRONG;
+        }
+        // NULL-bitmap, [(column-count + 7 + 2) / 8 bytes]
+        const uint64_t size = ((column_count + 7 + 2) >> 3);
+        // A zero-filled std::string is used as the byte buffer instead of a
+        // variable-length array: VLAs are not standard C++ and the size comes
+        // from the peer. `size` is always >= 1, so &null_mask[0] is valid.
+        std::string null_mask(size, '\0');
+        buf.cutn(&null_mask[0], size);
+        const uint8_t* mask = reinterpret_cast<const uint8_t*>(null_mask.data());
+        for (uint64_t i = 0; i < column_count; ++i) {
+            MY_PARSE_CHECK(fields[i].Parse(buf, columns + i, i, column_count, mask, arena));
+        }
+    }
+    set_parsed();
+    return PARSE_OK;
+}
+
+// Text-protocol field parser. Reads one length-encoded value from `buf` and
+// converts it according to the column type: integer and floating-point types
+// are parsed from their decimal text, all other listed types are copied into
+// `arena` and exposed as a string.
+//
+// A zero length on a column without MYSQL_NOT_NULL_FLAG marks the field nil.
+// Returns PARSE_ERROR_ABSOLUTELY_WRONG for a column type not listed below.
+ParseError MysqlReply::Field::Parse(butil::IOBuf& buf,
+                                    const MysqlReply::Column* column,
+                                    butil::Arena* arena) {
+    if (is_parsed()) {
+        return PARSE_OK;
+    }
+    // field type
+    _type = column->_type;
+    // is unsigned flag set
+    _unsigned = column->_flag & MYSQL_UNSIGNED_FLAG;
+    // parse encode length
+    const uint64_t len = parse_encode_length(buf);
+    // is it null?
+    if (len == 0 && !(column->_flag & MYSQL_NOT_NULL_FLAG)) {
+        _is_nil = true;
+        set_parsed();
+        return PARSE_OK;
+    }
+    // field is not null
+    butil::IOBuf str;
+    buf.cutn(&str, len);
+    switch (_type) {
+        case MYSQL_FIELD_TYPE_NULL:
+            _is_nil = true;
+            break;
+        case MYSQL_FIELD_TYPE_TINY:
+            if (column->_flag & MYSQL_UNSIGNED_FLAG) {
+                _data.tiny = strtoul(str.to_string().c_str(), NULL, 10);
+            } else {
+                _data.stiny = strtol(str.to_string().c_str(), NULL, 10);
+            }
+            break;
+        case MYSQL_FIELD_TYPE_SHORT:
+        case MYSQL_FIELD_TYPE_YEAR:
+            if (column->_flag & MYSQL_UNSIGNED_FLAG) {
+                _data.small = strtoul(str.to_string().c_str(), NULL, 10);
+            } else {
+                _data.ssmall = strtol(str.to_string().c_str(), NULL, 10);
+            }
+            break;
+        case MYSQL_FIELD_TYPE_INT24:
+        case MYSQL_FIELD_TYPE_LONG:
+            if (column->_flag & MYSQL_UNSIGNED_FLAG) {
+                _data.integer = strtoul(str.to_string().c_str(), NULL, 10);
+            } else {
+                _data.sinteger = strtol(str.to_string().c_str(), NULL, 10);
+            }
+            break;
+        case MYSQL_FIELD_TYPE_LONGLONG:
+            // Use the long long variants: `long` is only 32 bits on some
+            // platforms, where strtoul/strtol would clamp 64-bit values.
+            if (column->_flag & MYSQL_UNSIGNED_FLAG) {
+                _data.bigint = strtoull(str.to_string().c_str(), NULL, 10);
+            } else {
+                _data.sbigint = strtoll(str.to_string().c_str(), NULL, 10);
+            }
+            break;
+        case MYSQL_FIELD_TYPE_FLOAT:
+            _data.float32 = strtof(str.to_string().c_str(), NULL);
+            break;
+        case MYSQL_FIELD_TYPE_DOUBLE:
+            _data.float64 = strtod(str.to_string().c_str(), NULL);
+            break;
+        case MYSQL_FIELD_TYPE_DECIMAL:
+        case MYSQL_FIELD_TYPE_NEWDECIMAL:
+        case MYSQL_FIELD_TYPE_VARCHAR:
+        case MYSQL_FIELD_TYPE_BIT:
+        case MYSQL_FIELD_TYPE_ENUM:
+        case MYSQL_FIELD_TYPE_SET:
+        case MYSQL_FIELD_TYPE_TINY_BLOB:
+        case MYSQL_FIELD_TYPE_MEDIUM_BLOB:
+        case MYSQL_FIELD_TYPE_LONG_BLOB:
+        case MYSQL_FIELD_TYPE_BLOB:
+        case MYSQL_FIELD_TYPE_VAR_STRING:
+        case MYSQL_FIELD_TYPE_STRING:
+        case MYSQL_FIELD_TYPE_GEOMETRY:
+        case MYSQL_FIELD_TYPE_JSON:
+        case MYSQL_FIELD_TYPE_TIME:
+        case MYSQL_FIELD_TYPE_DATE:
+        case MYSQL_FIELD_TYPE_NEWDATE:
+        case MYSQL_FIELD_TYPE_TIMESTAMP:
+        case MYSQL_FIELD_TYPE_DATETIME: {
+            // Kept as text: copy the bytes into the arena.
+            char* d = NULL;
+            MY_ALLOC_CHECK(my_alloc_check(arena, len, d));
+            str.copy_to(d);
+            _data.str.set(d, len);
+        } break;
+        default:
+            LOG(ERROR) << "Unknown field type";
+            set_parsed();
+            return PARSE_ERROR_ABSOLUTELY_WRONG;
+    }
+    set_parsed();
+    return PARSE_OK;
+}
+
+ParseError MysqlReply::Field::Parse(butil::IOBuf& buf,  // Binary-protocol field parser: consults null_mask first, then reads the value in the layout chosen by column->_type.
+                                    const MysqlReply::Column* column,
+                                    uint64_t column_index,
+                                    uint64_t column_count,  // not read anywhere in this body
+                                    const uint8_t* null_mask,
+                                    butil::Arena* arena) {
+    if (is_parsed()) {
+        return PARSE_OK;
+    }
+    // field type
+    _type = column->_type;
+    // is unsigned flag set
+    _unsigned = column->_flag & MYSQL_UNSIGNED_FLAG;
+    // (byte >> bit-pos) % 2 == 1
+    if (((null_mask[(column_index + 2) >> 3] >> ((column_index + 2) & 7)) & 1) == 1) {  // bit (column_index + 2) of the mask set: field is nil and nothing is read from buf
+        _is_nil = true;
+        set_parsed();
+        return PARSE_OK;
+    }
+
+    switch (_type) {
+        case MYSQL_FIELD_TYPE_NULL:
+            _is_nil = true;
+            break;
+        case MYSQL_FIELD_TYPE_TINY:
+            if (column->_flag & MYSQL_UNSIGNED_FLAG) {
+                buf.cut1((char*)&_data.tiny);
+            } else {
+                buf.cut1((char*)&_data.stiny);
+            }
+            break;
+        case MYSQL_FIELD_TYPE_SHORT:
+        case MYSQL_FIELD_TYPE_YEAR:
+            if (column->_flag & MYSQL_UNSIGNED_FLAG) {
+                uint8_t* p = (uint8_t*)&_data.small;  // the union member doubles as the scratch buffer for the raw bytes
+                buf.cutn(p, 2);
+                _data.small = mysql_uint2korr(p);
+            } else {
+                uint8_t* p = (uint8_t*)&_data.ssmall;
+                buf.cutn(p, 2);
+                _data.ssmall = (int16_t)mysql_uint2korr(p);
+            }
+            break;
+        case MYSQL_FIELD_TYPE_INT24:
+        case MYSQL_FIELD_TYPE_LONG:
+            if (column->_flag & MYSQL_UNSIGNED_FLAG) {
+                uint8_t* p = (uint8_t*)&_data.integer;
+                buf.cutn(p, 4);
+                _data.integer = mysql_uint4korr(p);
+            } else {
+                uint8_t* p = (uint8_t*)&_data.sinteger;
+                buf.cutn(p, 4);
+                _data.sinteger = (int32_t)mysql_uint4korr(p);
+            }
+            break;
+        case MYSQL_FIELD_TYPE_LONGLONG:
+            if (column->_flag & MYSQL_UNSIGNED_FLAG) {
+                uint8_t* p = (uint8_t*)&_data.bigint;
+                buf.cutn(p, 8);
+                _data.bigint = mysql_uint8korr(p);
+            } else {
+                uint8_t* p = (uint8_t*)&_data.sbigint;
+                buf.cutn(p, 8);
+                _data.sbigint = (int64_t)mysql_uint8korr(p);
+            }
+            break;
+        case MYSQL_FIELD_TYPE_FLOAT: {
+            uint8_t* p = (uint8_t*)&_data.float32;
+            buf.cutn(p, 4);  // raw bytes are used as-is, with no korr decode; NOTE(review): assumes the host float layout matches the wire format - confirm
+        } break;
+        case MYSQL_FIELD_TYPE_DOUBLE: {
+            uint8_t* p = (uint8_t*)&_data.float64;
+            buf.cutn(p, 8);  // same raw copy as the float case
+        } break;
+        case MYSQL_FIELD_TYPE_DECIMAL:
+        case MYSQL_FIELD_TYPE_NEWDECIMAL:
+        case MYSQL_FIELD_TYPE_VARCHAR:
+        case MYSQL_FIELD_TYPE_BIT:
+        case MYSQL_FIELD_TYPE_ENUM:
+        case MYSQL_FIELD_TYPE_SET:
+        case MYSQL_FIELD_TYPE_TINY_BLOB:
+        case MYSQL_FIELD_TYPE_MEDIUM_BLOB:
+        case MYSQL_FIELD_TYPE_LONG_BLOB:
+        case MYSQL_FIELD_TYPE_BLOB:
+        case MYSQL_FIELD_TYPE_VAR_STRING:
+        case MYSQL_FIELD_TYPE_STRING:
+        case MYSQL_FIELD_TYPE_GEOMETRY:
+        case MYSQL_FIELD_TYPE_JSON: {
+            const uint64_t len = parse_encode_length(buf);
+            // is it null?
+            if (len == 0 && !(column->_flag & MYSQL_NOT_NULL_FLAG)) {  // NOTE(review): an empty value in a nullable column is reported as nil here - confirm this is intended
+                _is_nil = true;
+                set_parsed();
+                return PARSE_OK;
+            }
+            // field is not null
+            char* d = NULL;
+            MY_ALLOC_CHECK(my_alloc_check(arena, len, d));
+            buf.cutn(d, len);
+            _data.str.set(d, len);
+        } break;
+        case MYSQL_FIELD_TYPE_NEWDATE:      // Date YYYY-MM-DD
+        case MYSQL_FIELD_TYPE_DATE:         // Date YYYY-MM-DD
+        case MYSQL_FIELD_TYPE_DATETIME:     // Timestamp YYYY-MM-DD HH:MM:SS[.fractal]
+        case MYSQL_FIELD_TYPE_TIMESTAMP: {  // Timestamp YYYY-MM-DD HH:MM:SS[.fractal]
+            ParseError rc = ParseBinaryDataTime(buf, column, _data.str, arena);
+            if (rc != PARSE_OK) {
+                return rc;
+            }
+        } break;
+        case MYSQL_FIELD_TYPE_TIME: {  // Time [-][H]HH:MM:SS[.fractal]
+            ParseError rc = ParseBinaryTime(buf, column, _data.str, arena);
+            if (rc != PARSE_OK) {
+                return rc;
+            }
+        } break;
+        default:
+            LOG(ERROR) << "Unknown field type";
+            return PARSE_ERROR_ABSOLUTELY_WRONG;  // this path returns without calling set_parsed()
+    }
+    set_parsed();
+    return PARSE_OK;
+}
+
+// Decodes a binary-protocol TIME value into text of the form
+// [-][H]HH:MM:SS[.fraction], stored in `arena` and exposed through `str`.
+//
+// A zero length marks the field nil. Only value lengths 8 and 12 are
+// accepted. column->_decimal selects how many fractional digits the output
+// reserves (0 or 0x1f: none; 1..6: that many).
+//
+// Returns PARSE_ERROR_ABSOLUTELY_WRONG for an invalid length, an illegal
+// decimals value, or time components that do not fit the output buffer.
+ParseError MysqlReply::Field::ParseBinaryTime(butil::IOBuf& buf,
+                                              const MysqlReply::Column* column,
+                                              butil::StringPiece& str,
+                                              butil::Arena* arena) {
+
+    const uint64_t len = parse_encode_length(buf);
+    if (len == 0) {
+        _is_nil = true;
+        return PARSE_OK;
+    }
+
+    if (len != 8 && len != 12) {
+        LOG(ERROR) << "invalid TIME packet length " << len;
+        return PARSE_ERROR_ABSOLUTELY_WRONG;
+    }
+
+    uint8_t dstlen;
+    switch (column->_decimal) {
+        case 0x00:
+        case 0x1f:
+            dstlen = 8;
+            break;
+        case 1:
+        case 2:
+        case 3:
+        case 4:
+        case 5:
+        case 6:
+            dstlen = 8 + 1 + column->_decimal;
+            break;
+        default:
+            // Cast so the value is logged as a number, not as a character.
+            LOG(ERROR) << "protocol error, illegal decimals value "
+                       << (unsigned)column->_decimal;
+            return PARSE_ERROR_ABSOLUTELY_WRONG;
+    }
+
+    // Read every component first: sign, days, hours, minutes, seconds.
+    uint8_t neg = 0, hour = 0, min = 0, sec = 0;
+    uint32_t day = 0;
+    buf.cut1((char*)&neg);
+    {
+        uint8_t tmp[4] = {0, 0, 0, 0};
+        buf.cutn(tmp, sizeof(tmp));
+        day = mysql_uint4korr(tmp);
+    }
+    buf.cut1((char*)&hour);
+    buf.cut1((char*)&min);
+    buf.cut1((char*)&sec);
+
+    // Days are folded into the hour count. This is done in a wide type: the
+    // sum does not fit in 8 bits once it exceeds 255.
+    const uint64_t total_hours = (uint64_t)day * 24 + hour;
+    // The output buffer has room for a sign plus at most three hour digits,
+    // and digits10/digits01 are indexed with two-digit values only
+    // (presumably 100-entry tables), so reject anything larger.
+    if (total_hours > 999 || min > 59 || sec > 59) {
+        LOG(ERROR) << "invalid TIME value, day=" << day << " hour=" << (unsigned)hour
+                   << " min=" << (unsigned)min << " sec=" << (unsigned)sec;
+        return PARSE_ERROR_ABSOLUTELY_WRONG;
+    }
+
+    size_t i = 0;
+    char* d = NULL;
+    // Two spare bytes: one for '-', one for a third hour digit.
+    MY_ALLOC_CHECK(my_alloc_check(arena, dstlen + 2, d));
+    d[dstlen] = '\0';
+    d[dstlen + 1] = '\0';
+
+    if (neg == 1) {
+        d[i++] = '-';
+    }
+    if (total_hours >= 100) {
+        d[i++] = (char)('0' + total_hours / 100);
+    }
+    const uint32_t hh = (uint32_t)(total_hours % 100);
+    d[i++] = digits10[hh];
+    d[i++] = digits01[hh];
+
+    d[i++] = ':';
+    d[i++] = digits10[min];
+    d[i++] = digits01[min];
+    d[i++] = ':';
+    d[i++] = digits10[sec];
+    d[i++] = digits01[sec];
+
+    ParseError rc = ParseMicrosecs(buf, column->_decimal, d + i);
+    if (rc == PARSE_OK) {
+        // NOTE(review): the length always covers the two spare bytes, so
+        // values without a sign or third hour digit seem to carry trailing
+        // '\0' bytes - confirm callers expect that.
+        str.set(d, dstlen + 2);
+    }
+    return rc;
+}
+
+ParseError MysqlReply::Field::ParseBinaryDataTime(butil::IOBuf& buf,

Review Comment:
   DataTime -> DateTime



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscribe@brpc.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: dev-unsubscribe@brpc.apache.org
For additional commands, e-mail: dev-help@brpc.apache.org


Re: [PR] Add Mysql Protocol (brpc)

Posted by "chenBright (via GitHub)" <gi...@apache.org>.
chenBright commented on code in PR #2093:
URL: https://github.com/apache/brpc/pull/2093#discussion_r1267778828


##########
src/brpc/controller.cpp:
##########
@@ -1017,7 +1032,17 @@ void Controller::IssueRPC(int64_t start_realtime_us) {
     _current_call.need_feedback = false;
     _current_call.enable_circuit_breaker = has_enabled_circuit_breaker();
     SocketUniquePtr tmp_sock;
-    if (SingleServer()) {
+    if ((_connection_type & CONNECTION_TYPE_POOLED_AND_SHORT) && 
+        _bind_sock_action == BIND_SOCK_USE) {
+        tmp_sock.reset(_bind_sock.release());
+        if (!tmp_sock || (!is_health_check_call() && !tmp_sock->IsAvailable())) {
+            SetFailed(EHOSTDOWN, "Not connected to bind socket yet, server_id=%" PRIu64,
+                      tmp_sock->id());

Review Comment:
   tmp_sock为NULL,tmp_sock->id()会coredump吧?



##########
src/brpc/controller.cpp:
##########
@@ -1017,7 +1032,17 @@ void Controller::IssueRPC(int64_t start_realtime_us) {
     _current_call.need_feedback = false;
     _current_call.enable_circuit_breaker = has_enabled_circuit_breaker();
     SocketUniquePtr tmp_sock;
-    if (SingleServer()) {
+    if ((_connection_type & CONNECTION_TYPE_POOLED_AND_SHORT) && 
+        _bind_sock_action == BIND_SOCK_USE) {
+        tmp_sock.reset(_bind_sock.release());
+        if (!tmp_sock || (!is_health_check_call() && !tmp_sock->IsAvailable())) {
+            SetFailed(EHOSTDOWN, "Not connected to bind socket yet, server_id=%" PRIu64,
+                      tmp_sock->id());

Review Comment:
   tmp_sock为NULL,tmp_sock->id()会coredump吧?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscribe@brpc.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: dev-unsubscribe@brpc.apache.org
For additional commands, e-mail: dev-help@brpc.apache.org


[GitHub] [brpc] yanglimingcn commented on pull request #2093: Add Mysql Protocol

Posted by GitBox <gi...@apache.org>.
yanglimingcn commented on PR #2093:
URL: https://github.com/apache/brpc/pull/2093#issuecomment-1385095711

   mysql的prepared statement协议的支持。
   遇到的两个问题解决方案如下:
   1、prepared statement首先是在某个连接上创建一个statement id,后续操作可以根据这个statement id向这个连接发送后续的请求。同一个语句在每个连接上的statement id是不同的。在pooled模式下,每次RPC选择的连接是不同的,所以需要记录SocketId和statement id的关系,我们这里称为id_map。但是一个SocketId有可能关闭,然后又重新建立起来,这个新SocketId对应的连接没有这个statement id。id_map没有机制可以做到同步更新,保存的SocketId与statement id关系信息是旧的关系。这样就会产生问题。
   这个问题的修改方式是,给socketid对应的fd一个唯一的版本号,每次通过stmt_id找到socketid的时候对比一下版本号,版本号一致说明fd没有变化,版本号不一致说明fd变化过。
   2、目前还不能支持single模式,唯一的原因就是auth部分,正常auth多个请求会竞争发送auth,后续操作就不需要发送auth了,auth不用等待服务端应答,就可以让其他请求继续发送。但是mysql的auth和这个逻辑不同,mysql的auth首先接收mysql-server发送过来的salt,客户端根据salt加密密码,再把加密后的密码发送给服务端。这样的逻辑如果不等待服务端应答就解锁别的请求,就会出现发送auth的请求落后于正常的请求,导致消息乱序。
   这个目前就先不支持single模式了。


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscribe@brpc.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: dev-unsubscribe@brpc.apache.org
For additional commands, e-mail: dev-help@brpc.apache.org


[GitHub] [brpc] wwbmmm commented on a diff in pull request #2093: Add Mysql Protocol

Posted by "wwbmmm (via GitHub)" <gi...@apache.org>.
wwbmmm commented on code in PR #2093:
URL: https://github.com/apache/brpc/pull/2093#discussion_r1073125489


##########
docs/cn/mysql_client.md:
##########
@@ -0,0 +1,562 @@
+[MySQL](https://www.mysql.com/)是著名的开源的关系型数据库,为了使用户更快捷地访问mysql并充分利用bthread的并发能力,brpc直接支持mysql协议。示例程序:[example/mysql_c++](https://github.com/brpc/brpc/tree/master/example/mysql_c++/)
+
+**注意**:只支持MySQL 4.1 及之后的版本的文本协议,支持事务,不支持Prepared statement。目前支持的鉴权方式为mysql_native_password,使用事务的时候不支持single模式。

Review Comment:
   现在是不是支持Prepared statement了



##########
docs/cn/mysql_client.md:
##########
@@ -0,0 +1,562 @@
+[MySQL](https://www.mysql.com/)是著名的开源的关系型数据库,为了使用户更快捷地访问mysql并充分利用bthread的并发能力,brpc直接支持mysql协议。示例程序:[example/mysql_c++](https://github.com/brpc/brpc/tree/master/example/mysql_c++/)
+
+**注意**:只支持MySQL 4.1 及之后的版本的文本协议,支持事务,不支持Prepared statement。目前支持的鉴权方式为mysql_native_password,使用事务的时候不支持single模式。
+
+相比使用[libmysqlclient](https://dev.mysql.com/downloads/connector/c/)(官方client)的优势有:
+
+- 线程安全。用户不需要为每个线程建立独立的client。
+- 支持同步、异步、半同步等访问方式,能使用[ParallelChannel等](combo_channel.md)组合访问方式。
+- 支持多种[连接方式](client.md#连接方式)。支持超时、backup request、取消、tracing、内置服务等一系列brpc提供的福利。
+- 明确的返回类型校验,如果使用了不正确的变量接受mysql的数据类型,将抛出异常。
+- 调用mysql标准库会阻塞框架的并发能力,使用本实现将能充分利用brpc框架的并发能力。
+- 使用brpc实现的mysql不会造成pthread的阻塞,使用libmysqlclient会阻塞pthread [线程相关](bthread.md),使用mysql的异步api会使编程变得很复杂。
+# 访问mysql
+
+创建一个访问mysql的Channel:
+
+```c++
+# include <brpc/mysql.h>
+# include <brpc/policy/mysql_authenticator.h>
+# include <brpc/channel.h>
+
+brpc::ChannelOptions options;
+options.protocol = brpc::PROTOCOL_MYSQL;
+options.connection_type = FLAGS_connection_type;
+options.timeout_ms = FLAGS_timeout_ms /*milliseconds*/;
+options.max_retry = FLAGS_max_retry;
+options.auth = new brpc::policy::MysqlAuthenticator("yangliming01", "123456", "test", 
+    "charset=utf8&collation_connection=utf8_unicode_ci");
+if (channel.Init("127.0.0.1", 3306, &options) != 0) {
+    LOG(ERROR) << "Fail to initialize channel";
+    return -1;
+}
+```
+
+向mysql发起命令。
+
+```c++
+// 执行各种mysql命令,可以批量执行命令如:"select * from tab1;select * from tab2"
+std::string command = "show databases"; // select,delete,update,insert,create,drop ...
+brpc::MysqlRequest request;
+if (!request.Query(command)) {
+    LOG(ERROR) << "Fail to add command";
+    return false;
+}
+brpc::MysqlResponse response;
+brpc::Controller cntl;
+channel.CallMethod(NULL, &cntl, &request, &response, NULL);
+if (!cntl.Failed()) {
+    std::cout << response << std::endl;
+} else {
+    LOG(ERROR) << "Fail to access mysql, " << cntl.ErrorText();
+    return false;
+}
+return true;
+```
+
+上述代码的说明:
+
+- 请求类型必须为MysqlRequest,回复类型必须为MysqlResponse,否则CallMethod会失败。不需要stub,直接调用channel.CallMethod,method填NULL。
+- 调用request.Query()传入要执行的命令,可以批量执行命令,多个命令用分号隔开。
+- 依次调用response.reply(X)弹出操作结果,根据返回类型的不同,选择不同的类型接收,如:MysqlReply::Ok,MysqlReply::Error,const MysqlReply::Columnconst MysqlReply::Row等。
+- 如果只有一条命令则reply为1个,如果为批量操作返回的reply为多个。
+
+目前支持的请求操作有:
+
+```c++
+bool Query(const butil::StringPiece& command);
+```
+
+对应的回复操作:
+
+```c++
+// 返回不同类型的结果
+const MysqlReply::Auth& auth() const;
+const MysqlReply::Ok& ok() const;
+const MysqlReply::Error& error() const;
+const MysqlReply::Eof& eof() const;
+// 对result set结果集的操作
+// get column number
+uint64_t MysqlReply::column_number() const;
+// get one column
+const MysqlReply::Column& MysqlReply::column(const uint64_t index) const;
+// get row number
+uint64_t MysqlReply::row_number() const;
+// get one row
+const MysqlReply::Row& MysqlReply::next() const;
+// 结果集中每个字段的操作
+const MysqlReply::Field& MysqlReply::Row::field(const uint64_t index) const;
+```
+
+# 事务操作
+
+事务可以保证在一个事务中的多个RPC请求最终要么都成功,要么都失败。
+
+```c++
+rpc::Channel channel;

Review Comment:
   这一段初始化Channel的代码可以略去



##########
src/brpc/controller.h:
##########
@@ -815,6 +824,13 @@ friend void policy::ProcessThriftRequest(InputMessageBase*);
     // Defined at both sides
     StreamSettings *_remote_stream_settings;
 
+    // controller bind socket action
+    BindSockAction _bind_sock_action;
+    // controller bind sock
+    SocketUniquePtr _bind_sock;
+    // sql prepare statement
+    MysqlStatementStub *_stmt;

Review Comment:
   建议叫_mysql_stmt



##########
docs/cn/mysql_client.md:
##########
@@ -0,0 +1,562 @@
+[MySQL](https://www.mysql.com/)是著名的开源的关系型数据库,为了使用户更快捷地访问mysql并充分利用bthread的并发能力,brpc直接支持mysql协议。示例程序:[example/mysql_c++](https://github.com/brpc/brpc/tree/master/example/mysql_c++/)
+
+**注意**:只支持MySQL 4.1 及之后的版本的文本协议,支持事务,不支持Prepared statement。目前支持的鉴权方式为mysql_native_password,使用事务的时候不支持single模式。
+
+相比使用[libmysqlclient](https://dev.mysql.com/downloads/connector/c/)(官方client)的优势有:
+
+- 线程安全。用户不需要为每个线程建立独立的client。
+- 支持同步、异步、半同步等访问方式,能使用[ParallelChannel等](combo_channel.md)组合访问方式。
+- 支持多种[连接方式](client.md#连接方式)。支持超时、backup request、取消、tracing、内置服务等一系列brpc提供的福利。
+- 明确的返回类型校验,如果使用了不正确的变量接受mysql的数据类型,将抛出异常。
+- 调用mysql标准库会阻塞框架的并发能力,使用本实现将能充分利用brpc框架的并发能力。
+- 使用brpc实现的mysql不会造成pthread的阻塞,使用libmysqlclient会阻塞pthread [线程相关](bthread.md),使用mysql的异步api会使编程变得很复杂。
+# 访问mysql
+
+创建一个访问mysql的Channel:
+
+```c++
+# include <brpc/mysql.h>
+# include <brpc/policy/mysql_authenticator.h>
+# include <brpc/channel.h>
+
+brpc::ChannelOptions options;
+options.protocol = brpc::PROTOCOL_MYSQL;
+options.connection_type = FLAGS_connection_type;
+options.timeout_ms = FLAGS_timeout_ms /*milliseconds*/;
+options.max_retry = FLAGS_max_retry;
+options.auth = new brpc::policy::MysqlAuthenticator("yangliming01", "123456", "test", 
+    "charset=utf8&collation_connection=utf8_unicode_ci");
+if (channel.Init("127.0.0.1", 3306, &options) != 0) {
+    LOG(ERROR) << "Fail to initialize channel";
+    return -1;
+}
+```
+
+向mysql发起命令。
+
+```c++
+// 执行各种mysql命令,可以批量执行命令如:"select * from tab1;select * from tab2"
+std::string command = "show databases"; // select,delete,update,insert,create,drop ...
+brpc::MysqlRequest request;
+if (!request.Query(command)) {
+    LOG(ERROR) << "Fail to add command";
+    return false;
+}
+brpc::MysqlResponse response;
+brpc::Controller cntl;
+channel.CallMethod(NULL, &cntl, &request, &response, NULL);
+if (!cntl.Failed()) {
+    std::cout << response << std::endl;
+} else {
+    LOG(ERROR) << "Fail to access mysql, " << cntl.ErrorText();
+    return false;
+}
+return true;
+```
+
+上述代码的说明:
+
+- 请求类型必须为MysqlRequest,回复类型必须为MysqlResponse,否则CallMethod会失败。不需要stub,直接调用channel.CallMethod,method填NULL。
+- 调用request.Query()传入要执行的命令,可以批量执行命令,多个命令用分号隔开。
+- 依次调用response.reply(X)弹出操作结果,根据返回类型的不同,选择不同的类型接收,如:MysqlReply::Ok,MysqlReply::Error,const MysqlReply::Columnconst MysqlReply::Row等。
+- 如果只有一条命令则reply为1个,如果为批量操作返回的reply为多个。
+
+目前支持的请求操作有:
+
+```c++
+bool Query(const butil::StringPiece& command);
+```
+
+对应的回复操作:
+
+```c++
+// 返回不同类型的结果
+const MysqlReply::Auth& auth() const;
+const MysqlReply::Ok& ok() const;
+const MysqlReply::Error& error() const;
+const MysqlReply::Eof& eof() const;
+// 对result set结果集的操作
+// get column number
+uint64_t MysqlReply::column_number() const;
+// get one column
+const MysqlReply::Column& MysqlReply::column(const uint64_t index) const;
+// get row number
+uint64_t MysqlReply::row_number() const;
+// get one row
+const MysqlReply::Row& MysqlReply::next() const;
+// 结果集中每个字段的操作
+const MysqlReply::Field& MysqlReply::Row::field(const uint64_t index) const;
+```
+
+# 事务操作
+
+事务可以保证在一个事务中的多个RPC请求最终要么都成功,要么都失败。
+
+```c++
+rpc::Channel channel;
+// Initialize the channel, NULL means using default options.
+brpc::ChannelOptions options;
+options.protocol = brpc::PROTOCOL_MYSQL;
+options.connection_type = FLAGS_connection_type;
+options.timeout_ms = FLAGS_timeout_ms /*milliseconds*/;
+options.connect_timeout_ms = FLAGS_connect_timeout_ms;
+options.max_retry = FLAGS_max_retry;
+options.auth = new brpc::policy::MysqlAuthenticator(
+    FLAGS_user, FLAGS_password, FLAGS_schema, FLAGS_params);
+if (channel.Init(FLAGS_server.c_str(), FLAGS_port, &options) != 0) {
+    LOG(ERROR) << "Fail to initialize channel";
+    return -1;
+}
+
+// create transaction
+brpc::MysqlTransactionOptions options;
+options.readonly = FLAGS_readonly;
+options.isolation_level = brpc::MysqlIsolationLevel(FLAGS_isolation_level);
+auto tx(brpc::NewMysqlTransaction(channel, options));
+if (tx == NULL) {
+    LOG(ERROR) << "Fail to create transaction";
+    return false;
+}
+
+brpc::MysqlRequest request(tx.get());
+if (!request.Query(*it)) {
+    LOG(ERROR) << "Fail to add command";
+    tx->rollback();
+    return false;
+}
+brpc::MysqlResponse response;
+brpc::Controller cntl;
+channel.CallMethod(NULL, &cntl, &request, &response, NULL);
+if (cntl.Failed()) {
+    LOG(ERROR) << "Fail to access mysql, " << cntl.ErrorText();
+    tx->rollback();
+    return false;
+}
+// handle response
+std::cout << response << std::endl;
+bool rc = tx->commit();
+```
+
+# Prepared Statement
+
+对于需要执行很多次的SQL语句,Prepared statement会把这个SQL语句注册到mysql-server,避免了每次请求在mysql-server端都去解析这个SQL语句,能得到性能上的提升。
+
+```c++
+rpc::Channel channel;
+// Initialize the channel, NULL means using default options.
+brpc::ChannelOptions options;
+options.protocol = brpc::PROTOCOL_MYSQL;
+options.connection_type = FLAGS_connection_type;
+options.timeout_ms = FLAGS_timeout_ms /*milliseconds*/;
+options.connect_timeout_ms = FLAGS_connect_timeout_ms;
+options.max_retry = FLAGS_max_retry;
+options.auth = new brpc::policy::MysqlAuthenticator(
+    FLAGS_user, FLAGS_password, FLAGS_schema, FLAGS_params);
+if (channel.Init(FLAGS_server.c_str(), FLAGS_port, &options) != 0) {
+    LOG(ERROR) << "Fail to initialize channel";
+    return -1;
+}
+
+auto stmt(brpc::NewMysqlStatement(channel, "select * from tb where name=?"));
+if (stmt == NULL) {
+    LOG(ERROR) << "Fail to create mysql statement";
+    return -1;
+}
+
+brpc::MysqlRequest request(stmt.get());
+if (!request.AddParam("lilei")) {
+    LOG(ERROR) << "Fail to add name param";
+    return NULL;
+}
+
+brpc::MysqlResponse response;
+brpc::Controller cntl;
+channel->CallMethod(NULL, &cntl, &request, &response, NULL);
+if (cntl.Failed()) {
+    LOG(ERROR) << "Fail to access mysql, " << cntl.ErrorText();
+    return NULL;
+}
+
+std::cout << response << std::endl;
+```
+
+
+
+# 性能测试
+
+我在example/mysql_c++目录下面写了三个测试程序:mysql_press.cpp、mysqlclient_press.cpp、mysql_go_press.go,一个是使用了brpc框架,一个是使用了libmysqlclient访问mysql,一个是使用[go-sql-driver](https://github.com/go-sql-driver)/**go-mysql**访问mysql。
+
+启动单线程测试
+
+##### brpc框架访问mysql(单线程)
+
+./mysql_press -thread_num=1 -op_type=0 // insert
+
+```
+qps=3071 latency=320
+qps=3156 latency=311
+qps=3166 latency=310
+qps=3151 latency=312
+qps=3093 latency=317
+qps=3146 latency=312
+qps=3139 latency=313
+qps=3114 latency=315
+qps=3055 latency=321
+qps=3135 latency=313
+qps=2611 latency=376
+qps=3072 latency=320
+qps=3026 latency=324
+qps=2792 latency=352
+qps=3181 latency=309
+qps=3181 latency=309
+qps=3197 latency=307
+qps=3024 latency=325
+```
+
+./mysql_press -thread_num=1 -op_type=1
+
+```
+qps=6414 latency=151
+qps=5292 latency=182
+qps=6700 latency=144
+qps=6858 latency=141
+qps=6915 latency=140
+qps=6822 latency=142
+qps=6722 latency=144
+qps=6852 latency=141
+qps=6713 latency=144
+qps=6741 latency=144
+qps=6734 latency=144
+qps=6611 latency=146
+qps=6554 latency=148
+qps=6810 latency=142
+qps=6787 latency=143
+qps=6737 latency=144
+qps=6579 latency=147
+qps=6634 latency=146
+qps=6716 latency=144
+qps=6711 latency=144
+```
+
+./mysql_press -thread_num=1 -op_type=2 // update
+
+```
+qps=3090 latency=318
+qps=3452 latency=284
+qps=3239 latency=303
+qps=3328 latency=295
+qps=3218 latency=305
+qps=3251 latency=302
+qps=2516 latency=391
+qps=2874 latency=342
+qps=3366 latency=292
+qps=3249 latency=302
+qps=3346 latency=294
+qps=3486 latency=282
+qps=3457 latency=284
+qps=3439 latency=286
+qps=3386 latency=290
+qps=3352 latency=293
+qps=3253 latency=302
+qps=3341 latency=294
+```
+
+##### libmysqlclient实现(单线程)
+
+./mysqlclient_press -thread_num=1 -op_type=0 // insert
+
+```
+qps=3166 latency=313
+qps=3157 latency=314
+qps=2941 latency=337
+qps=3270 latency=303
+qps=3305 latency=300
+qps=3445 latency=287
+qps=3455 latency=287
+qps=3449 latency=287
+qps=3486 latency=284
+qps=3551 latency=279
+qps=3517 latency=281
+qps=3283 latency=302
+qps=3353 latency=295
+qps=2564 latency=386
+qps=3243 latency=305
+qps=3333 latency=297
+qps=3598 latency=275
+qps=3714 latency=267
+```
+
+./mysqlclient_press -thread_num=1 -op_type=1
+
+```
+qps=8209 latency=120
+qps=8022 latency=123
+qps=7879 latency=125
+qps=8083 latency=122
+qps=8504 latency=116
+qps=8112 latency=121
+qps=8278 latency=119
+qps=8698 latency=113
+qps=8817 latency=112
+qps=8755 latency=112
+qps=8734 latency=113
+qps=8390 latency=117
+qps=8230 latency=120
+qps=8486 latency=116
+qps=8038 latency=122
+qps=8640 latency=114
+```
+
+./mysqlclient_press -thread_num=1 -op_type=2 // update
+
+```
+qps=3583 latency=276
+qps=3530 latency=280
+qps=3610 latency=274
+qps=3492 latency=283
+qps=3508 latency=282
+qps=3465 latency=286
+qps=3543 latency=279
+qps=3610 latency=274
+qps=3567 latency=278
+qps=3381 latency=293
+qps=3514 latency=282
+qps=3461 latency=286
+qps=3456 latency=286
+qps=3517 latency=281
+qps=3492 latency=284
+```
+
+##### golang访问mysql(单线程)
+
+go run test.go -thread_num=1
+
+```
+qps = 6905 latency = 144
+qps = 6922 latency = 143
+qps = 6931 latency = 143
+qps = 6998 latency = 142
+qps = 6780 latency = 146
+qps = 6980 latency = 142
+qps = 6901 latency = 144
+qps = 6887 latency = 144
+qps = 6943 latency = 143
+qps = 6880 latency = 144
+qps = 6815 latency = 146
+qps = 6089 latency = 163
+qps = 6626 latency = 150
+qps = 6361 latency = 156
+qps = 6783 latency = 146
+qps = 6789 latency = 146
+qps = 6883 latency = 144
+qps = 6795 latency = 146
+qps = 6724 latency = 148
+qps = 6861 latency = 145
+qps = 6878 latency = 144
+qps = 6842 latency = 146
+```
+
+从以上测试结果看来,使用brpc实现的mysql协议和使用libmysqlclient在插入、修改、删除操作上性能是类似的,但是在查询操作上会逊色于libmysqlclient,查询的性能和golang实现的mysql类似。
+
+##### brpc框架访问mysql(50线程)
+
+./mysql_press -thread_num=50 -op_type=1 -use_bthread=true
+
+```
+qps=18843 latency=2656
+qps=22426 latency=2226
+qps=22536 latency=2203
+qps=22560 latency=2193
+qps=22270 latency=2226
+qps=22302 latency=2247
+qps=22147 latency=2225
+qps=22517 latency=2228
+qps=22762 latency=2176
+qps=23061 latency=2162
+qps=23819 latency=2070
+qps=23852 latency=2077
+qps=22682 latency=2214
+qps=22381 latency=2213
+qps=24041 latency=2069
+qps=24562 latency=2022
+qps=24874 latency=2004
+qps=24821 latency=1988
+qps=24209 latency=2073
+qps=21706 latency=2281
+```
+
+##### libmysqlclient实现(50线程)
+
+./mysql_press -thread_num=50 -op_type=1 -use_bthread=true
+
+```
+qps=23656 latency=378
+qps=16190 latency=555
+qps=20136 latency=445
+qps=22238 latency=401
+qps=22229 latency=403
+qps=19109 latency=470
+qps=22569 latency=394
+qps=26250 latency=343
+qps=28208 latency=318
+qps=29649 latency=301
+qps=29874 latency=301
+qps=30033 latency=301
+qps=25911 latency=345
+qps=28048 latency=317
+qps=27398 latency=329
+```
+
+##### golang访问mysql(50协程)
+
+go run ../mysql_go_press.go -thread_num=50
+
+```
+qps = 23660 latency = 2049
+qps = 23198 latency = 2160
+qps = 23765 latency = 2181
+qps = 23323 latency = 2149
+qps = 14833 latency = 2136
+qps = 23822 latency = 2853
+qps = 20389 latency = 2474
+qps = 23290 latency = 2151
+qps = 23526 latency = 2153
+qps = 21426 latency = 2613
+qps = 23339 latency = 2155
+qps = 25623 latency = 2084
+qps = 23048 latency = 2210
+qps = 20694 latency = 2423
+qps = 23705 latency = 2122
+qps = 23445 latency = 2125
+qps = 24368 latency = 2054
+qps = 23027 latency = 2175
+qps = 24307 latency = 2063
+qps = 23227 latency = 2096
+qps = 23646 latency = 2173
+```
+
+以上是启动50并发的查询请求,看上去qps都比较相似,但是libmysqlclient延时明显低。
+
+##### brpc框架访问mysql(100线程)
+
+./mysql_press -thread_num=100 -op_type=1 -use_bthread=true
+
+```
+qps=26428 latency=3764
+qps=26305 latency=3780
+qps=26390 latency=3779
+qps=26278 latency=3787
+qps=26326 latency=3787
+qps=26266 latency=3792
+qps=26394 latency=3773
+qps=26263 latency=3797
+qps=26250 latency=3783
+qps=26362 latency=3782
+qps=26212 latency=3796
+qps=26260 latency=3800
+qps=24666 latency=4035
+qps=25569 latency=3896
+qps=26223 latency=3794
+qps=25538 latency=3890
+qps=20065 latency=4958
+qps=23023 latency=4331
+qps=25808 latency=3875
+```
+
+##### libmysqlclient实现(100线程)
+
+./mysql_press -thread_num=50 -op_type=1 -use_bthread=true

Review Comment:
   50应改成100



##########
src/brpc/mysql.cpp:
##########
@@ -0,0 +1,695 @@
+// Copyright (c) 2019 Baidu, Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+// Authors: Yang,Liming (yangliming01@baidu.com)
+
+#define INTERNAL_SUPPRESS_PROTOBUF_FIELD_DEPRECATION
+#include <algorithm>
+#include <gflags/gflags.h>
+#include <google/protobuf/stubs/once.h>
+#include <google/protobuf/io/coded_stream.h>
+#include <google/protobuf/wire_format_lite_inl.h>
+#include <google/protobuf/descriptor.h>
+#include <google/protobuf/reflection_ops.h>
+#include <google/protobuf/wire_format.h>
+#include "butil/string_printf.h"
+#include "butil/macros.h"
+#include "brpc/controller.h"
+#include "brpc/mysql.h"
+#include "brpc/mysql_common.h"
+
+namespace brpc {
+
+DEFINE_int32(mysql_multi_replies_size, 10, "multi replies size in one MysqlResponse");
+
+// Internal implementation detail -- do not call these.
+void protobuf_AddDesc_baidu_2frpc_2fmysql_5fbase_2eproto_impl();
+void protobuf_AddDesc_baidu_2frpc_2fmysql_5fbase_2eproto();
+void protobuf_AssignDesc_baidu_2frpc_2fmysql_5fbase_2eproto();
+void protobuf_ShutdownFile_baidu_2frpc_2fmysql_5fbase_2eproto();
+
+namespace {
+
+const ::google::protobuf::Descriptor* MysqlRequest_descriptor_ = NULL;
+const ::google::protobuf::Descriptor* MysqlResponse_descriptor_ = NULL;
+
+}  // namespace
+
+void protobuf_AssignDesc_baidu_2frpc_2fmysql_5fbase_2eproto() {
+    protobuf_AddDesc_baidu_2frpc_2fmysql_5fbase_2eproto();
+    const ::google::protobuf::FileDescriptor* file =
+        ::google::protobuf::DescriptorPool::generated_pool()->FindFileByName(
+            "baidu/rpc/mysql_base.proto");
+    GOOGLE_CHECK(file != NULL);
+    MysqlRequest_descriptor_ = file->message_type(0);
+    MysqlResponse_descriptor_ = file->message_type(1);
+}
+
+namespace {
+
+GOOGLE_PROTOBUF_DECLARE_ONCE(protobuf_AssignDescriptors_once_);
+inline void protobuf_AssignDescriptorsOnce() {
+    ::google::protobuf::GoogleOnceInit(&protobuf_AssignDescriptors_once_,
+                                       &protobuf_AssignDesc_baidu_2frpc_2fmysql_5fbase_2eproto);
+}
+
+void protobuf_RegisterTypes(const ::std::string&) {
+    protobuf_AssignDescriptorsOnce();
+    ::google::protobuf::MessageFactory::InternalRegisterGeneratedMessage(
+        MysqlRequest_descriptor_, &MysqlRequest::default_instance());
+    ::google::protobuf::MessageFactory::InternalRegisterGeneratedMessage(
+        MysqlResponse_descriptor_, &MysqlResponse::default_instance());
+}
+
+}  // namespace
+
+void protobuf_ShutdownFile_baidu_2frpc_2fmysql_5fbase_2eproto() {
+    delete MysqlRequest::default_instance_;
+    delete MysqlResponse::default_instance_;
+}
+
+void protobuf_AddDesc_baidu_2frpc_2fmysql_5fbase_2eproto_impl() {
+    GOOGLE_PROTOBUF_VERIFY_VERSION;
+
+#if GOOGLE_PROTOBUF_VERSION >= 3002000
+    ::google::protobuf::internal::InitProtobufDefaults();
+#else
+    ::google::protobuf::protobuf_AddDesc_google_2fprotobuf_2fdescriptor_2eproto();
+#endif
+    ::google::protobuf::DescriptorPool::InternalAddGeneratedFile(
+        "\n\032baidu/rpc/mysql_base.proto\022\tbaidu.rpc\032"
+        " google/protobuf/descriptor.proto\"\016\n\014Mys"
+        "qlRequest\"\017\n\rMysqlResponseB\003\200\001\001",
+        111);
+    ::google::protobuf::MessageFactory::InternalRegisterGeneratedFile("baidu/rpc/mysql_base.proto",
+                                                                      &protobuf_RegisterTypes);
+    MysqlRequest::default_instance_ = new MysqlRequest();
+    MysqlResponse::default_instance_ = new MysqlResponse();
+    MysqlRequest::default_instance_->InitAsDefaultInstance();
+    MysqlResponse::default_instance_->InitAsDefaultInstance();
+    ::google::protobuf::internal::OnShutdown(
+        &protobuf_ShutdownFile_baidu_2frpc_2fmysql_5fbase_2eproto);
+}
+
+GOOGLE_PROTOBUF_DECLARE_ONCE(protobuf_AddDesc_baidu_2frpc_2fmysql_5fbase_2eproto_once);
+void protobuf_AddDesc_baidu_2frpc_2fmysql_5fbase_2eproto() {
+    ::google::protobuf::GoogleOnceInit(&protobuf_AddDesc_baidu_2frpc_2fmysql_5fbase_2eproto_once,
+                                       &protobuf_AddDesc_baidu_2frpc_2fmysql_5fbase_2eproto_impl);
+}
+
+// Force AddDescriptors() to be called at static initialization time.
+struct StaticDescriptorInitializer_baidu_2frpc_2fmysql_5fbase_2eproto {
+    StaticDescriptorInitializer_baidu_2frpc_2fmysql_5fbase_2eproto() {
+        protobuf_AddDesc_baidu_2frpc_2fmysql_5fbase_2eproto();
+    }
+} static_descriptor_initializer_baidu_2frpc_2fmysql_5fbase_2eproto_;
+
+
+// ===================================================================
+
+#ifndef _MSC_VER
+#endif  // !_MSC_VER
+
+butil::Status MysqlStatementStub::PackExecuteCommand(butil::IOBuf* outbuf, uint32_t stmt_id) {
+    butil::Status st;
+    // long data
+    for (const auto& i : _long_data) {
+        st = MysqlMakeLongDataPacket(outbuf, stmt_id, i.param_id, i.long_data);
+        if (!st.ok()) {
+            LOG(ERROR) << "make long data header error " << st;
+            return st;
+        }
+    }
+    _long_data.clear();
+    // execute data
+    st = MysqlMakeExecutePacket(outbuf, stmt_id, _execute_data);
+    if (!st.ok()) {
+        LOG(ERROR) << "make execute header error " << st;
+        return st;
+    }
+    _execute_data.clear();
+    _null_mask.mask.clear();
+    _null_mask.area = butil::IOBuf::INVALID_AREA;
+    _param_types.types.clear();
+    _param_types.area = butil::IOBuf::INVALID_AREA;
+
+    return st;
+}
+
+MysqlRequest::MysqlRequest() : ::google::protobuf::Message() {
+    SharedCtor();
+}
+
+MysqlRequest::MysqlRequest(const MysqlTransaction* tx) : ::google::protobuf::Message() {
+    SharedCtor();
+    _tx = tx;
+}
+
+MysqlRequest::MysqlRequest(MysqlStatement* stmt) : ::google::protobuf::Message() {
+    SharedCtor();
+    _stmt = new MysqlStatementStub(stmt);
+}
+
+MysqlRequest::MysqlRequest(const MysqlTransaction* tx, MysqlStatement* stmt)
+    : ::google::protobuf::Message() {
+    SharedCtor();
+    _tx = tx;
+    _stmt = new MysqlStatementStub(stmt);
+}
+
+void MysqlRequest::InitAsDefaultInstance() {}
+
+MysqlRequest::MysqlRequest(const MysqlRequest& from) : ::google::protobuf::Message() {
+    SharedCtor();
+    MergeFrom(from);
+}
+
+void MysqlRequest::SharedCtor() {
+    _has_error = false;
+    _cached_size_ = 0;
+    _has_command = false;
+    _tx = NULL;
+    _stmt = NULL;
+    _param_index = 0;
+}
+
+MysqlRequest::~MysqlRequest() {
+    SharedDtor();
+    if (_stmt != NULL) {
+        delete _stmt;
+    }
+    _stmt = NULL;
+}
+
+void MysqlRequest::SharedDtor() {
+    if (this != default_instance_) {
+    }
+}
+
+void MysqlRequest::SetCachedSize(int size) const {
+    GOOGLE_SAFE_CONCURRENT_WRITES_BEGIN();
+    _cached_size_ = size;
+    GOOGLE_SAFE_CONCURRENT_WRITES_END();
+}
+const ::google::protobuf::Descriptor* MysqlRequest::descriptor() {
+    protobuf_AssignDescriptorsOnce();
+    return MysqlRequest_descriptor_;
+}
+
+const MysqlRequest& MysqlRequest::default_instance() {
+    if (default_instance_ == NULL) {
+        protobuf_AddDesc_baidu_2frpc_2fmysql_5fbase_2eproto();
+    }
+    return *default_instance_;
+}
+
+MysqlRequest* MysqlRequest::default_instance_ = NULL;
+
+MysqlRequest* MysqlRequest::New() const {
+    return new MysqlRequest;
+}
+
+void MysqlRequest::Clear() {
+    _has_error = false;
+    _buf.clear();
+    _has_command = false;
+    _tx = NULL;
+    _stmt = NULL;
+}
+
+bool MysqlRequest::MergePartialFromCodedStream(::google::protobuf::io::CodedInputStream*) {
+    LOG(WARNING) << "You're not supposed to parse a MysqlRequest";
+    return true;
+}
+
+void MysqlRequest::SerializeWithCachedSizes(::google::protobuf::io::CodedOutputStream*) const {
+    LOG(WARNING) << "You're not supposed to serialize a MysqlRequest";
+}
+
+::google::protobuf::uint8* MysqlRequest::SerializeWithCachedSizesToArray(
+    ::google::protobuf::uint8* target) const {
+    return target;
+}
+
+int MysqlRequest::ByteSize() const {
+    int total_size = _buf.size();
+    GOOGLE_SAFE_CONCURRENT_WRITES_BEGIN();
+    _cached_size_ = total_size;
+    GOOGLE_SAFE_CONCURRENT_WRITES_END();
+    return total_size;
+}
+
+void MysqlRequest::MergeFrom(const ::google::protobuf::Message& from) {
+    GOOGLE_CHECK_NE(&from, this);
+    const MysqlRequest* source =
+        ::google::protobuf::internal::dynamic_cast_if_available<const MysqlRequest*>(&from);
+    if (source == NULL) {
+        ::google::protobuf::internal::ReflectionOps::Merge(from, this);
+    } else {
+        MergeFrom(*source);
+    }
+}
+
+void MysqlRequest::MergeFrom(const MysqlRequest& from) {
+    // TODO: maybe need to optimize

Review Comment:
   这个没实现会不会有问题?至少报个错防止误用?



##########
src/brpc/socket.cpp:
##########
@@ -1512,7 +1515,7 @@ int Socket::Write(butil::IOBuf* data, const WriteOptions* options_in) {
     if (options_in) {
         opt = *options_in;
     }
-    if (data->empty()) {
+    if (data->empty() && !opt.with_auth) {

Review Comment:
   with_auth字段改成auth_flags了,类型也发生了变化



##########
src/brpc/mysql_command.h:
##########
@@ -0,0 +1,92 @@
+// Copyright (c) 2019 Baidu, Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+// Authors: Yang,Liming (yangliming01@baidu.com)
+
+#ifndef BRPC_MYSQL_COMMAND_H
+#define BRPC_MYSQL_COMMAND_H
+
+#include <vector>
+#include "butil/iobuf.h"
+#include "butil/status.h"
+#include "brpc/mysql_common.h"
+
+namespace brpc {
+// mysql command types
+enum MysqlCommandType : unsigned char {
+    MYSQL_COM_SLEEP,
+    MYSQL_COM_QUIT,
+    MYSQL_COM_INIT_DB,
+    MYSQL_COM_QUERY,
+    MYSQL_COM_FIELD_LIST,
+    MYSQL_COM_CREATE_DB,
+    MYSQL_COM_DROP_DB,
+    MYSQL_COM_REFRESH,
+    MYSQL_COM_SHUTDOWN,
+    MYSQL_COM_STATISTICS,
+    MYSQL_COM_PROCESS_INFO,
+    MYSQL_COM_CONNECT,
+    MYSQL_COM_PROCESS_KILL,
+    MYSQL_COM_DEBUG,
+    MYSQL_COM_PING,
+    MYSQL_COM_TIME,
+    MYSQL_COM_DELAYED_INSERT,
+    MYSQL_COM_CHANGE_USER,
+    MYSQL_COM_BINLOG_DUMP,
+    MYSQL_COM_TABLE_DUMP,
+    MYSQL_COM_CONNECT_OUT,
+    MYSQL_COM_REGISTER_SLAVE,
+    MYSQL_COM_STMT_PREPARE,
+    MYSQL_COM_STMT_EXECUTE,
+    MYSQL_COM_STMT_SEND_LONG_DATA,
+    MYSQL_COM_STMT_CLOSE,
+    MYSQL_COM_STMT_RESET,
+    MYSQL_COM_SET_OPTION,
+    MYSQL_COM_STMT_FETCH,
+    MYSQL_COM_DAEMON,
+    MYSQL_COM_BINLOG_DUMP_GTID,
+    MYSQL_COM_RESET_CONNECTION,
+};
+
+butil::Status MysqlMakeCommand(butil::IOBuf* outbuf,
+                               const MysqlCommandType type,
+                               const butil::StringPiece& stmt);
+
+// Prepared Statement Protocol
+// A prepared statement has a unique statement id in one connection (in brpc SocketId). A prepared
+// statement can be executed in many connections, so every connection has a different statement id.
+// In bprc, we can only get a connection in the stage of PackXXXRequest which is behind our building

Review Comment:
   bprc -> brpc



##########
example/mysql_c++/CMakeLists.txt:
##########
@@ -0,0 +1,148 @@
+cmake_minimum_required(VERSION 2.8.10)

Review Comment:
   加上license



##########
src/brpc/policy/mysql_authenticator.cpp:
##########
@@ -0,0 +1,176 @@
+// Copyright (c) 2019 Baidu, Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+//
+// Author(s): Yang,Liming <ya...@baidu.com>
+
+#include <vector>
+#include "brpc/policy/mysql_authenticator.h"
+#include "brpc/policy/mysql_auth_hash.h"
+#include "brpc/mysql_command.h"
+#include "brpc/mysql_reply.h"
+#include "brpc/mysql_common.h"
+#include "butil/base64.h"
+#include "butil/iobuf.h"
+#include "butil/logging.h"  // LOG()
+#include "butil/sys_byteorder.h"
+
+namespace brpc {
+namespace policy {
+
+namespace {
+const butil::StringPiece mysql_native_password("mysql_native_password");
+const char* auth_param_delim = "\t";
+bool MysqlHandleParams(const butil::StringPiece& params, std::string* param_cmd) {
+    if (params.empty()) {
+        return true;
+    }
+    const char* delim1 = "&";
+    std::vector<size_t> idx;
+    for (size_t p = params.find(delim1); p != butil::StringPiece::npos;

Review Comment:
   可以使用butil::StringSplitter或butil::SplitString



##########
docs/cn/mysql_client.md:
##########
@@ -0,0 +1,562 @@
+[MySQL](https://www.mysql.com/)是著名的开源的关系型数据库,为了使用户更快捷地访问mysql并充分利用bthread的并发能力,brpc直接支持mysql协议。示例程序:[example/mysql_c++](https://github.com/brpc/brpc/tree/master/example/mysql_c++/)
+
+**注意**:只支持MySQL 4.1 及之后的版本的文本协议,支持事务,不支持Prepared statement。目前支持的鉴权方式为mysql_native_password,使用事务的时候不支持single模式。
+
+相比使用[libmysqlclient](https://dev.mysql.com/downloads/connector/c/)(官方client)的优势有:
+
+- 线程安全。用户不需要为每个线程建立独立的client。
+- 支持同步、异步、半同步等访问方式,能使用[ParallelChannel等](combo_channel.md)组合访问方式。
+- 支持多种[连接方式](client.md#连接方式)。支持超时、backup request、取消、tracing、内置服务等一系列brpc提供的福利。
+- 明确的返回类型校验,如果使用了不正确的变量接受mysql的数据类型,将抛出异常。
+- 调用mysql标准库会阻塞框架的并发能力,使用本实现将能充分利用brpc框架的并发能力。
+- 使用brpc实现的mysql不会造成pthread的阻塞,使用libmysqlclient会阻塞pthread [线程相关](bthread.md),使用mysql的异步api会使编程变得很复杂。
+# 访问mysql
+
+创建一个访问mysql的Channel:
+
+```c++
+# include <brpc/mysql.h>
+# include <brpc/policy/mysql_authenticator.h>
+# include <brpc/channel.h>
+
+brpc::ChannelOptions options;
+options.protocol = brpc::PROTOCOL_MYSQL;
+options.connection_type = FLAGS_connection_type;
+options.timeout_ms = FLAGS_timeout_ms /*milliseconds*/;
+options.max_retry = FLAGS_max_retry;
+options.auth = new brpc::policy::MysqlAuthenticator("yangliming01", "123456", "test", 
+    "charset=utf8&collation_connection=utf8_unicode_ci");
+if (channel.Init("127.0.0.1", 3306, &options) != 0) {
+    LOG(ERROR) << "Fail to initialize channel";
+    return -1;
+}
+```
+
+向mysql发起命令。
+
+```c++
+// 执行各种mysql命令,可以批量执行命令如:"select * from tab1;select * from tab2"
+std::string command = "show databases"; // select,delete,update,insert,create,drop ...
+brpc::MysqlRequest request;
+if (!request.Query(command)) {
+    LOG(ERROR) << "Fail to add command";
+    return false;
+}
+brpc::MysqlResponse response;
+brpc::Controller cntl;
+channel.CallMethod(NULL, &cntl, &request, &response, NULL);
+if (!cntl.Failed()) {
+    std::cout << response << std::endl;
+} else {
+    LOG(ERROR) << "Fail to access mysql, " << cntl.ErrorText();
+    return false;
+}
+return true;
+```
+
+上述代码的说明:
+
+- 请求类型必须为MysqlRequest,回复类型必须为MysqlResponse,否则CallMethod会失败。不需要stub,直接调用channel.CallMethod,method填NULL。
+- 调用request.Query()传入要执行的命令,可以批量执行命令,多个命令用分号隔开。
+- 依次调用response.reply(X)弹出操作结果,根据返回类型的不同,选择不同的类型接收,如:MysqlReply::Ok,MysqlReply::Error,const MysqlReply::Columnconst MysqlReply::Row等。

Review Comment:
   Columnconst 中间少了个逗号



##########
src/brpc/policy/mysql_protocol.cpp:
##########
@@ -0,0 +1,416 @@
+// Copyright (c) 2019 Baidu, Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+// Authors: Yang,Liming (yangliming01@baidu.com)
+
+#include <google/protobuf/descriptor.h>  // MethodDescriptor
+#include <google/protobuf/message.h>     // Message
+#include <gflags/gflags.h>
+#include <sstream>
+#include "butil/logging.h"  // LOG()
+#include "butil/time.h"
+#include "butil/iobuf.h"  // butil::IOBuf
+#include "butil/sys_byteorder.h"
+#include "brpc/controller.h"  // Controller
+#include "brpc/details/controller_private_accessor.h"
+#include "brpc/socket.h"  // Socket
+#include "brpc/server.h"  // Server
+#include "brpc/details/server_private_accessor.h"
+#include "brpc/span.h"
+#include "brpc/mysql.h"
+#include "brpc/policy/mysql_authenticator.h"
+#include "brpc/policy/mysql_protocol.h"
+
+namespace brpc {
+
+DECLARE_bool(enable_rpcz);
+
+namespace policy {
+
+DEFINE_bool(mysql_verbose, false, "[DEBUG] Print EVERY mysql request/response");
+
+void MysqlParseAuthenticator(const butil::StringPiece& raw,

Review Comment:
   这几个函数是不是可以在mysql_authenticator.h里声明一下



##########
example/mysql_c++/mysql_cli.cpp:
##########
@@ -0,0 +1,168 @@
+// Copyright (c) 2014 Baidu, Inc.

Review Comment:
   要改成Apache License header,下同



##########
src/brpc/policy/mysql_protocol.cpp:
##########
@@ -0,0 +1,416 @@
+// Copyright (c) 2019 Baidu, Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+// Authors: Yang,Liming (yangliming01@baidu.com)
+
+#include <google/protobuf/descriptor.h>  // MethodDescriptor
+#include <google/protobuf/message.h>     // Message
+#include <gflags/gflags.h>
+#include <sstream>
+#include "butil/logging.h"  // LOG()
+#include "butil/time.h"
+#include "butil/iobuf.h"  // butil::IOBuf
+#include "butil/sys_byteorder.h"
+#include "brpc/controller.h"  // Controller
+#include "brpc/details/controller_private_accessor.h"
+#include "brpc/socket.h"  // Socket
+#include "brpc/server.h"  // Server
+#include "brpc/details/server_private_accessor.h"
+#include "brpc/span.h"
+#include "brpc/mysql.h"
+#include "brpc/policy/mysql_authenticator.h"
+#include "brpc/policy/mysql_protocol.h"
+
+namespace brpc {
+
+DECLARE_bool(enable_rpcz);
+
+namespace policy {
+
+DEFINE_bool(mysql_verbose, false, "[DEBUG] Print EVERY mysql request/response");
+
+void MysqlParseAuthenticator(const butil::StringPiece& raw,
+                             std::string* user,
+                             std::string* password,
+                             std::string* schema,
+                             std::string* collation);
+void MysqlParseParams(const butil::StringPiece& raw, std::string* params);
+// pack mysql authentication_data
+int MysqlPackAuthenticator(const MysqlReply::Auth& auth,
+                           const butil::StringPiece& user,
+                           const butil::StringPiece& password,
+                           const butil::StringPiece& schema,
+                           const butil::StringPiece& collation,
+                           std::string* auth_cmd);
+int MysqlPackParams(const butil::StringPiece& params, std::string* param_cmd);
+
+namespace {
+// Markers for the progress of the connection handshake. To avoid adding a new field to
+// Controller, the current step is stored in the group of the socket's AuthContext:
+//   auth_step[0] ("AUTH_OK")   - set after the authentication packet has been written.
+//   auth_step[1] ("PARAMS_OK") - set after the connection-parameter command has been written.
+const char* auth_step[] = {"AUTH_OK", "PARAMS_OK"};
+
+// A parsed mysql response together with the correlation id of the call waiting for it.
+struct InputResponse : public InputMessageBase {
+    // Correlation id of the waiting call; copied from PipelinedInfo::id_wait by the parser.
+    bthread_id_t id_wait;
+    // Replies parsed from the connection.
+    MysqlResponse response;
+
+    // @InputMessageBase
+    // The message is heap-allocated by the parser, so destroying it means deleting it.
+    void DestroyImpl() {
+        delete this;
+    }
+};
+
+// Appends the bytes to be written for one request to `buf`.
+//
+// Requests that are not prepared statements are forwarded unchanged. For a prepared statement
+// the statement id registered for the sending socket decides what is packed: an id of 0 means
+// the statement has not been prepared on that socket yet, so a COM_STMT_PREPARE command is
+// packed and the pipelined count is switched to MYSQL_NEED_PREPARE; any other id is packed
+// into the execute command.
+// Returns false (after logging) when the socket or the statement is missing or packing fails.
+bool PackRequest(butil::IOBuf* buf,
+                 ControllerPrivateAccessor& accessor,
+                 const butil::IOBuf& request) {
+    if (accessor.pipelined_count() != MYSQL_PREPARED_STATEMENT) {
+        buf->append(request);
+        return true;
+    }
+
+    Socket* sending_sock = accessor.get_sending_socket();
+    if (sending_sock == NULL) {
+        LOG(ERROR) << "[MYSQL PACK] get sending socket with NULL";
+        return false;
+    }
+    auto stmt_stub = accessor.get_stmt();
+    if (stmt_stub == NULL) {
+        LOG(ERROR) << "[MYSQL PACK] get prepare statement with NULL";
+        return false;
+    }
+
+    const uint32_t known_id = stmt_stub->stmt()->StatementId(sending_sock->id());
+    if (known_id != 0) {
+        // Already prepared on this socket: pack the execute command right away.
+        butil::Status exec_status = stmt_stub->PackExecuteCommand(buf, known_id);
+        if (exec_status.ok()) {
+            return true;
+        }
+        LOG(ERROR) << "write execute data error " << exec_status;
+        return false;
+    }
+
+    // No id for this socket yet: pack the prepare command instead of the user request.
+    butil::IOBuf prepare_cmd;
+    butil::Status prepare_status =
+        MysqlMakeCommand(&prepare_cmd, MYSQL_COM_STMT_PREPARE, stmt_stub->stmt()->str());
+    if (!prepare_status.ok()) {
+        LOG(ERROR) << "[MYSQL PACK] make prepare statement error " << prepare_status;
+        return false;
+    }
+    accessor.set_pipelined_count(MYSQL_NEED_PREPARE);
+    buf->append(prepare_cmd);
+    return true;
+}
+
+// Drives one step of the connection handshake from a parsed server response.
+//
+// Depending on what `msg` contains, exactly one of the following happens:
+//   1. The first reply is an auth packet: the credentials parsed out of ctx->user() are packed
+//      into an authentication command, written to the socket's fd, and the AuthContext group
+//      is set to auth_step[0].
+//   2. All replies are OK, the group is auth_step[0] and connection params are present: the
+//      params command is written to the fd and the group is set to auth_step[1].
+//   3. All replies are OK otherwise: ctx->starter() is written to the fd and pi->with_auth is
+//      cleared, which ends the handshake.
+// The correlation id `pi->id_wait` is locked for the duration of the step.
+// Returns PARSE_OK on success, PARSE_ERROR_NO_RESOURCE when a reply is not OK, and
+// PARSE_ERROR_ABSOLUTELY_WRONG for every other failure.
+ParseError HandleAuthentication(const InputResponse* msg, const Socket* socket, PipelinedInfo* pi) {
+    const bthread_id_t cid = pi->id_wait;
+    Controller* cntl = NULL;
+    // The id stays locked until END_OF_AUTH; `cntl` itself is not used in this function.
+    if (bthread_id_lock(cid, (void**)&cntl) != 0) {
+        LOG(ERROR) << "[MYSQL PARSE] fail to lock controller";
+        return PARSE_ERROR_ABSOLUTELY_WRONG;
+    }
+
+    ParseError parseCode = PARSE_OK;
+    const AuthContext* ctx = socket->auth_context();
+    if (ctx == NULL) {
+        parseCode = PARSE_ERROR_ABSOLUTELY_WRONG;
+        LOG(ERROR) << "[MYSQL PARSE] auth context is null";
+        goto END_OF_AUTH;
+    }
+    // NOTE(review): reply(0) is read before reply_size() is checked; presumably reply() is safe
+    // for an empty response - confirm.
+    if (msg->response.reply(0).is_auth()) {
+        // Step 1: answer the server's auth packet.
+        std::string user, password, schema, collation, auth_cmd;
+        const MysqlReply& reply = msg->response.reply(0);
+        MysqlParseAuthenticator(ctx->user(), &user, &password, &schema, &collation);
+        if (MysqlPackAuthenticator(reply.auth(), user, password, schema, collation, &auth_cmd) ==
+            0) {
+            butil::IOBuf buf;
+            buf.append(auth_cmd);
+            // NOTE(review): the result of the write is not checked.
+            buf.cut_into_file_descriptor(socket->fd());
+            // The context is obtained as const from the socket; the step marker is stored in it.
+            const_cast<AuthContext*>(ctx)->set_group(auth_step[0]);
+        } else {
+            parseCode = PARSE_ERROR_ABSOLUTELY_WRONG;
+            LOG(ERROR) << "[MYSQL PARSE] wrong pack authentication data";
+        }
+    } else if (msg->response.reply_size() > 0) {
+        // Any reply that is not OK aborts the handshake.
+        for (size_t i = 0; i < msg->response.reply_size(); ++i) {
+            if (!msg->response.reply(i).is_ok()) {
+                LOG(ERROR) << "[MYSQL PARSE] auth failed " << msg->response;
+                parseCode = PARSE_ERROR_NO_RESOURCE;
+                goto END_OF_AUTH;
+            }
+        }
+        std::string params, params_cmd;
+        MysqlParseParams(ctx->user(), &params);
+        if (ctx->group() == auth_step[0] && !params.empty()) {
+            // Step 2: authentication succeeded and there are params left to send.
+            if (MysqlPackParams(params, &params_cmd) == 0) {
+                butil::IOBuf buf;
+                buf.append(params_cmd);
+                buf.cut_into_file_descriptor(socket->fd());
+                const_cast<AuthContext*>(ctx)->set_group(auth_step[1]);
+            } else {
+                parseCode = PARSE_ERROR_ABSOLUTELY_WRONG;
+                LOG(ERROR) << "[MYSQL PARSE] wrong pack params data";
+            }
+        } else {
+            // Step 3: handshake finished; write the content of ctx->starter() (presumably the
+            // user request that triggered the connection - confirm) and leave auth mode.
+            butil::IOBuf raw_req;
+            raw_req.append(ctx->starter());
+            raw_req.cut_into_file_descriptor(socket->fd());
+            pi->with_auth = false;
+        }
+    } else {
+        parseCode = PARSE_ERROR_ABSOLUTELY_WRONG;
+        LOG(ERROR) << "[MYSQL PARSE] wrong authentication step";
+    }
+
+END_OF_AUTH:
+    // A failed unlock overrides whatever result was computed above.
+    if (bthread_id_unlock(cid) != 0) {
+        parseCode = PARSE_ERROR_ABSOLUTELY_WRONG;
+        LOG(ERROR) << "[MYSQL PARSE] fail to unlock controller";
+    }
+    return parseCode;
+}
+
+// Completes the "prepare, then execute" round trip of a prepared statement.
+//
+// `msg` holds the server's answer to the prepare command issued for the call identified by
+// `pi->id_wait`. On success the returned statement id is recorded for `socket`, the execute
+// command is written to the socket's fd, and `pi->count` is set back to
+// MYSQL_PREPARED_STATEMENT so the following response is parsed with that statement type.
+// Returns PARSE_OK on success and PARSE_ERROR_ABSOLUTELY_WRONG (after logging) otherwise.
+ParseError HandlePrepareStatement(const InputResponse* msg,
+                                  const Socket* socket,
+                                  PipelinedInfo* pi) {
+    if (!msg->response.reply(0).is_prepare_ok()) {
+        LOG(ERROR) << "[MYSQL PARSE] response is not prepare ok, " << msg->response;
+        return PARSE_ERROR_ABSOLUTELY_WRONG;
+    }
+    const MysqlReply::PrepareOk& ok = msg->response.reply(0).prepare_ok();
+    const bthread_id_t cid = pi->id_wait;
+    Controller* cntl = NULL;
+    if (bthread_id_lock(cid, (void**)&cntl) != 0) {
+        LOG(ERROR) << "[MYSQL PARSE] fail to lock controller";
+        return PARSE_ERROR_ABSOLUTELY_WRONG;
+    }
+    // Pessimistic default: only the fully successful path below switches it to PARSE_OK.
+    ParseError parseCode = PARSE_ERROR_ABSOLUTELY_WRONG;
+    do {
+        // The stub is checked before use, the same way PackRequest does.
+        auto stub = ControllerPrivateAccessor(cntl).get_stmt();
+        if (stub == NULL) {
+            LOG(ERROR) << "[MYSQL PACK] get prepare statement with NULL";
+            break;
+        }
+        auto stmt = stub->stmt();
+        if (stmt == NULL) {
+            LOG(ERROR) << "[MYSQL PACK] stmt can't be NULL";
+            break;
+        }
+        // The server must agree with the client on the number of placeholders.
+        if (stmt->param_count() != ok.param_count()) {
+            LOG(ERROR) << "[MYSQL PACK] stmt param number " << stmt->param_count()
+                       << " not equal to prepareOk.param_number " << ok.param_count();
+            break;
+        }
+        // Remember the id for this socket so later requests can skip the prepare step.
+        stmt->SetStatementId(socket->id(), ok.stmt_id());
+        butil::IOBuf buf;
+        butil::Status st = stub->PackExecuteCommand(&buf, ok.stmt_id());
+        if (!st.ok()) {
+            LOG(ERROR) << "[MYSQL PACK] make execute header error " << st;
+            break;
+        }
+        buf.cut_into_file_descriptor(socket->fd());
+        pi->count = MYSQL_PREPARED_STATEMENT;
+        parseCode = PARSE_OK;
+    } while (false);
+
+    // A failed unlock overrides whatever result was computed above.
+    if (bthread_id_unlock(cid) != 0) {
+        parseCode = PARSE_ERROR_ABSOLUTELY_WRONG;
+        LOG(ERROR) << "[MYSQL PARSE] fail to unlock controller";
+    }
+    return parseCode;
+}
+
+}  // namespace
+
+// "Message" = "Response" as we only implement the client for mysql.
+//
+// Parses one response from `source` into the InputResponse kept as the socket's parsing
+// context. The PipelinedInfo popped from the socket tells whether the connection is still in
+// the authentication phase (`with_auth`) and, through `count`, which MysqlStmtType the
+// response belongs to.
+// Responses consumed by the handshake or by the prepare step are handled here and destroyed;
+// in those cases the PipelinedInfo is given back and PARSE_ERROR_NOT_ENOUGH_DATA is returned.
+// Only the final response is returned as a message.
+ParseResult ParseMysqlMessage(butil::IOBuf* source,
+                              Socket* socket,
+                              bool /*read_eof*/,
+                              const void* /*arg*/) {
+    if (source->empty()) {
+        return MakeParseError(PARSE_ERROR_NOT_ENOUGH_DATA);
+    }
+
+    PipelinedInfo pi;
+    if (!socket->PopPipelinedInfo(&pi)) {
+        LOG(WARNING) << "No corresponding PipelinedInfo in socket";
+        return MakeParseError(PARSE_ERROR_TRY_OTHERS);
+    }
+
+    // Reuse the partially parsed response of a previous call, if there is one.
+    InputResponse* msg = static_cast<InputResponse*>(socket->parsing_context());
+    if (msg == NULL) {
+        msg = new InputResponse;
+        socket->reset_parsing_context(msg);
+    }
+
+    MysqlStmtType stmt_type = static_cast<MysqlStmtType>(pi.count);
+    ParseError err = msg->response.ConsumePartialIOBuf(*source, pi.with_auth, stmt_type);
+    if (FLAGS_mysql_verbose) {
+        LOG(INFO) << "[MYSQL PARSE] " << msg->response;
+    }
+    if (err != PARSE_OK) {
+        // More bytes are needed: put the info back so the next call pops the same entry.
+        if (err == PARSE_ERROR_NOT_ENOUGH_DATA) {
+            socket->GivebackPipelinedInfo(pi);
+        }
+        return MakeParseError(err);
+    }
+    if (pi.with_auth) {
+        ParseError auth_err = HandleAuthentication(msg, socket, &pi);
+        if (auth_err != PARSE_OK) {
+            return MakeParseError(auth_err, "Fail to authenticate with Mysql");
+        }
+        // The handshake response is not delivered to the caller; destroy it on scope exit.
+        DestroyingPtr<InputResponse> auth_msg =
+            static_cast<InputResponse*>(socket->release_parsing_context());
+        socket->GivebackPipelinedInfo(pi);
+        return MakeParseError(PARSE_ERROR_NOT_ENOUGH_DATA);
+    }
+    if (stmt_type == MYSQL_NEED_PREPARE) {
+        // store stmt_id, make execute header.
+        ParseError prepare_err = HandlePrepareStatement(msg, socket, &pi);
+        if (prepare_err != PARSE_OK) {
+            return MakeParseError(prepare_err, "Fail to make prepared statement with Mysql");
+        }
+        // The prepare response is not delivered to the caller either.
+        DestroyingPtr<InputResponse> prepare_msg =
+            static_cast<InputResponse*>(socket->release_parsing_context());
+        socket->GivebackPipelinedInfo(pi);
+        return MakeParseError(PARSE_ERROR_NOT_ENOUGH_DATA);
+    }
+    // Final response: detach it from the socket and pass it on with its correlation id.
+    msg->id_wait = pi.id_wait;
+    socket->release_parsing_context();
+    return MakeMessage(msg);
+}
+
+// Hands a parsed response over to the call that is waiting for it.
+//
+// Takes ownership of `msg_base`, locks the correlation id stored in the message, records
+// timing information in the span (if any), swaps the parsed replies into the user's
+// MysqlResponse and finishes the call through ControllerPrivateAccessor::OnResponse.
+void ProcessMysqlResponse(InputMessageBase* msg_base) {
+    const int64_t parse_begin_us = butil::cpuwide_time_us();
+    DestroyingPtr<InputResponse> input(static_cast<InputResponse*>(msg_base));
+
+    const bthread_id_t correlation_id = input->id_wait;
+    Controller* cntl = NULL;
+    const int lock_rc = bthread_id_lock(correlation_id, (void**)&cntl);
+    if (lock_rc != 0) {
+        // EINVAL and EPERM are not worth an error log.
+        LOG_IF(ERROR, lock_rc != EINVAL && lock_rc != EPERM)
+            << "Fail to lock correlation_id=" << correlation_id << ": " << berror(lock_rc);
+        return;
+    }
+
+    ControllerPrivateAccessor accessor(cntl);
+    if (Span* span = accessor.span()) {
+        span->set_base_real_us(input->base_real_us());
+        span->set_received_us(input->received_us());
+        span->set_response_size(input->response.ByteSize());
+        span->set_start_parse_us(parse_begin_us);
+    }
+
+    const int error_before = cntl->ErrorCode();
+    auto user_response = cntl->response();
+    // A NULL response means the caller is not interested in the result: silently ignore it.
+    if (user_response != NULL) {
+        if (user_response->GetDescriptor() == MysqlResponse::descriptor()) {
+            // We work around ParseFrom of pb which is just a placeholder.
+            ((MysqlResponse*)user_response)->Swap(&input->response);
+        } else {
+            cntl->SetFailed(ERESPONSE, "Must be MysqlResponse");
+        }
+    }
+
+    // Optional: release the message as soon as possible.
+    input.reset();
+    // Unlocks correlation_id inside. Reverts the controller's error code
+    // if the version check of `correlation_id' fails.
+    accessor.OnResponse(correlation_id, error_before);
+}
+
+// Serializes a MysqlRequest into `buf` and records on the controller how its response has to
+// be treated.
+//
+// Fails the controller with EREQUEST when `request` is NULL, is not a MysqlRequest, or cannot
+// be serialized. A request bound to a transaction makes the call use the transaction's socket;
+// a request carrying a prepared statement stores the statement on the controller.
+void SerializeMysqlRequest(butil::IOBuf* buf,
+                           Controller* cntl,
+                           const google::protobuf::Message* request) {
+    if (request == NULL) {
+        cntl->SetFailed(EREQUEST, "request is NULL");
+        return;
+    }
+    if (request->GetDescriptor() != MysqlRequest::descriptor()) {
+        cntl->SetFailed(EREQUEST, "The request is not a MysqlRequest");
+        return;
+    }
+    const MysqlRequest* mysql_req = (const MysqlRequest*)request;
+    // We work around SerializeTo of pb which is just a placeholder.
+    if (!mysql_req->SerializeTo(buf)) {
+        cntl->SetFailed(EREQUEST, "Fail to serialize MysqlRequest");
+        return;
+    }
+
+    // The mysql protocol does not use the pipelined count to find the end of a response, so
+    // the field is reused to carry the statement type. The parser needs that hint because an
+    // OK reply and a PrepareOk reply cannot be told apart from the bytes alone.
+    ControllerPrivateAccessor accessor(cntl);
+    accessor.set_pipelined_count(MYSQL_NORMAL_STATEMENT);
+
+    auto transaction = mysql_req->get_tx();
+    if (transaction != NULL) {
+        accessor.use_bind_sock(transaction->GetSocketId());
+    }
+    auto stmt_stub = mysql_req->get_stmt();
+    if (stmt_stub != NULL) {
+        accessor.set_stmt(stmt_stub);
+        accessor.set_pipelined_count(MYSQL_PREPARED_STATEMENT);
+    }
+
+    if (FLAGS_mysql_verbose) {
+        LOG(INFO) << "\n[MYSQL REQUEST] " << *mysql_req;
+    }
+}
+
+void PackMysqlRequest(butil::IOBuf* buf,
+                      SocketMessage**,
+                      uint64_t /*correlation_id*/,
+                      const google::protobuf::MethodDescriptor*,
+                      Controller* cntl,
+                      const butil::IOBuf& request,
+                      const Authenticator* auth) {
+    ControllerPrivateAccessor accessor(cntl);
+    if (auth) {
+        const MysqlAuthenticator* my_auth(dynamic_cast<const MysqlAuthenticator*>(auth));
+        if (my_auth == NULL) {
+            LOG(ERROR) << "[MYSQL PACK] there is not MysqlAuthenticator";
+            return;
+        }
+        Socket* sock = accessor.get_sending_socket();
+        if (sock == NULL) {
+            LOG(ERROR) << "[MYSQL PACK] get sending socket with NULL";
+            return;
+        }
+        AuthContext* ctx = sock->mutable_auth_context();
+        // std::string params;

Review Comment:
   可以删了吧



##########
example/mysql_c++/mysql_go_press.go:
##########
@@ -0,0 +1,63 @@
+package main

Review Comment:
   加上license header



##########
src/brpc/policy/mysql_auth_hash.h:
##########
@@ -0,0 +1,50 @@
+/*
+ * Copyright (c) 2015, 2018, Oracle and/or its affiliates. All rights reserved.
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License, version 2.0, as
+ * published by the Free Software Foundation.
+ *
+ * This program is also distributed with certain software (including
+ * but not limited to OpenSSL) that is licensed under separate terms,
+ * as designated in a particular file or component or in included license
+ * documentation.  The authors of MySQL hereby grant you an
+ * additional permission to link the program and your derivative works
+ * with the separately licensed software that they have included with
+ * MySQL.
+ *
+ * Without limiting anything contained in the foregoing, this file,
+ * which is part of MySQL Connector/C++, is also subject to the
+ * Universal FOSS Exception, version 1.0, a copy of which can be found at
+ * http://oss.oracle.com/licenses/universal-foss-exception.
+ *
+ * This program is distributed in the hope that it will be useful, but
+ * WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
+ * See the GNU General Public License, version 2.0, for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program; if not, write to the Free Software Foundation, Inc.,
+ * 51 Franklin St, Fifth Floor, Boston, MA 02110-1301  USA
+ */
+
+#ifndef BRPC_POLICY_MYSQL_AUTH_HASH_H
+#define BRPC_POLICY_MYSQL_AUTH_HASH_H
+
+// #include <mysql/cdk/config.h>
+#include <string>
+
+namespace brpc {
+namespace policy {
+
+std::string mysql_build_mysql41_authentication_response(const std::string& salt_data,
+                                                        const std::string& password);
+
+std::string mysql_build_sha256_authentication_response(const std::string& salt_data,

Review Comment:
   这个哪里用到了



##########
src/brpc/mysql_reply.h:
##########
@@ -0,0 +1,801 @@
+// Copyright (c) 2019 Baidu, Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+// Authors: Yang,Liming (yangliming01@baidu.com)
+
+#ifndef BRPC_MYSQL_REPLY_H
+#define BRPC_MYSQL_REPLY_H
+
+#include "butil/iobuf.h"  // butil::IOBuf
+#include "butil/arena.h"
+#include "butil/sys_byteorder.h"
+#include "butil/logging.h"  // LOG()
+#include "brpc/parse_result.h"
+#include "brpc/mysql_common.h"
+
+namespace brpc {
+
+// Small base class that remembers whether an object has been marked as parsed.
+class CheckParsed {
+public:
+    // Starts in the "not parsed" state.
+    CheckParsed() : _is_parsed(false) {}
+    // Returns true once set_parsed() has been called.
+    bool is_parsed() const {
+        return _is_parsed;
+    }
+    // Marks the object as parsed; there is no way to clear the mark again.
+    void set_parsed() {
+        _is_parsed = true;
+    }
+
+private:
+    bool _is_parsed;
+};
+
+// Kind of a reply, as reported by MysqlReply::type().
+// NOTE(review): OK/ERROR/EOF look like the first-byte markers of the corresponding mysql
+// packets, while the values commented with "add for" appear to be tags chosen by this
+// implementation - confirm against the parser.
+enum MysqlRspType : uint8_t {
+    MYSQL_RSP_OK = 0x00,
+    MYSQL_RSP_ERROR = 0xFF,
+    MYSQL_RSP_RESULTSET = 0x01,
+    MYSQL_RSP_EOF = 0xFE,
+    MYSQL_RSP_AUTH = 0xFB,        // add for mysql auth
+    MYSQL_RSP_PREPARE_OK = 0xFC,  // add for prepared statement
+    MYSQL_RSP_UNKNOWN = 0xFD,     // add for other case
+};
+
+const char* MysqlRspTypeToString(MysqlRspType);
+
+class MysqlReply {
+public:
+    // Mysql Auth package
+    class Auth : private CheckParsed {
+    public:
+        Auth();
+        uint8_t protocol() const;
+        butil::StringPiece version() const;
+        uint32_t thread_id() const;
+        butil::StringPiece salt() const;
+        uint16_t capability() const;
+        uint8_t collation() const;
+        uint16_t status() const;
+        uint16_t extended_capability() const;
+        uint8_t auth_plugin_length() const;
+        butil::StringPiece salt2() const;
+        butil::StringPiece auth_plugin() const;
+
+    private:
+        ParseError Parse(butil::IOBuf& buf, butil::Arena* arena);
+
+        DISALLOW_COPY_AND_ASSIGN(Auth);
+        friend class MysqlReply;
+
+        uint8_t _protocol;
+        butil::StringPiece _version;
+        uint32_t _thread_id;
+        butil::StringPiece _salt;
+        uint16_t _capability;
+        uint8_t _collation;
+        uint16_t _status;
+        uint16_t _extended_capability;
+        uint8_t _auth_plugin_length;
+        butil::StringPiece _salt2;
+        butil::StringPiece _auth_plugin;
+    };
+    // Forward declaration: PrepareOk refers to Column before Column is defined.
+    class Column;
+    // Mysql Eof package
+    class Eof : private CheckParsed {
+    public:
+        Eof();
+        uint16_t warning() const;
+        uint16_t status() const;
+
+    private:
+        ParseError Parse(butil::IOBuf& buf);
+
+        DISALLOW_COPY_AND_ASSIGN(Eof);
+        friend class MysqlReply;
+
+        uint16_t _warning;
+        uint16_t _status;
+    };
+    // Mysql PrepareOk package
+    class PrepareOk : private CheckParsed {
+    public:
+        PrepareOk();
+        uint32_t stmt_id() const;
+        uint16_t column_count() const;
+        uint16_t param_count() const;
+        uint16_t warning() const;
+        const Column& param(uint16_t index) const;
+        const Column& column(uint16_t index) const;
+
+    private:
+        ParseError Parse(butil::IOBuf& buf, butil::Arena* arena);
+
+        DISALLOW_COPY_AND_ASSIGN(PrepareOk);
+        friend class MysqlReply;
+
+        class Header : private CheckParsed {
+        public:
+            Header() : _stmt_id(0), _column_count(0), _param_count(0), _warning(0) {}
+            uint32_t _stmt_id;
+            uint16_t _column_count;
+            uint16_t _param_count;
+            uint16_t _warning;
+            ParseError Parse(butil::IOBuf& buf);
+        };
+        Header _header;
+        Column* _params;
+        Eof _eof1;
+        Column* _columns;
+        Eof _eof2;
+    };
+    // Mysql Ok package
+    class Ok : private CheckParsed {
+    public:
+        Ok();
+        uint64_t affect_row() const;
+        uint64_t index() const;
+        uint16_t status() const;
+        uint16_t warning() const;
+        butil::StringPiece msg() const;
+
+    private:
+        ParseError Parse(butil::IOBuf& buf, butil::Arena* arena);
+
+        DISALLOW_COPY_AND_ASSIGN(Ok);
+        friend class MysqlReply;
+
+        uint64_t _affect_row;
+        uint64_t _index;
+        uint16_t _status;
+        uint16_t _warning;
+        butil::StringPiece _msg;
+    };
+    // Mysql Error package
+    class Error : private CheckParsed {
+    public:
+        Error();
+        uint16_t errcode() const;
+        butil::StringPiece status() const;
+        butil::StringPiece msg() const;
+
+    private:
+        ParseError Parse(butil::IOBuf& buf, butil::Arena* arena);
+
+        DISALLOW_COPY_AND_ASSIGN(Error);
+        friend class MysqlReply;
+
+        uint16_t _errcode;
+        butil::StringPiece _status;
+        butil::StringPiece _msg;
+    };
+    // Mysql Column
+    class Column : private CheckParsed {
+    public:
+        Column();
+        butil::StringPiece catalog() const;
+        butil::StringPiece database() const;
+        butil::StringPiece table() const;
+        butil::StringPiece origin_table() const;
+        butil::StringPiece name() const;
+        butil::StringPiece origin_name() const;
+        uint16_t charset() const;
+        uint32_t length() const;
+        MysqlFieldType type() const;
+        MysqlFieldFlag flag() const;
+        uint8_t decimal() const;
+
+    private:
+        ParseError Parse(butil::IOBuf& buf, butil::Arena* arena);
+
+        DISALLOW_COPY_AND_ASSIGN(Column);
+        friend class MysqlReply;
+
+        butil::StringPiece _catalog;
+        butil::StringPiece _database;
+        butil::StringPiece _table;
+        butil::StringPiece _origin_table;
+        butil::StringPiece _name;
+        butil::StringPiece _origin_name;
+        uint16_t _charset;
+        uint32_t _length;
+        MysqlFieldType _type;
+        MysqlFieldFlag _flag;
+        uint8_t _decimal;
+    };
+    // Mysql Field
+    class Field : private CheckParsed {
+    public:
+        Field();
+        int8_t stiny() const;
+        uint8_t tiny() const;
+        int16_t ssmall() const;
+        uint16_t small() const;
+        int32_t sinteger() const;
+        uint32_t integer() const;
+        int64_t sbigint() const;
+        uint64_t bigint() const;
+        float float32() const;
+        double float64() const;
+        butil::StringPiece string() const;
+        bool is_stiny() const;
+        bool is_tiny() const;
+        bool is_ssmall() const;
+        bool is_small() const;
+        bool is_sinteger() const;
+        bool is_integer() const;
+        bool is_sbigint() const;
+        bool is_bigint() const;
+        bool is_float32() const;
+        bool is_float64() const;
+        bool is_string() const;
+        bool is_nil() const;
+
+    private:
+        ParseError Parse(butil::IOBuf& buf, const MysqlReply::Column* column, butil::Arena* arena);
+        ParseError Parse(butil::IOBuf& buf,
+                         const MysqlReply::Column* column,
+                         uint64_t column_index,
+                         uint64_t column_number,
+                         const uint8_t* null_mask,
+                         butil::Arena* arena);
+        ParseError ParseBinaryTime(butil::IOBuf& buf,
+                                   const MysqlReply::Column* column,
+                                   butil::StringPiece& str,
+                                   butil::Arena* arena);
+        ParseError ParseBinaryDataTime(butil::IOBuf& buf,
+                                       const MysqlReply::Column* column,
+                                       butil::StringPiece& str,
+                                       butil::Arena* arena);
+        ParseError ParseMicrosecs(butil::IOBuf& buf, uint8_t decimal, char* d);
+        DISALLOW_COPY_AND_ASSIGN(Field);
+        friend class MysqlReply;
+
+        union {
+            int8_t stiny;
+            uint8_t tiny;
+            int16_t ssmall;
+            uint16_t small;
+            int32_t sinteger;
+            uint32_t integer;
+            int64_t sbigint;
+            uint64_t bigint;
+            float float32;
+            double float64;
+            butil::StringPiece str;
+        } _data = {.str = NULL};
+        MysqlFieldType _type;
+        bool _unsigned;
+        bool _is_nil;
+    };
+    // Mysql Row
+    class Row : private CheckParsed {
+    public:
+        Row();
+        uint64_t field_count() const;
+        const Field& field(const uint64_t index) const;
+
+    private:
+        ParseError Parse(butil::IOBuf& buf,
+                         const Column* columns,
+                         uint64_t column_number,
+                         Field* fields,
+                         bool binary,
+                         butil::Arena* arena);
+
+        DISALLOW_COPY_AND_ASSIGN(Row);
+        friend class MysqlReply;
+
+        Field* _fields;
+        uint64_t _field_count;
+        Row* _next;
+    };
+
+public:
+    MysqlReply();
+    ParseError ConsumePartialIOBuf(butil::IOBuf& buf,
+                                   butil::Arena* arena,
+                                   bool is_auth,
+                                   MysqlStmtType stmt_type,
+                                   bool* more_results);
+    void Swap(MysqlReply& other);
+    void Print(std::ostream& os) const;
+    // response type
+    MysqlRspType type() const;
+    // get auth
+    const Auth& auth() const;
+    const Ok& ok() const;
+    const PrepareOk& prepare_ok() const;
+    const Error& error() const;
+    const Eof& eof() const;
+    // get column number
+    uint64_t column_count() const;
+    // get one column
+    const Column& column(const uint64_t index) const;
+    // get row number
+    uint64_t row_count() const;
+    // get one row
+    const Row& next() const;
+    bool is_auth() const;
+    bool is_ok() const;
+    bool is_prepare_ok() const;
+    bool is_error() const;
+    bool is_eof() const;
+    bool is_resultset() const;
+
+private:
+    // Mysql result set header
+    struct ResultSetHeader : private CheckParsed {
+        ResultSetHeader() : _column_count(0), _extra_msg(0) {}
+        ParseError Parse(butil::IOBuf& buf);
+        uint64_t _column_count;
+        uint64_t _extra_msg;
+
+    private:
+        DISALLOW_COPY_AND_ASSIGN(ResultSetHeader);
+    };
+    // Mysql result set
+    struct ResultSet : private CheckParsed {
+        ResultSet() : _columns(NULL), _row_count(0) {
+            _cur = _first = _last = &_dummy;
+        }
+        ParseError Parse(butil::IOBuf& buf, butil::Arena* arena, bool binary);
+        ResultSetHeader _header;
+        Column* _columns;
+        Eof _eof1;
+        // row list begin
+        Row* _first;
+        Row* _last;
+        Row* _cur;
+        uint64_t _row_count;
+        // row list end
+        Eof _eof2;
+
+    private:
+        DISALLOW_COPY_AND_ASSIGN(ResultSet);
+        Row _dummy;
+    };
+    // member values
+    MysqlRspType _type;
+    union {
+        Auth* auth;
+        ResultSet* result_set;
+        Ok* ok;
+        PrepareOk* prepare_ok;
+        Error* error;
+        Eof* eof;
+        uint64_t padding;  // For swapping, must cover all bytes.
+    } _data;
+
+    DISALLOW_COPY_AND_ASSIGN(MysqlReply);
+};
+
+// mysql reply
+// A default-constructed reply has an unknown type; the payload union is cleared through
+// `padding`, the member declared to cover all of its bytes.
+inline MysqlReply::MysqlReply() : _type(MYSQL_RSP_UNKNOWN) {
+    _data.padding = 0;
+}
+inline void MysqlReply::Swap(MysqlReply& other) {

Review Comment:
   加个空行,下同



##########
src/brpc/details/controller_private_accessor.h:
##########
@@ -152,6 +152,22 @@ class ControllerPrivateAccessor {
         return *this;
     }
 
+    // Sets what the controller should do with its bound socket.
+    void set_bind_sock_action(BindSockAction action) { _cntl->_bind_sock_action = action; }
+    // Re-addresses the controller's bound socket into `ptr`.
+    // NOTE(review): _bind_sock is dereferenced without a check; presumably callers only use
+    // this when a socket is bound - confirm.
+    void get_bind_sock(SocketUniquePtr* ptr) {
+        _cntl->_bind_sock->ReAddress(ptr);
+    }
+    // Makes the controller use the existing socket identified by `sock_id`.
+    // NOTE(review): the result of Socket::Address is ignored, so a stale `sock_id` is not
+    // reported here - confirm that this is handled later.
+    void use_bind_sock(SocketId sock_id) { 
+        _cntl->_bind_sock_action = BIND_SOCK_USE;
+        Socket::Address(sock_id, &_cntl->_bind_sock);
+    }
+    // Stores the prepared-statement stub of the current request on the controller.
+    void set_stmt(MysqlStatementStub *stmt) { _cntl->_stmt = stmt; }
+    // Returns the prepared-statement stub stored by set_stmt().
+    MysqlStatementStub* get_stmt() { return _cntl->_stmt; }

Review Comment:
   mysql_stmt()



##########
src/brpc/details/controller_private_accessor.h:
##########
@@ -152,6 +152,22 @@ class ControllerPrivateAccessor {
         return *this;
     }
 
+    // Set bind socket action
+    void set_bind_sock_action(BindSockAction action) { _cntl->_bind_sock_action = action; }
+    // Transfer ownership to other
+    void get_bind_sock(SocketUniquePtr* ptr) {
+        _cntl->_bind_sock->ReAddress(ptr);
+    }
+    // Use an external socket
+    void use_bind_sock(SocketId sock_id) { 
+        _cntl->_bind_sock_action = BIND_SOCK_USE;
+        Socket::Address(sock_id, &_cntl->_bind_sock);
+    }
+    // set prepare statement
+    void set_stmt(MysqlStatementStub *stmt) { _cntl->_stmt = stmt; }

Review Comment:
   set_mysql_stmt()



##########
src/brpc/mysql.h:
##########
@@ -0,0 +1,286 @@
+// Copyright (c) 2019 Baidu, Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+// Authors: Yang,Liming (yangliming01@baidu.com)
+
+#ifndef BRPC_MYSQL_H
+#define BRPC_MYSQL_H
+
+#include <string>
+#include <vector>
+#include <google/protobuf/stubs/common.h>
+
+#include <google/protobuf/generated_message_util.h>
+#include <google/protobuf/repeated_field.h>
+#include <google/protobuf/extension_set.h>
+#include <google/protobuf/generated_message_reflection.h>
+#include "google/protobuf/descriptor.pb.h"
+
+#include "butil/iobuf.h"
+#include "butil/strings/string_piece.h"
+#include "butil/arena.h"
+#include "parse_result.h"
+#include "mysql_command.h"
+#include "mysql_reply.h"
+#include "mysql_transaction.h"
+#include "mysql_statement.h"
+
+namespace brpc {
+// Request to mysql.
+// Notice that you can pipeline multiple commands in one request and send
+// them to ONE mysql-server together.
+// Example:
+//   MysqlRequest request;
+//   request.Query("select * from table");
+//   MysqlResponse response;
+//   channel.CallMethod(NULL, &cntl, &request, &response, NULL/*done*/);
+//   if (!cntl.Failed()) {
+//       LOG(INFO) << response.reply(0);
+//   }
+
+class MysqlStatementStub {
+public:
+    MysqlStatementStub(MysqlStatement* stmt);
+    MysqlStatement* stmt();
+    butil::IOBuf& execute_data();
+    butil::Status PackExecuteCommand(butil::IOBuf* outbuf, uint32_t stmt_id);
+    // prepare statement null mask
+    struct NullMask {
+        NullMask() : area(butil::IOBuf::INVALID_AREA) {}
+        std::vector<uint8_t> mask;
+        butil::IOBuf::Area area;
+    };
+    // prepare statement param types
+    struct ParamTypes {
+        ParamTypes() : area(butil::IOBuf::INVALID_AREA) {}
+        std::vector<uint8_t> types;
+        butil::IOBuf::Area area;
+    };
+    // null mask and param types
+    NullMask& null_mask();
+    ParamTypes& param_types();
+    // save long data
+    void save_long_data(uint16_t param_id, const butil::StringPiece& value);
+
+private:
+    MysqlStatement* _stmt;
+    butil::IOBuf _execute_data;
+    NullMask _null_mask;
+    ParamTypes _param_types;
+    // long data
+    struct LongData {
+        uint16_t param_id;
+        butil::IOBuf long_data;
+    };
+    std::vector<LongData> _long_data;
+};
+
+inline MysqlStatementStub::MysqlStatementStub(MysqlStatement* stmt) : _stmt(stmt) {}
+
+inline MysqlStatement* MysqlStatementStub::stmt() {
+    return _stmt;
+}
+
+inline butil::IOBuf& MysqlStatementStub::execute_data() {
+    return _execute_data;
+}
+
+inline MysqlStatementStub::NullMask& MysqlStatementStub::null_mask() {
+    return _null_mask;
+}
+
+inline MysqlStatementStub::ParamTypes& MysqlStatementStub::param_types() {
+    return _param_types;
+}
+
+inline void MysqlStatementStub::save_long_data(uint16_t param_id, const butil::StringPiece& value) {
+    LongData d;
+    d.param_id = param_id;
+    d.long_data.append(value.data(), value.size());
+    _long_data.push_back(d);
+}
+
+class MysqlRequest : public ::google::protobuf::Message {
+public:
+    MysqlRequest();
+    MysqlRequest(const MysqlTransaction* tx);
+    MysqlRequest(MysqlStatement* stmt);
+    MysqlRequest(const MysqlTransaction* tx, MysqlStatement* stmt);
+    virtual ~MysqlRequest();
+    MysqlRequest(const MysqlRequest& from);
+    inline MysqlRequest& operator=(const MysqlRequest& from) {
+        CopyFrom(from);
+        return *this;
+    }
+    void Swap(MysqlRequest* other);
+
+    // Serialize the request into `buf'. Return true on success.
+    bool SerializeTo(butil::IOBuf* buf) const;
+
+    // Protobuf methods.
+    MysqlRequest* New() const;

Review Comment:
   对于protobuf 3.6以上,需要添加一个New(::google::protobuf::Arena* arena)的重载



##########
src/brpc/mysql.cpp:
##########
@@ -0,0 +1,695 @@
+// Copyright (c) 2019 Baidu, Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+// Authors: Yang,Liming (yangliming01@baidu.com)
+
+#define INTERNAL_SUPPRESS_PROTOBUF_FIELD_DEPRECATION
+#include <algorithm>
+#include <gflags/gflags.h>
+#include <google/protobuf/stubs/once.h>
+#include <google/protobuf/io/coded_stream.h>
+#include <google/protobuf/wire_format_lite_inl.h>
+#include <google/protobuf/descriptor.h>
+#include <google/protobuf/reflection_ops.h>
+#include <google/protobuf/wire_format.h>
+#include "butil/string_printf.h"
+#include "butil/macros.h"
+#include "brpc/controller.h"
+#include "brpc/mysql.h"
+#include "brpc/mysql_common.h"
+
+namespace brpc {
+
+DEFINE_int32(mysql_multi_replies_size, 10, "multi replies size in one MysqlResponse");
+
+// Internal implementation detail -- do not call these.
+void protobuf_AddDesc_baidu_2frpc_2fmysql_5fbase_2eproto_impl();
+void protobuf_AddDesc_baidu_2frpc_2fmysql_5fbase_2eproto();
+void protobuf_AssignDesc_baidu_2frpc_2fmysql_5fbase_2eproto();
+void protobuf_ShutdownFile_baidu_2frpc_2fmysql_5fbase_2eproto();
+
+namespace {
+
+const ::google::protobuf::Descriptor* MysqlRequest_descriptor_ = NULL;
+const ::google::protobuf::Descriptor* MysqlResponse_descriptor_ = NULL;
+
+}  // namespace
+
+void protobuf_AssignDesc_baidu_2frpc_2fmysql_5fbase_2eproto() {
+    protobuf_AddDesc_baidu_2frpc_2fmysql_5fbase_2eproto();
+    const ::google::protobuf::FileDescriptor* file =
+        ::google::protobuf::DescriptorPool::generated_pool()->FindFileByName(
+            "baidu/rpc/mysql_base.proto");
+    GOOGLE_CHECK(file != NULL);
+    MysqlRequest_descriptor_ = file->message_type(0);
+    MysqlResponse_descriptor_ = file->message_type(1);
+}
+
+namespace {
+
+GOOGLE_PROTOBUF_DECLARE_ONCE(protobuf_AssignDescriptors_once_);
+inline void protobuf_AssignDescriptorsOnce() {
+    ::google::protobuf::GoogleOnceInit(&protobuf_AssignDescriptors_once_,
+                                       &protobuf_AssignDesc_baidu_2frpc_2fmysql_5fbase_2eproto);
+}
+
+void protobuf_RegisterTypes(const ::std::string&) {
+    protobuf_AssignDescriptorsOnce();
+    ::google::protobuf::MessageFactory::InternalRegisterGeneratedMessage(
+        MysqlRequest_descriptor_, &MysqlRequest::default_instance());
+    ::google::protobuf::MessageFactory::InternalRegisterGeneratedMessage(
+        MysqlResponse_descriptor_, &MysqlResponse::default_instance());
+}
+
+}  // namespace
+
+void protobuf_ShutdownFile_baidu_2frpc_2fmysql_5fbase_2eproto() {
+    delete MysqlRequest::default_instance_;
+    delete MysqlResponse::default_instance_;
+}
+
+void protobuf_AddDesc_baidu_2frpc_2fmysql_5fbase_2eproto_impl() {
+    GOOGLE_PROTOBUF_VERIFY_VERSION;
+
+#if GOOGLE_PROTOBUF_VERSION >= 3002000
+    ::google::protobuf::internal::InitProtobufDefaults();
+#else
+    ::google::protobuf::protobuf_AddDesc_google_2fprotobuf_2fdescriptor_2eproto();
+#endif
+    ::google::protobuf::DescriptorPool::InternalAddGeneratedFile(
+        "\n\032baidu/rpc/mysql_base.proto\022\tbaidu.rpc\032"
+        " google/protobuf/descriptor.proto\"\016\n\014Mys"
+        "qlRequest\"\017\n\rMysqlResponseB\003\200\001\001",
+        111);
+    ::google::protobuf::MessageFactory::InternalRegisterGeneratedFile("baidu/rpc/mysql_base.proto",
+                                                                      &protobuf_RegisterTypes);
+    MysqlRequest::default_instance_ = new MysqlRequest();
+    MysqlResponse::default_instance_ = new MysqlResponse();
+    MysqlRequest::default_instance_->InitAsDefaultInstance();
+    MysqlResponse::default_instance_->InitAsDefaultInstance();
+    ::google::protobuf::internal::OnShutdown(
+        &protobuf_ShutdownFile_baidu_2frpc_2fmysql_5fbase_2eproto);
+}
+
+GOOGLE_PROTOBUF_DECLARE_ONCE(protobuf_AddDesc_baidu_2frpc_2fmysql_5fbase_2eproto_once);
+void protobuf_AddDesc_baidu_2frpc_2fmysql_5fbase_2eproto() {
+    ::google::protobuf::GoogleOnceInit(&protobuf_AddDesc_baidu_2frpc_2fmysql_5fbase_2eproto_once,
+                                       &protobuf_AddDesc_baidu_2frpc_2fmysql_5fbase_2eproto_impl);
+}
+
+// Force AddDescriptors() to be called at static initialization time.
+struct StaticDescriptorInitializer_baidu_2frpc_2fmysql_5fbase_2eproto {
+    StaticDescriptorInitializer_baidu_2frpc_2fmysql_5fbase_2eproto() {
+        protobuf_AddDesc_baidu_2frpc_2fmysql_5fbase_2eproto();
+    }
+} static_descriptor_initializer_baidu_2frpc_2fmysql_5fbase_2eproto_;
+
+
+// ===================================================================
+
+#ifndef _MSC_VER
+#endif  // !_MSC_VER
+
+butil::Status MysqlStatementStub::PackExecuteCommand(butil::IOBuf* outbuf, uint32_t stmt_id) {
+    butil::Status st;
+    // long data
+    for (const auto& i : _long_data) {
+        st = MysqlMakeLongDataPacket(outbuf, stmt_id, i.param_id, i.long_data);
+        if (!st.ok()) {
+            LOG(ERROR) << "make long data header error " << st;
+            return st;
+        }
+    }
+    _long_data.clear();
+    // execute data
+    st = MysqlMakeExecutePacket(outbuf, stmt_id, _execute_data);
+    if (!st.ok()) {
+        LOG(ERROR) << "make execute header error " << st;
+        return st;
+    }
+    _execute_data.clear();
+    _null_mask.mask.clear();
+    _null_mask.area = butil::IOBuf::INVALID_AREA;
+    _param_types.types.clear();
+    _param_types.area = butil::IOBuf::INVALID_AREA;
+
+    return st;
+}
+
+MysqlRequest::MysqlRequest() : ::google::protobuf::Message() {
+    SharedCtor();
+}
+
+MysqlRequest::MysqlRequest(const MysqlTransaction* tx) : ::google::protobuf::Message() {
+    SharedCtor();
+    _tx = tx;
+}
+
+MysqlRequest::MysqlRequest(MysqlStatement* stmt) : ::google::protobuf::Message() {
+    SharedCtor();
+    _stmt = new MysqlStatementStub(stmt);
+}
+
+MysqlRequest::MysqlRequest(const MysqlTransaction* tx, MysqlStatement* stmt)
+    : ::google::protobuf::Message() {
+    SharedCtor();
+    _tx = tx;
+    _stmt = new MysqlStatementStub(stmt);
+}
+
+void MysqlRequest::InitAsDefaultInstance() {}
+
+MysqlRequest::MysqlRequest(const MysqlRequest& from) : ::google::protobuf::Message() {
+    SharedCtor();
+    MergeFrom(from);
+}
+
+void MysqlRequest::SharedCtor() {
+    _has_error = false;
+    _cached_size_ = 0;
+    _has_command = false;
+    _tx = NULL;
+    _stmt = NULL;
+    _param_index = 0;
+}
+
+MysqlRequest::~MysqlRequest() {
+    SharedDtor();
+    if (_stmt != NULL) {
+        delete _stmt;
+    }
+    _stmt = NULL;
+}
+
+void MysqlRequest::SharedDtor() {
+    if (this != default_instance_) {
+    }
+}
+
+void MysqlRequest::SetCachedSize(int size) const {
+    GOOGLE_SAFE_CONCURRENT_WRITES_BEGIN();
+    _cached_size_ = size;
+    GOOGLE_SAFE_CONCURRENT_WRITES_END();
+}
+const ::google::protobuf::Descriptor* MysqlRequest::descriptor() {
+    protobuf_AssignDescriptorsOnce();
+    return MysqlRequest_descriptor_;
+}
+
+const MysqlRequest& MysqlRequest::default_instance() {
+    if (default_instance_ == NULL) {
+        protobuf_AddDesc_baidu_2frpc_2fmysql_5fbase_2eproto();
+    }
+    return *default_instance_;
+}
+
+MysqlRequest* MysqlRequest::default_instance_ = NULL;
+
+MysqlRequest* MysqlRequest::New() const {
+    return new MysqlRequest;
+}
+
+void MysqlRequest::Clear() {
+    _has_error = false;
+    _buf.clear();
+    _has_command = false;
+    _tx = NULL;
+    _stmt = NULL;
+}
+
+bool MysqlRequest::MergePartialFromCodedStream(::google::protobuf::io::CodedInputStream*) {
+    LOG(WARNING) << "You're not supposed to parse a MysqlRequest";
+    return true;
+}
+
+void MysqlRequest::SerializeWithCachedSizes(::google::protobuf::io::CodedOutputStream*) const {
+    LOG(WARNING) << "You're not supposed to serialize a MysqlRequest";
+}
+
+::google::protobuf::uint8* MysqlRequest::SerializeWithCachedSizesToArray(
+    ::google::protobuf::uint8* target) const {
+    return target;
+}
+
+int MysqlRequest::ByteSize() const {
+    int total_size = _buf.size();
+    GOOGLE_SAFE_CONCURRENT_WRITES_BEGIN();
+    _cached_size_ = total_size;
+    GOOGLE_SAFE_CONCURRENT_WRITES_END();
+    return total_size;
+}
+
+void MysqlRequest::MergeFrom(const ::google::protobuf::Message& from) {
+    GOOGLE_CHECK_NE(&from, this);
+    const MysqlRequest* source =
+        ::google::protobuf::internal::dynamic_cast_if_available<const MysqlRequest*>(&from);
+    if (source == NULL) {
+        ::google::protobuf::internal::ReflectionOps::Merge(from, this);
+    } else {
+        MergeFrom(*source);
+    }
+}
+
+void MysqlRequest::MergeFrom(const MysqlRequest& from) {
+    // TODO: maybe need to optimize
+    // GOOGLE_CHECK_NE(&from, this);
+    // const int header_size = 4;
+    // const uint32_t size_l = from._buf.size() - header_size - 1;  // payload - type
+    // const uint32_t size_r = _buf.size() - header_size + 1;       // payload + seqno
+    // const uint32_t payload_size = butil::ByteSwapToLE32(size_l + size_r);
+    // if (payload_size > mysql_max_package_size) {
+    //     CHECK(false)
+    //         << "[MysqlRequest::MergeFrom] statement size is too big, merge from do nothing";
+    //     return;
+    // }
+    // butil::IOBuf buf;
+    // butil::IOBuf result;
+    // _has_error = _has_error || from._has_error;
+    // buf.append(from._buf);
+    // buf.pop_front(header_size + 1);
+    // _buf.pop_front(header_size - 1);
+    // result.append(&payload_size, 3);
+    // result.append(_buf);
+    // result.append(buf);
+    // _buf = result;
+    // _has_command = _has_command || from._has_command;
+}
+
+void MysqlRequest::CopyFrom(const ::google::protobuf::Message& from) {
+    if (&from == this)
+        return;
+    Clear();
+    MergeFrom(from);
+}
+
+void MysqlRequest::CopyFrom(const MysqlRequest& from) {
+    if (&from == this)
+        return;
+    Clear();
+    MergeFrom(from);
+}
+
+void MysqlRequest::Swap(MysqlRequest* other) {
+    if (other != this) {
+        _buf.swap(other->_buf);
+        std::swap(_has_error, other->_has_error);
+        std::swap(_cached_size_, other->_cached_size_);
+        std::swap(_has_command, other->_has_command);
+    }
+}
+
+bool MysqlRequest::SerializeTo(butil::IOBuf* buf) const {
+    if (_has_error) {
+        LOG(ERROR) << "Reject serialization due to error in CommandXXX[V]";
+        return false;
+    }
+    *buf = _buf;
+    return true;
+}
+
+::google::protobuf::Metadata MysqlRequest::GetMetadata() const {
+    protobuf_AssignDescriptorsOnce();
+    ::google::protobuf::Metadata metadata;
+    metadata.descriptor = MysqlRequest_descriptor_;
+    metadata.reflection = NULL;
+    return metadata;
+}
+
+bool MysqlRequest::Query(const butil::StringPiece& command) {
+    if (_has_error) {
+        return false;
+    }
+
+    if (_has_command) {
+        return false;
+    }
+
+    const butil::Status st = MysqlMakeCommand(&_buf, MYSQL_COM_QUERY, command);
+    if (st.ok()) {
+        _has_command = true;
+        return true;
+    } else {
+        CHECK(st.ok()) << st;
+        _has_error = true;
+        return false;
+    }
+}
+
+bool MysqlRequest::AddParam(int8_t p) {
+    if (_has_error) {
+        return false;
+    }
+    const butil::Status st = MysqlMakeExecuteData(_stmt, _param_index, &p, MYSQL_FIELD_TYPE_TINY);
+    if (st.ok()) {
+        ++_param_index;
+        return true;
+    } else {
+        CHECK(st.ok()) << st;
+        _has_error = true;
+        return false;
+    }
+}
+bool MysqlRequest::AddParam(uint8_t p) {

Review Comment:
   加个空行吧,下同



##########
src/brpc/policy/mysql_protocol.cpp:
##########
@@ -0,0 +1,416 @@
+// Copyright (c) 2019 Baidu, Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+// Authors: Yang,Liming (yangliming01@baidu.com)
+
+#include <google/protobuf/descriptor.h>  // MethodDescriptor
+#include <google/protobuf/message.h>     // Message
+#include <gflags/gflags.h>
+#include <sstream>
+#include "butil/logging.h"  // LOG()
+#include "butil/time.h"
+#include "butil/iobuf.h"  // butil::IOBuf
+#include "butil/sys_byteorder.h"
+#include "brpc/controller.h"  // Controller
+#include "brpc/details/controller_private_accessor.h"
+#include "brpc/socket.h"  // Socket
+#include "brpc/server.h"  // Server
+#include "brpc/details/server_private_accessor.h"
+#include "brpc/span.h"
+#include "brpc/mysql.h"
+#include "brpc/policy/mysql_authenticator.h"
+#include "brpc/policy/mysql_protocol.h"
+
+namespace brpc {
+
+DECLARE_bool(enable_rpcz);
+
+namespace policy {
+
+DEFINE_bool(mysql_verbose, false, "[DEBUG] Print EVERY mysql request/response");
+
+void MysqlParseAuthenticator(const butil::StringPiece& raw,
+                             std::string* user,
+                             std::string* password,
+                             std::string* schema,
+                             std::string* collation);
+void MysqlParseParams(const butil::StringPiece& raw, std::string* params);
+// pack mysql authentication_data
+int MysqlPackAuthenticator(const MysqlReply::Auth& auth,
+                           const butil::StringPiece& user,
+                           const butil::StringPiece& password,
+                           const butil::StringPiece& schema,
+                           const butil::StringPiece& collation,
+                           std::string* auth_cmd);
+int MysqlPackParams(const butil::StringPiece& params, std::string* param_cmd);
+
+namespace {
+// I really don't want to add a variable in controller, so I use AuthContext group to mark auth
+// step.
+const char* auth_step[] = {"AUTH_OK", "PARAMS_OK"};
+
+struct InputResponse : public InputMessageBase {
+    bthread_id_t id_wait;
+    MysqlResponse response;
+
+    // @InputMessageBase
+    void DestroyImpl() {
+        delete this;
+    }
+};
+
+bool PackRequest(butil::IOBuf* buf,
+                 ControllerPrivateAccessor& accessor,
+                 const butil::IOBuf& request) {
+    if (accessor.pipelined_count() == MYSQL_PREPARED_STATEMENT) {
+        Socket* sock = accessor.get_sending_socket();
+        if (sock == NULL) {
+            LOG(ERROR) << "[MYSQL PACK] get sending socket with NULL";
+            return false;
+        }
+        auto stub = accessor.get_stmt();
+        if (stub == NULL) {
+            LOG(ERROR) << "[MYSQL PACK] get prepare statement with NULL";
+            return false;
+        }
+        uint32_t stmt_id;
+        // If no stmt_id is found for this socket, create the prepared statement on it and
+        // store the user request.
+        if ((stmt_id = stub->stmt()->StatementId(sock->id())) == 0) {
+            butil::IOBuf b;
+            butil::Status st = MysqlMakeCommand(&b, MYSQL_COM_STMT_PREPARE, stub->stmt()->str());
+            if (!st.ok()) {
+                LOG(ERROR) << "[MYSQL PACK] make prepare statement error " << st;
+                return false;
+            }
+            accessor.set_pipelined_count(MYSQL_NEED_PREPARE);
+            buf->append(b);
+            return true;
+        }
+        // else pack execute header with stmt_id
+        butil::Status st = stub->PackExecuteCommand(buf, stmt_id);
+        if (!st.ok()) {
+            LOG(ERROR) << "write execute data error " << st;
+            return false;
+        }
+        return true;
+    }
+    buf->append(request);
+    return true;
+}
+
+ParseError HandleAuthentication(const InputResponse* msg, const Socket* socket, PipelinedInfo* pi) {
+    const bthread_id_t cid = pi->id_wait;
+    Controller* cntl = NULL;
+    if (bthread_id_lock(cid, (void**)&cntl) != 0) {
+        LOG(ERROR) << "[MYSQL PARSE] fail to lock controller";
+        return PARSE_ERROR_ABSOLUTELY_WRONG;
+    }
+
+    ParseError parseCode = PARSE_OK;
+    const AuthContext* ctx = socket->auth_context();
+    if (ctx == NULL) {
+        parseCode = PARSE_ERROR_ABSOLUTELY_WRONG;
+        LOG(ERROR) << "[MYSQL PARSE] auth context is null";
+        goto END_OF_AUTH;
+    }
+    if (msg->response.reply(0).is_auth()) {
+        std::string user, password, schema, collation, auth_cmd;
+        const MysqlReply& reply = msg->response.reply(0);
+        MysqlParseAuthenticator(ctx->user(), &user, &password, &schema, &collation);
+        if (MysqlPackAuthenticator(reply.auth(), user, password, schema, collation, &auth_cmd) ==
+            0) {
+            butil::IOBuf buf;
+            buf.append(auth_cmd);
+            buf.cut_into_file_descriptor(socket->fd());
+            const_cast<AuthContext*>(ctx)->set_group(auth_step[0]);
+        } else {
+            parseCode = PARSE_ERROR_ABSOLUTELY_WRONG;
+            LOG(ERROR) << "[MYSQL PARSE] wrong pack authentication data";
+        }
+    } else if (msg->response.reply_size() > 0) {
+        for (size_t i = 0; i < msg->response.reply_size(); ++i) {
+            if (!msg->response.reply(i).is_ok()) {
+                LOG(ERROR) << "[MYSQL PARSE] auth failed " << msg->response;
+                parseCode = PARSE_ERROR_NO_RESOURCE;
+                goto END_OF_AUTH;
+            }
+        }
+        std::string params, params_cmd;
+        MysqlParseParams(ctx->user(), &params);
+        if (ctx->group() == auth_step[0] && !params.empty()) {
+            if (MysqlPackParams(params, &params_cmd) == 0) {
+                butil::IOBuf buf;
+                buf.append(params_cmd);
+                buf.cut_into_file_descriptor(socket->fd());
+                const_cast<AuthContext*>(ctx)->set_group(auth_step[1]);
+            } else {
+                parseCode = PARSE_ERROR_ABSOLUTELY_WRONG;
+                LOG(ERROR) << "[MYSQL PARSE] wrong pack params data";
+            }
+        } else {
+            butil::IOBuf raw_req;
+            raw_req.append(ctx->starter());
+            raw_req.cut_into_file_descriptor(socket->fd());
+            pi->with_auth = false;
+        }
+    } else {
+        parseCode = PARSE_ERROR_ABSOLUTELY_WRONG;
+        LOG(ERROR) << "[MYSQL PARSE] wrong authentication step";
+    }
+
+END_OF_AUTH:
+    if (bthread_id_unlock(cid) != 0) {
+        parseCode = PARSE_ERROR_ABSOLUTELY_WRONG;
+        LOG(ERROR) << "[MYSQL PARSE] fail to unlock controller";
+    }
+    return parseCode;
+}
+
+ParseError HandlePrepareStatement(const InputResponse* msg,
+                                  const Socket* socket,
+                                  PipelinedInfo* pi) {
+    if (!msg->response.reply(0).is_prepare_ok()) {
+        LOG(ERROR) << "[MYSQL PARSE] response is not prepare ok, " << msg->response;
+        return PARSE_ERROR_ABSOLUTELY_WRONG;
+    }
+    const MysqlReply::PrepareOk& ok = msg->response.reply(0).prepare_ok();
+    const bthread_id_t cid = pi->id_wait;
+    Controller* cntl = NULL;
+    if (bthread_id_lock(cid, (void**)&cntl) != 0) {
+        LOG(ERROR) << "[MYSQL PARSE] fail to lock controller";
+        return PARSE_ERROR_ABSOLUTELY_WRONG;
+    }
+    ParseError parseCode = PARSE_OK;
+    butil::IOBuf buf;
+    butil::Status st;
+    auto stub = ControllerPrivateAccessor(cntl).get_stmt();
+    auto stmt = stub->stmt();
+    if (stmt == NULL || stmt->param_count() != ok.param_count()) {

Review Comment:
   下面已经有stmt->param_count() != ok.param_count()
   这里不用重复判断



##########
src/brpc/policy/mysql_protocol.cpp:
##########
@@ -0,0 +1,416 @@
+// Copyright (c) 2019 Baidu, Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+// Authors: Yang,Liming (yangliming01@baidu.com)
+
+#include <google/protobuf/descriptor.h>  // MethodDescriptor
+#include <google/protobuf/message.h>     // Message
+#include <gflags/gflags.h>
+#include <sstream>
+#include "butil/logging.h"  // LOG()
+#include "butil/time.h"
+#include "butil/iobuf.h"  // butil::IOBuf
+#include "butil/sys_byteorder.h"
+#include "brpc/controller.h"  // Controller
+#include "brpc/details/controller_private_accessor.h"
+#include "brpc/socket.h"  // Socket
+#include "brpc/server.h"  // Server
+#include "brpc/details/server_private_accessor.h"
+#include "brpc/span.h"
+#include "brpc/mysql.h"
+#include "brpc/policy/mysql_authenticator.h"
+#include "brpc/policy/mysql_protocol.h"
+
+namespace brpc {
+
+DECLARE_bool(enable_rpcz);
+
+namespace policy {
+
+DEFINE_bool(mysql_verbose, false, "[DEBUG] Print EVERY mysql request/response");
+
+void MysqlParseAuthenticator(const butil::StringPiece& raw,
+                             std::string* user,
+                             std::string* password,
+                             std::string* schema,
+                             std::string* collation);
+void MysqlParseParams(const butil::StringPiece& raw, std::string* params);
+// pack mysql authentication_data
+int MysqlPackAuthenticator(const MysqlReply::Auth& auth,
+                           const butil::StringPiece& user,
+                           const butil::StringPiece& password,
+                           const butil::StringPiece& schema,
+                           const butil::StringPiece& collation,
+                           std::string* auth_cmd);
+int MysqlPackParams(const butil::StringPiece& params, std::string* param_cmd);
+
+namespace {
+// I really don't want to add a variable in controller, so I use AuthContext group to mark auth
+// step.
+const char* auth_step[] = {"AUTH_OK", "PARAMS_OK"};
+
+struct InputResponse : public InputMessageBase {
+    bthread_id_t id_wait;
+    MysqlResponse response;
+
+    // @InputMessageBase
+    void DestroyImpl() {
+        delete this;
+    }
+};
+
+bool PackRequest(butil::IOBuf* buf,
+                 ControllerPrivateAccessor& accessor,
+                 const butil::IOBuf& request) {
+    if (accessor.pipelined_count() == MYSQL_PREPARED_STATEMENT) {
+        Socket* sock = accessor.get_sending_socket();
+        if (sock == NULL) {
+            LOG(ERROR) << "[MYSQL PACK] get sending socket with NULL";
+            return false;
+        }
+        auto stub = accessor.get_stmt();
+        if (stub == NULL) {
+            LOG(ERROR) << "[MYSQL PACK] get prepare statement with NULL";
+            return false;
+        }
+        uint32_t stmt_id;
+        // If no stmt_id is found for this socket, create the prepared statement on it and
+        // store the user request.
+        if ((stmt_id = stub->stmt()->StatementId(sock->id())) == 0) {
+            butil::IOBuf b;
+            butil::Status st = MysqlMakeCommand(&b, MYSQL_COM_STMT_PREPARE, stub->stmt()->str());
+            if (!st.ok()) {
+                LOG(ERROR) << "[MYSQL PACK] make prepare statement error " << st;
+                return false;
+            }
+            accessor.set_pipelined_count(MYSQL_NEED_PREPARE);
+            buf->append(b);
+            return true;
+        }
+        // else pack execute header with stmt_id
+        butil::Status st = stub->PackExecuteCommand(buf, stmt_id);
+        if (!st.ok()) {
+            LOG(ERROR) << "write execute data error " << st;
+            return false;
+        }
+        return true;
+    }
+    buf->append(request);
+    return true;
+}
+
+ParseError HandleAuthentication(const InputResponse* msg, const Socket* socket, PipelinedInfo* pi) {
+    const bthread_id_t cid = pi->id_wait;
+    Controller* cntl = NULL;
+    if (bthread_id_lock(cid, (void**)&cntl) != 0) {
+        LOG(ERROR) << "[MYSQL PARSE] fail to lock controller";
+        return PARSE_ERROR_ABSOLUTELY_WRONG;
+    }
+
+    ParseError parseCode = PARSE_OK;
+    const AuthContext* ctx = socket->auth_context();
+    if (ctx == NULL) {
+        parseCode = PARSE_ERROR_ABSOLUTELY_WRONG;
+        LOG(ERROR) << "[MYSQL PARSE] auth context is null";
+        goto END_OF_AUTH;
+    }
+    if (msg->response.reply(0).is_auth()) {
+        std::string user, password, schema, collation, auth_cmd;
+        const MysqlReply& reply = msg->response.reply(0);
+        MysqlParseAuthenticator(ctx->user(), &user, &password, &schema, &collation);
+        if (MysqlPackAuthenticator(reply.auth(), user, password, schema, collation, &auth_cmd) ==
+            0) {
+            butil::IOBuf buf;
+            buf.append(auth_cmd);
+            buf.cut_into_file_descriptor(socket->fd());

Review Comment:
   没有判断返回值,可能存在写入失败的可能性
   另外这里用Socket::Write是不是更合适?



##########
src/brpc/policy/mysql_protocol.cpp:
##########
@@ -0,0 +1,416 @@
+// Copyright (c) 2019 Baidu, Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+// Authors: Yang,Liming (yangliming01@baidu.com)
+
+#include <google/protobuf/descriptor.h>  // MethodDescriptor
+#include <google/protobuf/message.h>     // Message
+#include <gflags/gflags.h>
+#include <sstream>
+#include "butil/logging.h"  // LOG()
+#include "butil/time.h"
+#include "butil/iobuf.h"  // butil::IOBuf
+#include "butil/sys_byteorder.h"
+#include "brpc/controller.h"  // Controller
+#include "brpc/details/controller_private_accessor.h"
+#include "brpc/socket.h"  // Socket
+#include "brpc/server.h"  // Server
+#include "brpc/details/server_private_accessor.h"
+#include "brpc/span.h"
+#include "brpc/mysql.h"
+#include "brpc/policy/mysql_authenticator.h"
+#include "brpc/policy/mysql_protocol.h"
+
+namespace brpc {
+
+DECLARE_bool(enable_rpcz);
+
+namespace policy {
+
+DEFINE_bool(mysql_verbose, false, "[DEBUG] Print EVERY mysql request/response");
+
+void MysqlParseAuthenticator(const butil::StringPiece& raw,
+                             std::string* user,
+                             std::string* password,
+                             std::string* schema,
+                             std::string* collation);
+void MysqlParseParams(const butil::StringPiece& raw, std::string* params);
+// pack mysql authentication_data
+int MysqlPackAuthenticator(const MysqlReply::Auth& auth,
+                           const butil::StringPiece& user,
+                           const butil::StringPiece& password,
+                           const butil::StringPiece& schema,
+                           const butil::StringPiece& collation,
+                           std::string* auth_cmd);
+int MysqlPackParams(const butil::StringPiece& params, std::string* param_cmd);
+
+namespace {
+// I really don't want to add a variable in controller, so I use AuthContext group to mark auth
+// step.
+const char* auth_step[] = {"AUTH_OK", "PARAMS_OK"};
+
+struct InputResponse : public InputMessageBase {
+    bthread_id_t id_wait;
+    MysqlResponse response;
+
+    // @InputMessageBase
+    void DestroyImpl() {
+        delete this;
+    }
+};
+
+bool PackRequest(butil::IOBuf* buf,
+                 ControllerPrivateAccessor& accessor,
+                 const butil::IOBuf& request) {
+    if (accessor.pipelined_count() == MYSQL_PREPARED_STATEMENT) {
+        Socket* sock = accessor.get_sending_socket();
+        if (sock == NULL) {
+            LOG(ERROR) << "[MYSQL PACK] get sending socket with NULL";
+            return false;
+        }
+        auto stub = accessor.get_stmt();
+        if (stub == NULL) {
+            LOG(ERROR) << "[MYSQL PACK] get prepare statement with NULL";
+            return false;
+        }
+        uint32_t stmt_id;
+        // if can't found stmt_id in this socket, create prepared statement on it, store user
+        // request.
+        if ((stmt_id = stub->stmt()->StatementId(sock->id())) == 0) {
+            butil::IOBuf b;
+            butil::Status st = MysqlMakeCommand(&b, MYSQL_COM_STMT_PREPARE, stub->stmt()->str());
+            if (!st.ok()) {
+                LOG(ERROR) << "[MYSQL PACK] make prepare statement error " << st;
+                return false;
+            }
+            accessor.set_pipelined_count(MYSQL_NEED_PREPARE);
+            buf->append(b);
+            return true;
+        }
+        // else pack execute header with stmt_id
+        butil::Status st = stub->PackExecuteCommand(buf, stmt_id);
+        if (!st.ok()) {
+            LOG(ERROR) << "write execute data error " << st;
+            return false;
+        }
+        return true;
+    }
+    buf->append(request);
+    return true;
+}
+
+ParseError HandleAuthentication(const InputResponse* msg, const Socket* socket, PipelinedInfo* pi) {
+    const bthread_id_t cid = pi->id_wait;
+    Controller* cntl = NULL;
+    if (bthread_id_lock(cid, (void**)&cntl) != 0) {
+        LOG(ERROR) << "[MYSQL PARSE] fail to lock controller";
+        return PARSE_ERROR_ABSOLUTELY_WRONG;
+    }
+
+    ParseError parseCode = PARSE_OK;
+    const AuthContext* ctx = socket->auth_context();
+    if (ctx == NULL) {
+        parseCode = PARSE_ERROR_ABSOLUTELY_WRONG;
+        LOG(ERROR) << "[MYSQL PARSE] auth context is null";
+        goto END_OF_AUTH;
+    }
+    if (msg->response.reply(0).is_auth()) {
+        std::string user, password, schema, collation, auth_cmd;
+        const MysqlReply& reply = msg->response.reply(0);
+        MysqlParseAuthenticator(ctx->user(), &user, &password, &schema, &collation);
+        if (MysqlPackAuthenticator(reply.auth(), user, password, schema, collation, &auth_cmd) ==
+            0) {
+            butil::IOBuf buf;
+            buf.append(auth_cmd);
+            buf.cut_into_file_descriptor(socket->fd());
+            const_cast<AuthContext*>(ctx)->set_group(auth_step[0]);
+        } else {
+            parseCode = PARSE_ERROR_ABSOLUTELY_WRONG;
+            LOG(ERROR) << "[MYSQL PARSE] wrong pack authentication data";
+        }
+    } else if (msg->response.reply_size() > 0) {
+        for (size_t i = 0; i < msg->response.reply_size(); ++i) {
+            if (!msg->response.reply(i).is_ok()) {
+                LOG(ERROR) << "[MYSQL PARSE] auth failed " << msg->response;
+                parseCode = PARSE_ERROR_NO_RESOURCE;
+                goto END_OF_AUTH;
+            }
+        }
+        std::string params, params_cmd;
+        MysqlParseParams(ctx->user(), &params);
+        if (ctx->group() == auth_step[0] && !params.empty()) {
+            if (MysqlPackParams(params, &params_cmd) == 0) {
+                butil::IOBuf buf;
+                buf.append(params_cmd);
+                buf.cut_into_file_descriptor(socket->fd());
+                const_cast<AuthContext*>(ctx)->set_group(auth_step[1]);
+            } else {
+                parseCode = PARSE_ERROR_ABSOLUTELY_WRONG;
+                LOG(ERROR) << "[MYSQL PARSE] wrong pack params data";
+            }
+        } else {
+            butil::IOBuf raw_req;
+            raw_req.append(ctx->starter());
+            raw_req.cut_into_file_descriptor(socket->fd());
+            pi->with_auth = false;
+        }
+    } else {
+        parseCode = PARSE_ERROR_ABSOLUTELY_WRONG;
+        LOG(ERROR) << "[MYSQL PARSE] wrong authentication step";
+    }
+
+END_OF_AUTH:
+    if (bthread_id_unlock(cid) != 0) {
+        parseCode = PARSE_ERROR_ABSOLUTELY_WRONG;
+        LOG(ERROR) << "[MYSQL PARSE] fail to unlock controller";
+    }
+    return parseCode;
+}
+
+ParseError HandlePrepareStatement(const InputResponse* msg,
+                                  const Socket* socket,
+                                  PipelinedInfo* pi) {
+    if (!msg->response.reply(0).is_prepare_ok()) {
+        LOG(ERROR) << "[MYSQL PARSE] response is not prepare ok, " << msg->response;
+        return PARSE_ERROR_ABSOLUTELY_WRONG;
+    }
+    const MysqlReply::PrepareOk& ok = msg->response.reply(0).prepare_ok();
+    const bthread_id_t cid = pi->id_wait;
+    Controller* cntl = NULL;
+    if (bthread_id_lock(cid, (void**)&cntl) != 0) {
+        LOG(ERROR) << "[MYSQL PARSE] fail to lock controller";
+        return PARSE_ERROR_ABSOLUTELY_WRONG;
+    }
+    ParseError parseCode = PARSE_OK;
+    butil::IOBuf buf;
+    butil::Status st;
+    auto stub = ControllerPrivateAccessor(cntl).get_stmt();
+    auto stmt = stub->stmt();
+    if (stmt == NULL || stmt->param_count() != ok.param_count()) {
+        LOG(ERROR) << "[MYSQL PACK] stmt can't be NULL";
+        parseCode = PARSE_ERROR_ABSOLUTELY_WRONG;
+        goto END_OF_PREPARE;
+    }
+    if (stmt->param_count() != ok.param_count()) {
+        LOG(ERROR) << "[MYSQL PACK] stmt param number " << stmt->param_count()
+                   << " not equal to prepareOk.param_number " << ok.param_count();
+        parseCode = PARSE_ERROR_ABSOLUTELY_WRONG;
+        goto END_OF_PREPARE;
+    }
+    stmt->SetStatementId(socket->id(), ok.stmt_id());
+    st = stub->PackExecuteCommand(&buf, ok.stmt_id());
+    if (!st.ok()) {
+        LOG(ERROR) << "[MYSQL PACK] make execute header error " << st;
+        parseCode = PARSE_ERROR_ABSOLUTELY_WRONG;
+        goto END_OF_PREPARE;
+    }
+    buf.cut_into_file_descriptor(socket->fd());
+    pi->count = MYSQL_PREPARED_STATEMENT;
+END_OF_PREPARE:
+    if (bthread_id_unlock(cid) != 0) {
+        parseCode = PARSE_ERROR_ABSOLUTELY_WRONG;
+        LOG(ERROR) << "[MYSQL PARSE] fail to unlock controller";
+    }
+    return parseCode;
+}
+
+}  // namespace
+
+// "Message" = "Response" as we only implement the client for mysql.
+ParseResult ParseMysqlMessage(butil::IOBuf* source,
+                              Socket* socket,
+                              bool /*read_eof*/,
+                              const void* /*arg*/) {
+    if (source->empty()) {
+        return MakeParseError(PARSE_ERROR_NOT_ENOUGH_DATA);
+    }
+
+    PipelinedInfo pi;
+    if (!socket->PopPipelinedInfo(&pi)) {
+        LOG(WARNING) << "No corresponding PipelinedInfo in socket";
+        return MakeParseError(PARSE_ERROR_TRY_OTHERS);
+    }
+
+    InputResponse* msg = static_cast<InputResponse*>(socket->parsing_context());
+    if (msg == NULL) {
+        msg = new InputResponse;
+        socket->reset_parsing_context(msg);
+    }
+
+    MysqlStmtType stmt_type = static_cast<MysqlStmtType>(pi.count);
+    ParseError err = msg->response.ConsumePartialIOBuf(*source, pi.with_auth, stmt_type);
+    if (FLAGS_mysql_verbose) {
+        LOG(INFO) << "[MYSQL PARSE] " << msg->response;
+    }
+    if (err != PARSE_OK) {
+        if (err == PARSE_ERROR_NOT_ENOUGH_DATA) {
+            socket->GivebackPipelinedInfo(pi);
+        }
+        return MakeParseError(err);
+    }
+    if (pi.with_auth) {
+        ParseError err = HandleAuthentication(msg, socket, &pi);
+        if (err != PARSE_OK) {
+            return MakeParseError(err, "Fail to authenticate with Mysql");
+        }
+        DestroyingPtr<InputResponse> auth_msg =
+            static_cast<InputResponse*>(socket->release_parsing_context());
+        socket->GivebackPipelinedInfo(pi);
+        return MakeParseError(PARSE_ERROR_NOT_ENOUGH_DATA);
+    }
+    if (stmt_type == MYSQL_NEED_PREPARE) {
+        // store stmt_id, make execute header.
+        ParseError err = HandlePrepareStatement(msg, socket, &pi);
+        if (err != PARSE_OK) {
+            return MakeParseError(err, "Fail to make parepared statement with Mysql");
+        }
+        DestroyingPtr<InputResponse> prepare_msg =
+            static_cast<InputResponse*>(socket->release_parsing_context());
+        socket->GivebackPipelinedInfo(pi);
+        return MakeParseError(PARSE_ERROR_NOT_ENOUGH_DATA);
+    }
+    msg->id_wait = pi.id_wait;
+    socket->release_parsing_context();
+    return MakeMessage(msg);
+}
+
+void ProcessMysqlResponse(InputMessageBase* msg_base) {
+    const int64_t start_parse_us = butil::cpuwide_time_us();
+    DestroyingPtr<InputResponse> msg(static_cast<InputResponse*>(msg_base));
+
+    const bthread_id_t cid = msg->id_wait;
+    Controller* cntl = NULL;
+    const int rc = bthread_id_lock(cid, (void**)&cntl);
+    if (rc != 0) {
+        LOG_IF(ERROR, rc != EINVAL && rc != EPERM)
+            << "Fail to lock correlation_id=" << cid << ": " << berror(rc);
+        return;
+    }
+
+    ControllerPrivateAccessor accessor(cntl);
+    Span* span = accessor.span();
+    if (span) {
+        span->set_base_real_us(msg->base_real_us());
+        span->set_received_us(msg->received_us());
+        span->set_response_size(msg->response.ByteSize());
+        span->set_start_parse_us(start_parse_us);
+    }
+    const int saved_error = cntl->ErrorCode();
+    if (cntl->response() != NULL) {
+        if (cntl->response()->GetDescriptor() != MysqlResponse::descriptor()) {
+            cntl->SetFailed(ERESPONSE, "Must be MysqlResponse");
+        } else {
+            // We work around ParseFrom of pb which is just a placeholder.
+            ((MysqlResponse*)cntl->response())->Swap(&msg->response);
+        }
+    }  // silently ignore the response.
+
+    // Unlocks correlation_id inside. Revert controller's
+    // error code if it version check of `cid' fails
+    msg.reset();  // optional, just release resourse ASAP
+    accessor.OnResponse(cid, saved_error);
+}
+
+void SerializeMysqlRequest(butil::IOBuf* buf,
+                           Controller* cntl,
+                           const google::protobuf::Message* request) {
+    if (request == NULL) {
+        return cntl->SetFailed(EREQUEST, "request is NULL");
+    }
+    if (request->GetDescriptor() != MysqlRequest::descriptor()) {
+        return cntl->SetFailed(EREQUEST, "The request is not a MysqlRequest");
+    }
+    const MysqlRequest* rr = (const MysqlRequest*)request;
+    // We work around SerializeTo of pb which is just a placeholder.
+    if (!rr->SerializeTo(buf)) {
+        return cntl->SetFailed(EREQUEST, "Fail to serialize MysqlRequest");
+    }
+    // mysql protocol don't use pipelined count to verify the end of a response, so pipelined count
+    // is meanless, but we can use it help us to distinguish mysql reply type. In mysql protocol, we
+    // can't distinguish OK and PreparedOk, so we set pipelined count to 2 to let parse function to

Review Comment:
   这么用pipelined_count感觉比较奇怪,是不是放在auth_flags里更合适一些



##########
src/brpc/policy/mysql_auth_hash.cpp:
##########
@@ -0,0 +1,227 @@
+/*
+ * Copyright (c) 2015, 2018, Oracle and/or its affiliates. All rights reserved.
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License, version 2.0, as

Review Comment:
   这个License是GPL,没法用,看能否自己实现一个



##########
src/brpc/policy/mysql_authenticator.cpp:
##########
@@ -0,0 +1,176 @@
+// Copyright (c) 2019 Baidu, Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+//
+// Author(s): Yang,Liming <ya...@baidu.com>
+
+#include <vector>
+#include "brpc/policy/mysql_authenticator.h"
+#include "brpc/policy/mysql_auth_hash.h"
+#include "brpc/mysql_command.h"
+#include "brpc/mysql_reply.h"
+#include "brpc/mysql_common.h"
+#include "butil/base64.h"
+#include "butil/iobuf.h"
+#include "butil/logging.h"  // LOG()
+#include "butil/sys_byteorder.h"
+
+namespace brpc {
+namespace policy {
+
+namespace {
+const butil::StringPiece mysql_native_password("mysql_native_password");
+const char* auth_param_delim = "\t";
+bool MysqlHandleParams(const butil::StringPiece& params, std::string* param_cmd) {
+    if (params.empty()) {
+        return true;
+    }
+    const char* delim1 = "&";
+    std::vector<size_t> idx;
+    for (size_t p = params.find(delim1); p != butil::StringPiece::npos;
+         p = params.find(delim1, p + 1)) {
+        idx.push_back(p);
+    }
+
+    const char* delim2 = "=";
+    std::stringstream ss;
+    for (size_t i = 0; i < idx.size() + 1; ++i) {
+        size_t pos = (i > 0) ? idx[i - 1] + 1 : 0;
+        size_t len = (i < idx.size()) ? idx[i] - pos : params.size() - pos;
+        butil::StringPiece raw(params.data() + pos, len);
+        const size_t p = raw.find(delim2);
+        if (p != butil::StringPiece::npos) {
+            butil::StringPiece k(raw.data(), p);
+            butil::StringPiece v(raw.data() + p + 1, raw.size() - p - 1);
+            if (k == "charset") {
+                ss << "SET NAMES " << v << ";";
+            } else {
+                ss << "SET " << k << "=" << v << ";";
+            }
+        }
+    }
+    *param_cmd = ss.str();
+    return true;
+}
+};  // namespace
+
+// user + "\t" + password + "\t" + schema + "\t" + collation + "\t" + param
+bool MysqlAuthenticator::SerializeToString(std::string* str) const {
+    std::stringstream ss;
+    ss << _user << auth_param_delim;
+    ss << _passwd << auth_param_delim;
+    ss << _schema << auth_param_delim;
+    ss << _collation << auth_param_delim;
+    std::string param_cmd;
+    if (MysqlHandleParams(_params, &param_cmd)) {
+        ss << param_cmd;
+    } else {
+        LOG(ERROR) << "handle mysql authentication params failed, ignore it";
+        return false;
+    }
+    *str = ss.str();
+    return true;
+}
+
+void MysqlParseAuthenticator(const butil::StringPiece& raw,

Review Comment:
   是不是可以实现成MysqlAuthenticator::ParseFromString 这样更对称一些



##########
src/brpc/policy/mysql_authenticator.cpp:
##########
@@ -0,0 +1,176 @@
+// Copyright (c) 2019 Baidu, Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+//
+// Author(s): Yang,Liming <ya...@baidu.com>
+
+#include <vector>
+#include "brpc/policy/mysql_authenticator.h"
+#include "brpc/policy/mysql_auth_hash.h"
+#include "brpc/mysql_command.h"
+#include "brpc/mysql_reply.h"
+#include "brpc/mysql_common.h"
+#include "butil/base64.h"
+#include "butil/iobuf.h"
+#include "butil/logging.h"  // LOG()
+#include "butil/sys_byteorder.h"
+
+namespace brpc {
+namespace policy {
+
+namespace {
+const butil::StringPiece mysql_native_password("mysql_native_password");
+const char* auth_param_delim = "\t";
+bool MysqlHandleParams(const butil::StringPiece& params, std::string* param_cmd) {

Review Comment:
   加个空行



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscribe@brpc.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: dev-unsubscribe@brpc.apache.org
For additional commands, e-mail: dev-help@brpc.apache.org