You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@trafficserver.apache.org by ok...@apache.org on 2019/05/09 10:04:51 UTC
[trafficserver] branch master updated: Rewrite SocksProxy based on
states
This is an automated email from the ASF dual-hosted git repository.
oknet pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/trafficserver.git
The following commit(s) were added to refs/heads/master by this push:
new 3632f96 Rewrite SocksProxy based on states
3632f96 is described below
commit 3632f969e83635cce2ee223ec14485fb90cb4c1b
Author: Oknet Xu <xu...@skyguard.com.cn>
AuthorDate: Wed May 8 17:35:24 2019 +0800
Rewrite SocksProxy based on states
---
src/traffic_server/SocksProxy.cc | 607 ++++++++++++++++++++++++++-------------
1 file changed, 404 insertions(+), 203 deletions(-)
diff --git a/src/traffic_server/SocksProxy.cc b/src/traffic_server/SocksProxy.cc
index 824e395..8700e37 100644
--- a/src/traffic_server/SocksProxy.cc
+++ b/src/traffic_server/SocksProxy.cc
@@ -42,9 +42,53 @@ static RecRawStatBlock *socksproxy_stat_block;
#define SOCKSPROXY_INC_STAT(x) RecIncrRawStat(socksproxy_stat_block, mutex->thread_holding, x)
+struct SocksProxy;
+typedef int (SocksProxy::*SocksProxyHandler)(int event, void *data);
+
struct SocksProxy : public Continuation {
using EventHandler = int (SocksProxy::*)(int, void *);
+ /* SocksProxy States:
+  *
+  *
+  *                NET_EVENT_ACCEPT
+  * SOCKS_INIT  ---------------------->  SOCKS_ACCEPT
+  *                                            |
+  *                                            |
+  *             +------------------------------+--------------------+
+  *             |                              |                    |
+  *             |                              |                    |
+  *         (Bad Ver)                     (Socks v5)           (Socks v4)
+  *             |                              |                    |
+  *             |                              |                    |
+  *             |                          AUTH_DONE                |
+  *             |                              |                    |
+  *             |                              V                    V
+  *             |                        (CMD = CONNECT && Port = http_port)
+  *             |                                         |
+  *             |                                         |
+  *             |                      +-------(Yes)------+-------(No)-------------+
+  *             |                      |                                           |
+  *             |                      |                                           V
+  *             |                      |                                 (Type of Target addr)
+  *             |                      |                                     |           |
+  *             |                      |                                     |           |
+  *             |                      |                                  is IPv4    not IPv4
+  *             |                      |                                     |           |
+  *             |                      |                                     |           |
+  *             |                      V                                     V           |
+  *             |                  HTTP_REQ                            SERVER_TUNNEL     |
+  *             |                      |                                     |           |
+  *             |                      |                               (connect_re)      |
+  *             |                      |                                     |           |
+  *             V                      V             NET_EVENT_OPEN          |           |
+  *        SOCKS_ERROR  -------->  ALL_DONE  <-------------------------------+           |
+  *             A                                                            |           |
+  *             |                                                            |           |
+  *             |            NET_EVENT_OPEN_FAILED                           |           |
+  *             +-------------  RESP_TO_CLIENT  <----------------------------+ <---------+
+  *
+  */
enum {
SOCKS_INIT = 1,
SOCKS_ACCEPT,
@@ -58,9 +102,19 @@ struct SocksProxy : public Continuation {
~SocksProxy() override {}
+ int acceptEvent(int event, void *data);
int mainEvent(int event, void *data);
- int setupHttpRequest(unsigned char *p);
+ int state_read_client_request(int event, void *data);
+ int state_read_socks4_client_request(int event, void *data);
+ int state_read_socks5_client_auth_methods(int event, void *data);
+ int state_send_socks5_auth_method(int event, void *data);
+ int state_read_socks5_client_request(int event, void *data);
+ int state_handing_over_http_request(int event, void *data);
+ int state_send_socks_reply(int event, void *data);
+
+ int parse_socks_client_request(unsigned char *p);
+ int setupHttpRequest(unsigned char *p);
int sendResp(bool granted);
void init(NetVConnection *netVC);
@@ -75,9 +129,11 @@ private:
Event *timeout = nullptr;
SocksAuthHandler auth_handler = nullptr;
+ SocksProxyHandler vc_handler = nullptr;
Action *pending_action = nullptr;
unsigned char version = 0;
+ int port = 0;
int state = SOCKS_INIT;
int recursion = 0;
};
@@ -93,9 +149,9 @@ SocksProxy::init(NetVConnection *netVC)
SCOPED_MUTEX_LOCK(lock, mutex, this_ethread());
- SET_HANDLER((EventHandler)&SocksProxy::mainEvent);
+ SET_HANDLER((EventHandler)&SocksProxy::acceptEvent);
- mainEvent(NET_EVENT_ACCEPT, netVC);
+ handleEvent(NET_EVENT_ACCEPT, netVC);
}
void
@@ -111,215 +167,44 @@ SocksProxy::free()
}
int
-SocksProxy::mainEvent(int event, void *data)
+SocksProxy::acceptEvent(int event, void *data)
{
- int ret = EVENT_DONE;
- unsigned char *p;
-
- VIO *vio;
- int64_t n_read_avail;
-
- recursion++;
-
- switch (event) {
- case NET_EVENT_ACCEPT:
- state = SOCKS_ACCEPT;
- Debug("SocksProxy", "Proxy got accept event");
-
- clientVC = (NetVConnection *)data;
- clientVC->socks_addr.reset();
- // fallthrough
+ ink_assert(event == NET_EVENT_ACCEPT);
+ state = SOCKS_ACCEPT;
+ Debug("SocksProxy", "Proxy got accept event");
- case VC_EVENT_WRITE_COMPLETE:
-
- switch (state) {
- case HTTP_REQ: {
- HttpSessionAccept::Options ha_opt;
- // This is a WRITE_COMPLETE. vio->nbytes == vio->ndone is true
+ clientVC = (NetVConnection *)data;
+ clientVC->socks_addr.reset();
- SOCKSPROXY_INC_STAT(socksproxy_http_connections_stat);
- Debug("SocksProxy", "Handing over the HTTP request");
+ buf->reset();
- ha_opt.transport_type = clientVC->attributes;
- HttpSessionAccept http_accept(ha_opt);
- http_accept.mainEvent(NET_EVENT_ACCEPT, clientVC);
- state = ALL_DONE;
- break;
- }
+ SET_HANDLER((EventHandler)&SocksProxy::mainEvent);
+ vc_handler = &SocksProxy::state_read_client_request;
- case RESP_TO_CLIENT:
- state = SOCKS_ERROR;
- break;
+ timeout = this_ethread()->schedule_in(this, HRTIME_SECONDS(netProcessor.socks_conf_stuff->socks_timeout));
+ clientVIO = clientVC->do_io_read(this, INT64_MAX, buf);
- default:
- buf->reset();
- timeout = this_ethread()->schedule_in(this, HRTIME_SECONDS(netProcessor.socks_conf_stuff->socks_timeout));
- clientVC->do_io_read(this, INT64_MAX, buf);
- }
+ return EVENT_DONE;
+}
- break;
+int
+SocksProxy::mainEvent(int event, void *data)
+{
+ int ret = EVENT_DONE;
- case VC_EVENT_WRITE_READY:
- Debug("SocksProxy", "Received unexpected write_ready");
- break;
+ recursion++;
+ switch (event) {
+ case VC_EVENT_READ_READY:
case VC_EVENT_READ_COMPLETE:
- Debug("SocksProxy", "Oops! We should never get Read_Complete.");
- // FALLTHROUGH
- case VC_EVENT_READ_READY: {
- unsigned char *port_ptr = nullptr;
-
- ret = EVENT_CONT;
- vio = (VIO *)data;
-
- n_read_avail = reader->block_read_avail();
- ink_assert(n_read_avail == reader->read_avail());
- p = (unsigned char *)reader->start();
-
- if (n_read_avail >= 2) {
- Debug(state == SOCKS_ACCEPT ? "SocksProxy" : "", "Accepted connection from a version %d client", (int)p[0]);
-
- // Most of the time request is just a single packet
- switch (p[0]) {
- case SOCKS4_VERSION:
- ink_assert(state == SOCKS_ACCEPT);
-
- if (n_read_avail > 8) {
- // read the user name
- int i = 8;
- while (p[i] != 0 && n_read_avail > i) {
- i++;
- }
-
- if (p[i] == 0) {
- port_ptr = &p[2];
- clientVC->socks_addr.type = SOCKS_ATYPE_IPV4;
- reader->consume(i + 1);
- ret = EVENT_DONE;
- }
- }
- break;
-
- case SOCKS5_VERSION:
-
- if (state == SOCKS_ACCEPT) {
- if (n_read_avail >= 2 + p[1]) {
- auth_handler = &socks5ServerAuthHandler;
- ret = EVENT_DONE;
- }
- } else {
- ink_assert(state == AUTH_DONE);
-
- if (n_read_avail >= 5) {
- int req_len;
-
- switch (p[3]) {
- case SOCKS_ATYPE_IPV4:
- req_len = 10;
- break;
- case SOCKS_ATYPE_FQHN:
- req_len = 7 + p[4];
- break;
- case SOCKS_ATYPE_IPV6:
- req_len = 22;
- break;
- default:
- req_len = INT_MAX;
- Debug("SocksProxy", "Illegal address type(%d)", (int)p[3]);
- }
-
- if (n_read_avail >= req_len) {
- port_ptr = &p[req_len - 2];
- clientVC->socks_addr.type = p[3];
- auth_handler = nullptr;
- reader->consume(req_len);
- ret = EVENT_DONE;
- }
- }
- }
- break;
-
- default:
- Warning("Wrong version for Socks: %d\n", p[0]);
- state = SOCKS_ERROR;
- }
+ case VC_EVENT_WRITE_READY:
+ case VC_EVENT_WRITE_COMPLETE:
+ if (vc_handler) {
+ ret = (this->*vc_handler)(event, data);
+ } else {
+ Debug("SocksProxy", "Ignore event = %s state = %d", get_vc_event_name(event), state);
}
-
- if (ret == EVENT_DONE) {
- timeout->cancel(this);
- timeout = nullptr;
-
- if (auth_handler) {
- /* disable further reads */
- vio->nbytes = vio->ndone;
-
- // There is some auth stuff left.
- if (invokeSocksAuthHandler(auth_handler, SOCKS_AUTH_READ_COMPLETE, p) >= 0) {
- buf->reset();
- p = (unsigned char *)buf->start();
-
- int n_bytes = invokeSocksAuthHandler(auth_handler, SOCKS_AUTH_FILL_WRITE_BUF, p);
- ink_assert(n_bytes > 0);
-
- buf->fill(n_bytes);
-
- clientVC->do_io_write(this, n_bytes, reader, false);
-
- state = AUTH_DONE;
- } else {
- Debug("SocksProxy", "Auth_handler returned error");
- state = SOCKS_ERROR;
- }
-
- } else {
- int port = port_ptr[0] * 256 + port_ptr[1];
- version = p[0];
-
- if (port == netProcessor.socks_conf_stuff->http_port && p[1] == SOCKS_CONNECT) {
- /* disable further reads */
- vio->nbytes = vio->ndone;
-
- ret = setupHttpRequest(p);
- sendResp(true);
- state = HTTP_REQ;
-
- } else {
- SOCKSPROXY_INC_STAT(socksproxy_tunneled_connections_stat);
- Debug("SocksProxy", "Tunnelling the connection for port %d", port);
-
- if (clientVC->socks_addr.type != SOCKS_ATYPE_IPV4) {
- // We dont support other kinds of addresses for tunnelling
- // if this is a hostname we could do host look up here
- mainEvent(NET_EVENT_OPEN_FAILED, nullptr);
- break;
- }
-
- uint32_t ip;
- struct sockaddr_in addr;
-
- memcpy(&ip, &p[4], 4);
- ats_ip4_set(&addr, ip, htons(port));
-
- state = SERVER_TUNNEL;
- clientVIO = vio; // used in the tunnel
-
- // tunnel the connection.
-
- NetVCOptions vc_options;
- vc_options.socks_support = p[1];
- vc_options.socks_version = version;
-
- Action *action = netProcessor.connect_re(this, ats_ip_sa_cast(&addr), &vc_options);
- if (action != ACTION_RESULT_DONE) {
- ink_assert(pending_action == nullptr);
- pending_action = action;
- }
- }
- }
- } // if (ret == EVENT_DONE)
-
break;
- }
case NET_EVENT_OPEN: {
pending_action = nullptr;
@@ -344,6 +229,7 @@ SocksProxy::mainEvent(int event, void *data)
case NET_EVENT_OPEN_FAILED:
pending_action = nullptr;
+ vc_handler = &SocksProxy::state_send_socks_reply;
sendResp(false);
state = RESP_TO_CLIENT;
Debug("SocksProxy", "open to Socks server failed");
@@ -355,10 +241,10 @@ SocksProxy::mainEvent(int event, void *data)
state = SOCKS_ERROR;
break;
+ case VC_EVENT_EOS:
case VC_EVENT_ERROR:
case VC_EVENT_INACTIVITY_TIMEOUT:
case VC_EVENT_ACTIVE_TIMEOUT:
- case VC_EVENT_EOS:
Debug("SocksProxy", "VC_EVENT (state: %d error: %s)", state, get_vc_event_name(event));
state = SOCKS_ERROR;
break;
@@ -368,13 +254,17 @@ SocksProxy::mainEvent(int event, void *data)
state = SOCKS_ERROR;
}
+ recursion--;
+
if (state == SOCKS_ERROR) {
if (pending_action) {
pending_action->cancel();
+ pending_action = nullptr;
}
if (timeout) {
timeout->cancel(this);
+ timeout = nullptr;
}
if (clientVC) {
@@ -386,8 +276,6 @@ SocksProxy::mainEvent(int event, void *data)
state = ALL_DONE;
}
- recursion--;
-
if (state == ALL_DONE && recursion == 0) {
free();
}
@@ -396,6 +284,319 @@ SocksProxy::mainEvent(int event, void *data)
}
int
+SocksProxy::state_read_client_request(int event, void *data)
+{
+ ink_assert(state == SOCKS_ACCEPT);
+ if (event != VC_EVENT_READ_READY) {
+ ink_assert(!"not reached");
+ return EVENT_CONT;
+ }
+
+ int64_t n = reader->block_read_avail();
+ if (n < 2) {
+ return EVENT_CONT;
+ }
+
+ unsigned char *p = (unsigned char *)reader->start();
+
+ Debug("SocksProxy", "Accepted connection from a version %d client", (int)p[0]);
+
+ switch (p[0]) {
+ case SOCKS4_VERSION:
+ version = p[0];
+ vc_handler = &SocksProxy::state_read_socks4_client_request;
+ return (this->*vc_handler)(event, data);
+ break;
+ case SOCKS5_VERSION:
+ version = p[0];
+ vc_handler = &SocksProxy::state_read_socks5_client_auth_methods;
+ return (this->*vc_handler)(event, data);
+ break;
+ default:
+ Warning("Wrong version for Socks: %d\n", (int)p[0]);
+ state = SOCKS_ERROR;
+ break;
+ }
+
+ return EVENT_DONE;
+}
+
+int
+SocksProxy::state_read_socks4_client_request(int event, void *data)
+{
+ ink_assert(state == SOCKS_ACCEPT);
+ // Parse one SOCKS v4 request from the read buffer; EVENT_CONT is returned while it is incomplete.
+ int64_t n = reader->block_read_avail();
+ /* Socks v4 request:
+ * VN CD DSTPORT DSTIP USERID NUL
+ * 1 + 1 + 2 + 4 + ? + 1
+ *
+ * so the minimum length is 9 bytes
+ */
+ if (n < 9) {
+ return EVENT_CONT;
+ }
+
+ unsigned char *p = (unsigned char *)reader->start();
+ int i;
+ // Skip UserID, scanning only the n bytes reported as available
+ for (i = 8; i < n && p[i] != 0; i++)
+ ;
+ // i == n means no NUL terminator has arrived yet, so p[i] must not be read.
+ if (i < n && p[i] == 0) {
+ port = p[2] * 256 + p[3];
+ clientVC->socks_addr.type = SOCKS_ATYPE_IPV4;
+ reader->consume(i + 1);
+ state = AUTH_DONE;
+
+ return parse_socks_client_request(p);
+ } else {
+ Debug("SocksProxy", "Need more data to parse userid for Socks: %d\n", p[0]);
+ return EVENT_CONT;
+ }
+}
+
+int
+SocksProxy::state_read_socks5_client_auth_methods(int event, void *data)
+{
+ int64_t n;
+ unsigned char *p;
+
+ ink_assert(state == SOCKS_ACCEPT);
+
+ n = reader->block_read_avail();
+ p = (unsigned char *)reader->start();
+
+ /* Socks v5 request:
+ * VER N_Methods List_of Methods
+ * 1 + 1 + (1 to N_Methods)
+ *
+ * so the minimum length is 2 + N_Methods bytes
+ */
+ if (n < 2 + p[1]) {
+ return EVENT_CONT;
+ }
+
+ if (timeout) {
+ timeout->cancel(this);
+ timeout = nullptr;
+ }
+
+ auth_handler = &socks5ServerAuthHandler;
+ /* disable further reads */
+ clientVIO->nbytes = clientVIO->ndone;
+
+ // There is some auth stuff left.
+ if (invokeSocksAuthHandler(auth_handler, SOCKS_AUTH_READ_COMPLETE, p) >= 0) {
+ buf->reset();
+ p = (unsigned char *)buf->start();
+
+ int n_bytes = invokeSocksAuthHandler(auth_handler, SOCKS_AUTH_FILL_WRITE_BUF, p);
+ ink_assert(n_bytes > 0);
+
+ buf->fill(n_bytes);
+
+ vc_handler = &SocksProxy::state_send_socks5_auth_method;
+ clientVC->do_io_write(this, n_bytes, reader, false);
+ } else {
+ Debug("SocksProxy", "Auth_handler returned error\n");
+ state = SOCKS_ERROR;
+ }
+
+ return EVENT_DONE;
+}
+
+int
+SocksProxy::state_send_socks5_auth_method(int event, void *data)
+{
+ ink_assert(state == SOCKS_ACCEPT);
+ switch (event) {
+ case VC_EVENT_WRITE_COMPLETE:
+ state = AUTH_DONE;
+
+ buf->reset();
+ timeout = this_ethread()->schedule_in(this, HRTIME_SECONDS(netProcessor.socks_conf_stuff->socks_timeout));
+
+ // We always send "No authentication is required" to client,
+ // so the next is socks5 request.
+ vc_handler = &SocksProxy::state_read_socks5_client_request;
+ clientVC->do_io_read(this, INT64_MAX, buf);
+ break;
+ case VC_EVENT_WRITE_READY:
+ default:
+ Debug("SocksProxy", "Received unexpected event: %s\n", get_vc_event_name(event));
+ break;
+ }
+
+ return EVENT_DONE;
+}
+
+int
+SocksProxy::state_read_socks5_client_request(int event, void *data)
+{
+ int64_t n;
+ unsigned char *p;
+
+ ink_assert(state == AUTH_DONE);
+ if (event != VC_EVENT_READ_READY) {
+ ink_assert(!"not reached");
+ return EVENT_CONT;
+ }
+
+ n = reader->block_read_avail();
+ p = (unsigned char *)reader->start();
+
+ /* Socks v5 request:
+ * VER CMD RSV ATYP DST DSTPORT
+ * 1 + 1 + 1 + 1 + ? + 2
+ *
+ * so the minimum length is 6 + 4(IPv4) or 16(IPv6)
+ */
+ if (n <= 6) {
+ return EVENT_CONT;
+ }
+ int req_len;
+ switch (p[3]) {
+ case SOCKS_ATYPE_IPV4:
+ req_len = 10;
+ break;
+ case SOCKS_ATYPE_FQHN:
+ req_len = 7 + p[4];
+ break;
+ case SOCKS_ATYPE_IPV6:
+ req_len = 22;
+ break;
+ default:
+ req_len = INT_MAX;
+ state = SOCKS_ERROR;
+ Debug("SocksProxy", "Illegal address type(%d)", (int)p[3]);
+ }
+
+ if (state == SOCKS_ERROR) {
+ return EVENT_DONE;
+ } else if (n < req_len) {
+ return EVENT_CONT;
+ }
+
+ port = p[req_len - 2] * 256 + p[req_len - 1];
+ clientVC->socks_addr.type = p[3];
+ auth_handler = nullptr;
+ reader->consume(req_len);
+
+ return parse_socks_client_request(p);
+}
+
+int
+SocksProxy::parse_socks_client_request(unsigned char *p)
+{
+ int ret = EVENT_DONE;
+
+ if (timeout) {
+ timeout->cancel(this);
+ timeout = nullptr;
+ }
+
+ if (port == netProcessor.socks_conf_stuff->http_port && p[1] == SOCKS_CONNECT) {
+ /* disable further reads */
+ clientVIO->nbytes = clientVIO->ndone;
+
+ ret = setupHttpRequest(p);
+ vc_handler = &SocksProxy::state_handing_over_http_request;
+ sendResp(true);
+ state = HTTP_REQ;
+ } else {
+ SOCKSPROXY_INC_STAT(socksproxy_tunneled_connections_stat);
+ Debug("SocksProxy", "Tunnelling the connection for port %d", port);
+
+ if (clientVC->socks_addr.type != SOCKS_ATYPE_IPV4) {
+ // We don't support other kinds of addresses for tunnelling;
+ // if this is a hostname we could do a host lookup here.
+ ret = mainEvent(NET_EVENT_OPEN_FAILED, nullptr);
+ } else {
+ uint32_t ip;
+ struct sockaddr_in addr;
+
+ memcpy(&ip, &p[4], 4);
+ ats_ip4_set(&addr, ip, htons(port));
+
+ // Ignore further reads
+ vc_handler = nullptr;
+
+ state = SERVER_TUNNEL;
+
+ // tunnel the connection.
+
+ NetVCOptions vc_options;
+ vc_options.socks_support = p[1];
+ vc_options.socks_version = version;
+
+ Action *action = netProcessor.connect_re(this, ats_ip_sa_cast(&addr), &vc_options);
+ if (action != ACTION_RESULT_DONE) {
+ ink_release_assert(pending_action == nullptr);
+ pending_action = action;
+ }
+ }
+ }
+
+ return ret;
+}
+
+int
+SocksProxy::state_handing_over_http_request(int event, void *data)
+{
+ int ret = EVENT_DONE;
+
+ ink_assert(state == HTTP_REQ);
+
+ switch (event) {
+ case VC_EVENT_WRITE_COMPLETE: {
+ HttpSessionAccept::Options ha_opt;
+
+ SOCKSPROXY_INC_STAT(socksproxy_http_connections_stat);
+ Debug("SocksProxy", "Handing over the HTTP request");
+
+ ha_opt.transport_type = clientVC->attributes;
+ HttpSessionAccept http_accept(ha_opt);
+ if (!http_accept.accept(clientVC, buf, reader)) {
+ state = SOCKS_ERROR;
+ } else {
+ state = ALL_DONE;
+ buf = nullptr; // do not free buf. HttpSM will do that.
+ clientVC = nullptr;
+ vc_handler = nullptr;
+ }
+ break;
+ }
+ case VC_EVENT_WRITE_READY:
+ Debug("SocksProxy", "Received unexpected write_ready");
+ ret = EVENT_CONT;
+ break;
+ }
+
+ return ret;
+}
+
+int
+SocksProxy::state_send_socks_reply(int event, void *data)
+{
+ int ret = EVENT_DONE;
+
+ ink_assert(state == RESP_TO_CLIENT);
+
+ switch (event) {
+ case VC_EVENT_WRITE_COMPLETE:
+ state = SOCKS_ERROR;
+ break;
+ case VC_EVENT_WRITE_READY:
+ Debug("SocksProxy", "Received unexpected write_ready");
+ ret = EVENT_CONT;
+ break;
+ }
+
+ return ret;
+}
+
+int
SocksProxy::sendResp(bool granted)
{
int n_bytes;