You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@trafficserver.apache.org by sh...@apache.org on 2016/04/16 16:31:50 UTC
[trafficserver] branch master updated: TS-3612: Restructure client
session and transaction processing. This closes #570.
This is an automated email from the ASF dual-hosted git repository.
shinrich pushed a commit to branch master
in repository https://git-dual.apache.org/repos/asf/trafficserver.git
The following commit(s) were added to refs/heads/master by this push:
new af76977 TS-3612: Restructure client session and transaction processing. This closes #570.
af76977 is described below
commit af76977adb9f3c0296a232688bbcb5a1421a6768
Author: Susan Hinrichs <sh...@draggingnagging.corp.ne1.yahoo.com>
AuthorDate: Wed Apr 13 19:57:39 2016 +0000
TS-3612: Restructure client session and transaction processing. This closes #570.
---
iocore/eventsystem/I_Thread.h | 4 +-
iocore/net/P_SSLNetVConnection.h | 20 -
iocore/net/SSLNetVConnection.cc | 31 +-
iocore/net/UnixNetProcessor.cc | 2 +-
proxy/InkAPI.cc | 67 ++-
proxy/Makefile.am | 4 +
proxy/ProtocolProbeSessionAccept.cc | 16 +-
proxy/ProxyClientSession.cc | 3 +-
proxy/ProxyClientSession.h | 79 ++-
proxy/ProxyClientTransaction.cc | 69 +++
proxy/ProxyClientTransaction.h | 238 +++++++++
...{HttpClientSession.cc => Http1ClientSession.cc} | 298 +++++------
.../{HttpClientSession.h => Http1ClientSession.h} | 117 +++--
proxy/http/Http1ClientTransaction.cc | 67 +++
proxy/http/Http1ClientTransaction.h | 181 +++++++
proxy/http/HttpProxyServerMain.cc | 6 +-
proxy/http/HttpSM.cc | 144 ++++--
proxy/http/HttpSM.h | 14 +-
proxy/http/HttpSessionAccept.cc | 4 +-
proxy/http/HttpSessionManager.cc | 4 +-
proxy/http/HttpSessionManager.h | 4 +-
proxy/http/HttpTransact.cc | 25 +-
proxy/http/HttpTunnel.cc | 74 +--
proxy/http/HttpTunnel.h | 18 +-
proxy/http/Makefile.am | 6 +-
proxy/http2/HTTP2.cc | 18 +
proxy/http2/HTTP2.h | 6 +
proxy/http2/Http2ClientSession.cc | 50 +-
proxy/http2/Http2ClientSession.h | 32 +-
proxy/http2/Http2ConnectionState.cc | 141 +++--
proxy/http2/Http2ConnectionState.h | 5 +-
proxy/http2/Http2SessionAccept.cc | 14 +-
proxy/http2/Http2Stream.cc | 574 +++++++++++++++++++--
proxy/http2/Http2Stream.h | 161 ++++--
proxy/spdy/SpdyCallbacks.cc | 1 +
proxy/spdy/SpdyClientSession.h | 11 +
36 files changed, 1903 insertions(+), 605 deletions(-)
diff --git a/iocore/eventsystem/I_Thread.h b/iocore/eventsystem/I_Thread.h
index abc1726..74b4826 100644
--- a/iocore/eventsystem/I_Thread.h
+++ b/iocore/eventsystem/I_Thread.h
@@ -131,7 +131,9 @@ public:
ProxyAllocator eventAllocator;
ProxyAllocator netVCAllocator;
ProxyAllocator sslNetVCAllocator;
- ProxyAllocator httpClientSessionAllocator;
+ ProxyAllocator http1ClientSessionAllocator;
+ ProxyAllocator http2ClientSessionAllocator;
+ ProxyAllocator http2StreamAllocator;
ProxyAllocator httpServerSessionAllocator;
ProxyAllocator hdrHeapAllocator;
ProxyAllocator strHeapAllocator;
diff --git a/iocore/net/P_SSLNetVConnection.h b/iocore/net/P_SSLNetVConnection.h
index 59d492c..b7882b0 100644
--- a/iocore/net/P_SSLNetVConnection.h
+++ b/iocore/net/P_SSLNetVConnection.h
@@ -229,24 +229,6 @@ public:
// least some of the hooks
bool calledHooks(TSHttpHookID /* eventId */) { return (this->sslHandshakeHookState != HANDSHAKE_HOOKS_PRE); }
- MIOBuffer *
- get_ssl_iobuf()
- {
- return iobuf;
- }
-
- void
- set_ssl_iobuf(MIOBuffer *buf)
- {
- iobuf = buf;
- }
-
- IOBufferReader *
- get_ssl_reader()
- {
- return reader;
- }
-
bool
isEosRcvd()
{
@@ -328,8 +310,6 @@ private:
const SSLNextProtocolSet *npnSet;
Continuation *npnEndpoint;
SessionAccept *sessionAcceptPtr;
- MIOBuffer *iobuf;
- IOBufferReader *reader;
bool eosRcvd;
bool sslTrace;
};
diff --git a/iocore/net/SSLNetVConnection.cc b/iocore/net/SSLNetVConnection.cc
index 9373405..46a27c2 100644
--- a/iocore/net/SSLNetVConnection.cc
+++ b/iocore/net/SSLNetVConnection.cc
@@ -531,30 +531,6 @@ SSLNetVConnection::net_read_io(NetHandler *nh, EThread *lthread)
// the handshake is complete. Otherwise set up for continuing read
// operations.
if (ntodo <= 0) {
- if (!getSSLClientConnection()) {
- // we will not see another ET epoll event if the first byte is already
- // in the ssl buffers, so, SSL_read if there's anything already..
- Debug("ssl", "ssl handshake completed on vc %p, check to see if first byte, is already in the ssl buffers", this);
- this->iobuf = new_MIOBuffer(BUFFER_SIZE_INDEX_4K);
- if (this->iobuf) {
- this->reader = this->iobuf->alloc_reader();
- s->vio.buffer.writer_for(this->iobuf);
- ret = ssl_read_from_net(this, lthread, r);
- if (ret == SSL_READ_EOS) {
- this->eosRcvd = true;
- }
-#if DEBUG
- int pending = SSL_pending(this->ssl);
- if (r > 0 || pending > 0) {
- Debug("ssl", "ssl read right after handshake, read %" PRId64 ", pending %d bytes, for vc %p", r, pending, this);
- }
-#endif
- } else {
- Error("failed to allocate MIOBuffer after handshake, vc %p", this);
- }
- read.triggered = 0;
- read_disable(nh, this);
- }
readSignalDone(VC_EVENT_READ_COMPLETE, nh);
} else {
read.triggered = 1;
@@ -857,7 +833,7 @@ SSLNetVConnection::SSLNetVConnection()
sslHandShakeComplete(false), sslClientConnection(false), sslClientRenegotiationAbort(false), sslSessionCacheHit(false),
handShakeBuffer(NULL), handShakeHolder(NULL), handShakeReader(NULL), handShakeBioStored(0),
sslPreAcceptHookState(SSL_HOOKS_INIT), sslHandshakeHookState(HANDSHAKE_HOOKS_PRE), npnSet(NULL), npnEndpoint(NULL),
- sessionAcceptPtr(NULL), iobuf(NULL), reader(NULL), eosRcvd(false), sslTrace(false)
+ sessionAcceptPtr(NULL), eosRcvd(false), sslTrace(false)
{
}
@@ -931,9 +907,6 @@ SSLNetVConnection::free(EThread *t)
SSL_free(ssl);
ssl = NULL;
}
- if (iobuf) {
- free_MIOBuffer(iobuf);
- }
sslHandShakeComplete = false;
sslClientConnection = false;
sslHandshakeBeginTime = 0;
@@ -950,8 +923,6 @@ SSLNetVConnection::free(EThread *t)
npnSet = NULL;
npnEndpoint = NULL;
sessionAcceptPtr = NULL;
- iobuf = NULL;
- reader = NULL;
eosRcvd = false;
sslHandShakeComplete = false;
free_handshake_buffers();
diff --git a/iocore/net/UnixNetProcessor.cc b/iocore/net/UnixNetProcessor.cc
index 89bb63a..37e210e 100644
--- a/iocore/net/UnixNetProcessor.cc
+++ b/iocore/net/UnixNetProcessor.cc
@@ -264,7 +264,7 @@ UnixNetProcessor::connect_re_internal(Continuation *cont, sockaddr const *target
}
}
}
- eventProcessor.schedule_imm(vc, opt->etype);
+ t->schedule_imm(vc);
if (using_socks) {
return &socksEntry->action_;
} else
diff --git a/proxy/InkAPI.cc b/proxy/InkAPI.cc
index eea2460..ee1adb9 100644
--- a/proxy/InkAPI.cc
+++ b/proxy/InkAPI.cc
@@ -33,7 +33,7 @@
#include "URL.h"
#include "MIME.h"
#include "HTTP.h"
-#include "HttpClientSession.h"
+#include "ProxyClientSession.h"
#include "Http2ClientSession.h"
#include "HttpServerSession.h"
#include "HttpSM.h"
@@ -961,24 +961,21 @@ INKContInternal::destroy()
} else {
// TODO: Should this schedule on some other "thread" ?
// TODO: we don't care about the return action?
- TSContSchedule((TSCont) this, 0, TS_THREAD_POOL_DEFAULT);
+ this_ethread()->schedule_imm(this);
+ Warning("INKCont not deletable %d %p", m_event_count, this);
}
}
void
INKContInternal::handle_event_count(int event)
{
- if ((event == EVENT_IMMEDIATE) || (event == EVENT_INTERVAL)) {
- int val;
-
- m_deletable = (m_closed != 0);
-
- val = ink_atomic_increment((int *)&m_event_count, -1);
+ if ((event == EVENT_IMMEDIATE) || (event == EVENT_INTERVAL) || event == TS_EVENT_HTTP_TXN_CLOSE) {
+ int val = ink_atomic_increment((int *)&m_event_count, -1);
if (val <= 0) {
ink_assert(!"not reached");
}
- m_deletable = m_deletable && (val == 1);
+ m_deletable = (m_closed != 0) && (val == 1);
}
}
@@ -994,6 +991,8 @@ INKContInternal::handle_event(int event, void *edata)
this->mutex = NULL;
m_free_magic = INKCONT_INTERN_MAGIC_DEAD;
INKContAllocator.free(this);
+ } else {
+ Warning("INKCont Deletable but not deleted %d", m_event_count);
}
} else {
return m_event_func((TSCont) this, (TSEvent)event, edata);
@@ -1209,7 +1208,7 @@ INKVConnInternal::set_data(int id, void *data)
int
APIHook::invoke(int event, void *edata)
{
- if ((event == EVENT_IMMEDIATE) || (event == EVENT_INTERVAL)) {
+ if ((event == EVENT_IMMEDIATE) || (event == EVENT_INTERVAL) || event == TS_EVENT_HTTP_TXN_CLOSE) {
if (ink_atomic_increment((int *)&m_cont->m_event_count, 1) < 0) {
ink_assert(!"not reached");
}
@@ -4404,7 +4403,7 @@ TSHttpSsnHookAdd(TSHttpSsn ssnp, TSHttpHookID id, TSCont contp)
sdk_assert(sdk_sanity_check_continuation(contp) == TS_SUCCESS);
sdk_assert(sdk_sanity_check_hook_id(id) == TS_SUCCESS);
- HttpClientSession *cs = (HttpClientSession *)ssnp;
+ ProxyClientSession *cs = reinterpret_cast<ProxyClientSession *>(ssnp);
cs->ssn_hook_append(id, (INKContInternal *)contp);
}
@@ -4413,14 +4412,14 @@ TSHttpSsnTransactionCount(TSHttpSsn ssnp)
{
sdk_assert(sdk_sanity_check_http_ssn(ssnp) == TS_SUCCESS);
- HttpClientSession *cs = (HttpClientSession *)ssnp;
+ ProxyClientSession *cs = reinterpret_cast<ProxyClientSession *>(ssnp);
return cs->get_transact_count();
}
class TSHttpSsnCallback : public Continuation
{
public:
- TSHttpSsnCallback(HttpClientSession *cs, TSEvent event) : Continuation(cs->mutex), m_cs(cs), m_event(event)
+ TSHttpSsnCallback(ProxyClientSession *cs, TSEvent event) : Continuation(cs->mutex), m_cs(cs), m_event(event)
{
SET_HANDLER(&TSHttpSsnCallback::event_handler);
}
@@ -4434,7 +4433,7 @@ public:
}
private:
- HttpClientSession *m_cs;
+ ProxyClientSession *m_cs;
TSEvent m_event;
};
@@ -4444,7 +4443,7 @@ TSHttpSsnReenable(TSHttpSsn ssnp, TSEvent event)
{
sdk_assert(sdk_sanity_check_http_ssn(ssnp) == TS_SUCCESS);
- HttpClientSession *cs = (HttpClientSession *)ssnp;
+ ProxyClientSession *cs = reinterpret_cast<ProxyClientSession *>(ssnp);
EThread *eth = this_ethread();
// If this function is being executed on a thread created by the API
@@ -4499,8 +4498,8 @@ TSHttpTxnSsnGet(TSHttpTxn txnp)
{
sdk_assert(sdk_sanity_check_txn(txnp) == TS_SUCCESS);
- HttpSM *sm = (HttpSM *)txnp;
- return (TSHttpSsn)sm->ua_session;
+ HttpSM *sm = reinterpret_cast<HttpSM *>(txnp);
+ return reinterpret_cast<TSHttpSsn>(sm->ua_session ? (TSHttpSsn)sm->ua_session->get_parent() : NULL);
}
// TODO: Is this still necessary ??
@@ -4510,7 +4509,7 @@ TSHttpTxnClientKeepaliveSet(TSHttpTxn txnp, int set)
HttpSM *sm = (HttpSM *)txnp;
HttpTransact::State *s = &(sm->t_state);
- s->hdr_info.trust_response_cl = (set != 0) ? true : false;
+ s->hdr_info.trust_response_cl = (set != 0);
}
TSReturnCode
@@ -5240,7 +5239,7 @@ TSHttpSsnSSLConnectionGet(TSHttpSsn ssnp)
{
sdk_assert(sdk_sanity_check_null_ptr((void *)ssnp) == TS_SUCCESS);
- HttpClientSession *cs = reinterpret_cast<HttpClientSession *>(ssnp);
+ ProxyClientSession *cs = reinterpret_cast<ProxyClientSession *>(ssnp);
if (cs == NULL) {
return NULL;
}
@@ -5256,7 +5255,7 @@ TSHttpSsnSSLConnectionGet(TSHttpSsn ssnp)
sockaddr const *
TSHttpSsnClientAddrGet(TSHttpSsn ssnp)
{
- HttpClientSession *cs = reinterpret_cast<HttpClientSession *>(ssnp);
+ ProxyClientSession *cs = reinterpret_cast<ProxyClientSession *>(ssnp);
if (cs == NULL)
return 0;
@@ -5279,7 +5278,7 @@ TSHttpTxnClientAddrGet(TSHttpTxn txnp)
sockaddr const *
TSHttpSsnIncomingAddrGet(TSHttpSsn ssnp)
{
- HttpClientSession *cs = reinterpret_cast<HttpClientSession *>(ssnp);
+ ProxyClientSession *cs = reinterpret_cast<ProxyClientSession *>(ssnp);
if (cs == NULL)
return 0;
@@ -5359,16 +5358,8 @@ TSHttpTxnOutgoingAddrSet(TSHttpTxn txnp, const struct sockaddr *addr)
sdk_assert(sdk_sanity_check_txn(txnp) == TS_SUCCESS);
HttpSM *sm = (HttpSM *)txnp;
- sm->ua_session->outbound_port = ats_ip_port_host_order(addr);
-
- if (ats_is_ip4(addr)) {
- sm->ua_session->outbound_ip4.assign(addr);
- } else if (ats_is_ip6(addr)) {
- sm->ua_session->outbound_ip6.assign(addr);
- } else {
- sm->ua_session->outbound_ip4.invalidate();
- sm->ua_session->outbound_ip6.invalidate();
- }
+ sm->ua_session->set_outbound_port(ats_ip_port_host_order(addr));
+ sm->ua_session->set_outbound_ip(IpAddr(addr));
return TS_ERROR;
}
@@ -5399,7 +5390,7 @@ TSHttpTxnOutgoingTransparencySet(TSHttpTxn txnp, int flag)
return TS_ERROR;
}
- sm->ua_session->f_outbound_transparent = flag;
+ sm->ua_session->set_outbound_transparent(flag);
return TS_SUCCESS;
}
@@ -5745,7 +5736,7 @@ TSHttpSsnArgSet(TSHttpSsn ssnp, int arg_idx, void *arg)
sdk_assert(sdk_sanity_check_http_ssn(ssnp) == TS_SUCCESS);
sdk_assert(arg_idx >= 0 && arg_idx < HTTP_SSN_TXN_MAX_USER_ARG);
- HttpClientSession *cs = (HttpClientSession *)ssnp;
+ ProxyClientSession *cs = reinterpret_cast<ProxyClientSession *>(ssnp);
cs->set_user_arg(arg_idx, arg);
}
@@ -5756,7 +5747,7 @@ TSHttpSsnArgGet(TSHttpSsn ssnp, int arg_idx)
sdk_assert(sdk_sanity_check_http_ssn(ssnp) == TS_SUCCESS);
sdk_assert(arg_idx >= 0 && arg_idx < HTTP_SSN_TXN_MAX_USER_ARG);
- HttpClientSession *cs = (HttpClientSession *)ssnp;
+ ProxyClientSession *cs = reinterpret_cast<ProxyClientSession *>(ssnp);
return cs->get_user_arg(arg_idx);
}
@@ -5861,14 +5852,14 @@ void
TSHttpSsnDebugSet(TSHttpSsn ssnp, int on)
{
sdk_assert(sdk_sanity_check_http_ssn(ssnp) == TS_SUCCESS);
- ((HttpClientSession *)ssnp)->debug_on = on;
+ (reinterpret_cast<ProxyClientSession *>(ssnp))->debug_on = on;
}
int
TSHttpSsnDebugGet(TSHttpSsn ssnp)
{
sdk_assert(sdk_sanity_check_http_ssn(ssnp) == TS_SUCCESS);
- return ((HttpClientSession *)ssnp)->debug();
+ return (reinterpret_cast<ProxyClientSession *>(ssnp))->debug();
}
int
@@ -7494,7 +7485,7 @@ TSFetchRespHdrMLocGet(TSFetchSM fetch_sm)
TSReturnCode
TSHttpIsInternalSession(TSHttpSsn ssnp)
{
- HttpClientSession *cs = (HttpClientSession *)ssnp;
+ ProxyClientSession *cs = reinterpret_cast<ProxyClientSession *>(ssnp);
if (!cs) {
return TS_ERROR;
}
@@ -7510,7 +7501,7 @@ TSHttpIsInternalSession(TSHttpSsn ssnp)
TSReturnCode
TSHttpSsnIsInternal(TSHttpSsn ssnp)
{
- HttpClientSession *cs = (HttpClientSession *)ssnp;
+ ProxyClientSession *cs = reinterpret_cast<ProxyClientSession *>(ssnp);
if (!cs) {
return TS_ERROR;
}
diff --git a/proxy/Makefile.am b/proxy/Makefile.am
index cbd1da4..75458f6 100644
--- a/proxy/Makefile.am
+++ b/proxy/Makefile.am
@@ -176,6 +176,8 @@ traffic_server_SOURCES = \
ProtocolProbeSessionAccept.h \
ProxyClientSession.cc \
ProxyClientSession.h \
+ ProxyClientTransaction.cc \
+ ProxyClientTransaction.h \
ReverseProxy.cc \
ReverseProxy.h \
SocksProxy.cc \
@@ -294,6 +296,8 @@ traffic_sac_SOURCES = \
ProtocolProbeSessionAccept.h \
ProxyClientSession.cc \
ProxyClientSession.h \
+ ProxyClientTransaction.cc \
+ ProxyClientTransaction.h \
Plugin.cc \
InkAPI.cc \
FetchSM.cc \
diff --git a/proxy/ProtocolProbeSessionAccept.cc b/proxy/ProtocolProbeSessionAccept.cc
index 5b8a7a8..f73a051 100644
--- a/proxy/ProtocolProbeSessionAccept.cc
+++ b/proxy/ProtocolProbeSessionAccept.cc
@@ -125,10 +125,7 @@ struct ProtocolProbeTrampoline : public Continuation, public ProtocolProbeSessio
return EVENT_CONT;
done:
- SSLNetVConnection *ssl_vc = dynamic_cast<SSLNetVConnection *>(netvc);
- if (!ssl_vc || (this->iobuf != ssl_vc->get_ssl_iobuf())) {
- free_MIOBuffer(this->iobuf);
- }
+ free_MIOBuffer(this->iobuf);
this->iobuf = NULL;
delete this;
return EVENT_CONT;
@@ -145,15 +142,8 @@ ProtocolProbeSessionAccept::mainEvent(int event, void *data)
ink_assert(data);
VIO *vio;
- NetVConnection *netvc = static_cast<NetVConnection *>(data);
- SSLNetVConnection *ssl_vc = dynamic_cast<SSLNetVConnection *>(netvc);
- MIOBuffer *buf = NULL;
- IOBufferReader *reader = NULL;
- if (ssl_vc) {
- buf = ssl_vc->get_ssl_iobuf();
- reader = ssl_vc->get_ssl_reader();
- }
- ProtocolProbeTrampoline *probe = new ProtocolProbeTrampoline(this, netvc->mutex, buf, reader);
+ NetVConnection *netvc = (NetVConnection *)data;
+ ProtocolProbeTrampoline *probe = new ProtocolProbeTrampoline(this, netvc->mutex, NULL, NULL);
// XXX we need to apply accept inactivity timeout here ...
diff --git a/proxy/ProxyClientSession.cc b/proxy/ProxyClientSession.cc
index b35f493..57bd5ff 100644
--- a/proxy/ProxyClientSession.cc
+++ b/proxy/ProxyClientSession.cc
@@ -27,7 +27,8 @@
static int64_t next_cs_id = 0;
-ProxyClientSession::ProxyClientSession() : VConnection(NULL), debug_on(false), hooks_on(true)
+ProxyClientSession::ProxyClientSession()
+ : VConnection(NULL), acl_record(NULL), host_res_style(HOST_RES_IPV4), debug_on(false), hooks_on(true), con_id(0)
{
ink_zero(this->user_args);
}
diff --git a/proxy/ProxyClientSession.h b/proxy/ProxyClientSession.h
index ca02d19..2deae04 100644
--- a/proxy/ProxyClientSession.h
+++ b/proxy/ProxyClientSession.h
@@ -25,14 +25,19 @@
#define __PROXY_CLIENT_SESSION_H__
#include "ts/ink_platform.h"
+#include "ts/ink_resolver.h"
#include "P_Net.h"
#include "InkAPIInternal.h"
+#include "http/HttpServerSession.h"
// Emit a debug message conditional on whether this particular client session
// has debugging enabled. This should only be called from within a client session
// member function.
#define DebugSsn(ssn, tag, ...) DebugSpecific((ssn)->debug(), tag, __VA_ARGS__)
+class ProxyClientTransaction;
+struct AclRecord;
+
class ProxyClientSession : public VConnection
{
public:
@@ -46,7 +51,7 @@ public:
virtual void
ssn_hook_append(TSHttpHookID id, INKContInternal *cont)
{
- this->api_hooks.prepend(id, cont);
+ this->api_hooks.append(id, cont);
}
virtual void
@@ -101,6 +106,75 @@ public:
void do_api_callout(TSHttpHookID id);
static int64_t next_connection_id();
+ virtual int get_transact_count() const = 0;
+
+ // Override if your session protocol allows this
+ virtual bool
+ is_transparent_passthrough_allowed()
+ {
+ return false;
+ }
+
+ // Override if your session protocol cares
+ virtual void
+ set_half_close_flag(bool flag)
+ {
+ }
+ virtual bool
+ get_half_close_flag() const
+ {
+ return false;
+ }
+
+ // Indicate we are done with a transaction
+ virtual void release(ProxyClientTransaction *trans) = 0;
+
+ /// acl record - cache IpAllow::match() call
+ const AclRecord *acl_record;
+
+ int64_t
+ connection_id() const
+ {
+ return con_id;
+ }
+
+ virtual void
+ attach_server_session(HttpServerSession *ssession, bool transaction_done = true)
+ {
+ }
+
+ virtual HttpServerSession *
+ get_server_session() const
+ {
+ return NULL;
+ }
+
+ /// DNS resolution preferences.
+ HostResStyle host_res_style;
+
+ virtual int state_api_callout(int event, void *edata);
+
+ TSHttpHookID
+ get_hookid() const
+ {
+ return api_hookid;
+ }
+
+ ink_hrtime ssn_start_time;
+ ink_hrtime ssn_last_txn_time;
+
+ virtual void
+ set_active_timeout(ink_hrtime timeout_in)
+ {
+ }
+ virtual void
+ set_inactivity_timeout(ink_hrtime timeout_in)
+ {
+ }
+ virtual void
+ cancel_inactivity_timeout()
+ {
+ }
protected:
// XXX Consider using a bitwise flags variable for the following flags, so that we can make the best
@@ -110,6 +184,8 @@ protected:
bool debug_on;
bool hooks_on;
+ int64_t con_id;
+
private:
APIHookScope api_scope;
TSHttpHookID api_hookid;
@@ -120,7 +196,6 @@ private:
ProxyClientSession(ProxyClientSession &); // noncopyable
ProxyClientSession &operator=(const ProxyClientSession &); // noncopyable
- int state_api_callout(int event, void *edata);
void handle_api_return(int event);
friend void TSHttpSsnDebugSet(TSHttpSsn, int);
diff --git a/proxy/ProxyClientTransaction.cc b/proxy/ProxyClientTransaction.cc
new file mode 100644
index 0000000..af72bc8
--- /dev/null
+++ b/proxy/ProxyClientTransaction.cc
@@ -0,0 +1,69 @@
+/** @file
+
+ ProxyClientTransaction - Base class for protocol client transactions.
+
+ @section license License
+
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+ */
+
+#include "http/HttpSM.h"
+#include "http/HttpServerSession.h"
+
+#define DebugHttpTxn(fmt, ...) DebugSsn(this, "http_txn", fmt, __VA_ARGS__)
+
+// Start every data member in a known state. sm_reader is handed to
+// HttpSM::attach_client_session() by new_transaction() and host_res_style is
+// returned by get_host_res_style(), so neither may be left indeterminate when
+// a subclass or set_parent() has not assigned them yet. HOST_RES_IPV4 matches
+// the default used by ProxyClientSession. Initializers follow the member
+// declaration order in the class.
+ProxyClientTransaction::ProxyClientTransaction()
+ : VConnection(NULL), m_active(false), parent(NULL), current_reader(NULL), sm_reader(NULL), host_res_style(HOST_RES_IPV4),
+ restart_immediate(false)
+{
+}
+
+// Start a new transaction: allocate and initialize an HttpSM, remember it as
+// current_reader, and attach this transaction together with sm_reader to it.
+void
+ProxyClientTransaction::new_transaction()
+{
+ // Any previous state machine must already have been cleared by release().
+ ink_assert(current_reader == NULL);
+
+ // Defensive programming, make sure nothing persists across
+ // connection re-use
+ // NOTE(review): no state is actually reset at this point; the comment appears
+ // to be carried over from the old HttpClientSession::new_transaction().
+
+ // parent is dereferenced by the debug message below, so it must be set.
+ ink_release_assert(parent != NULL);
+ current_reader = HttpSM::allocate();
+ current_reader->init();
+ DebugHttpTxn("[%" PRId64 "] Starting transaction %d using sm [%" PRId64 "]", parent->connection_id(),
+ parent->get_transact_count(), current_reader->sm_id);
+
+ current_reader->attach_client_session(this, sm_reader);
+}
+
+// Drop the reference to the current state machine and pass the release on to
+// the parent session, if one is attached. The reader argument r is not used by
+// this base implementation.
+void
+ProxyClientTransaction::release(IOBufferReader *r)
+{
+ ink_assert(current_reader != NULL);
+
+ // The NULL checks keep the log line safe when ink_assert is compiled out.
+ DebugHttpTxn("[%" PRId64 "] session released by sm [%" PRId64 "]", parent ? parent->connection_id() : 0,
+ current_reader ? current_reader->sm_id : 0);
+
+ current_reader = NULL; // Clear reference to SM
+
+ // Pass along the release to the session
+ if (parent)
+ parent->release(this);
+}
+
+// Forward the server session and the transaction_done flag to the parent
+// client session.
+// NOTE(review): parent is dereferenced without a NULL check here, unlike
+// release() above - confirm callers only reach this after set_parent().
+void
+ProxyClientTransaction::attach_server_session(HttpServerSession *ssession, bool transaction_done)
+{
+ parent->attach_server_session(ssession, transaction_done);
+}
diff --git a/proxy/ProxyClientTransaction.h b/proxy/ProxyClientTransaction.h
new file mode 100644
index 0000000..19800c9
--- /dev/null
+++ b/proxy/ProxyClientTransaction.h
@@ -0,0 +1,238 @@
+/** @file
+
+ ProxyClientTransaction - Base class for protocol client transactions.
+
+ @section license License
+
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+ */
+
+#ifndef __PROXY_CLIENT_TRANSACTION_H__
+#define __PROXY_CLIENT_TRANSACTION_H__
+
+#include "ProxyClientSession.h"
+
+class HttpSM;
+class HttpServerSession;
+class ProxyClientTransaction : public VConnection
+{
+public:
+ ProxyClientTransaction();
+
+ // do_io methods implemented by subclasses
+
+ virtual void new_transaction();
+
+ virtual NetVConnection *
+ get_netvc() const
+ {
+ return (parent) ? parent->get_netvc() : NULL;
+ }
+
+ virtual void set_active_timeout(ink_hrtime timeout_in) = 0;
+ virtual void set_inactivity_timeout(ink_hrtime timeout_in) = 0;
+ virtual void cancel_inactivity_timeout() = 0;
+
+ virtual void attach_server_session(HttpServerSession *ssession, bool transaction_done = true);
+
+ int
+ get_transact_count() const
+ {
+ return parent ? parent->get_transact_count() : 0;
+ }
+
+ // Ask your session if this is allowed
+ bool
+ is_transparent_passthrough_allowed()
+ {
+ return parent ? parent->is_transparent_passthrough_allowed() : false;
+ }
+
+ void
+ set_half_close_flag(bool flag)
+ {
+ if (parent)
+ parent->set_half_close_flag(flag);
+ }
+ virtual bool
+ get_half_close_flag() const
+ {
+ return parent ? parent->get_half_close_flag() : false;
+ }
+
+ // What are the debug and hooks_enabled used for? How are they set?
+ // Just calling through to parent session for now
+ bool
+ debug() const
+ {
+ return parent ? parent->debug() : false;
+ }
+ bool
+ hooks_enabled() const
+ {
+ return parent ? parent->hooks_enabled() : false;
+ }
+
+ APIHook *
+ ssn_hook_get(TSHttpHookID id) const
+ {
+ return parent ? parent->ssn_hook_get(id) : NULL;
+ }
+
+ // Forward has_hooks() to the parent session. Returns false when no parent
+ // is attached, consistent with hooks_enabled() and ssn_hook_get(), instead
+ // of dereferencing a NULL parent.
+ bool
+ has_hooks() const
+ {
+ return parent ? parent->has_hooks() : false;
+ }
+
+ // for DI. An active connection is one on which a request has
+ // been successfully parsed (PARSE_DONE); it remains active
+ // until the transaction goes through or the client
+ // aborts.
+ bool m_active;
+
+ /// DNS resolution preferences.
+ HostResStyle
+ get_host_res_style() const
+ {
+ return host_res_style;
+ }
+ void
+ set_host_res_style(HostResStyle style)
+ {
+ host_res_style = style;
+ }
+
+ const AclRecord *
+ get_acl_record() const
+ {
+ return parent ? parent->acl_record : NULL;
+ }
+
+ // Indicate we are done with this transaction
+ virtual void release(IOBufferReader *r);
+
+ // Outbound values set via the server port definition. Really only used for Http1 at the moment.
+ virtual uint16_t
+ get_outbound_port() const
+ {
+ return 0;
+ }
+ virtual IpAddr
+ get_outbound_ip4() const
+ {
+ return IpAddr();
+ }
+ virtual IpAddr
+ get_outbound_ip6() const
+ {
+ return IpAddr();
+ }
+ virtual void
+ set_outbound_port(uint16_t new_port)
+ {
+ }
+ virtual void
+ set_outbound_ip(const IpAddr &new_addr)
+ {
+ }
+ virtual void
+ clear_outbound()
+ {
+ }
+ virtual bool
+ is_outbound_transparent() const
+ {
+ return false;
+ }
+ virtual void
+ set_outbound_transparent(bool flag)
+ {
+ }
+
+ virtual bool
+ ignore_keep_alive()
+ {
+ return true;
+ }
+
+ virtual void
+ destroy()
+ {
+ this->mutex.clear();
+ }
+
+ ProxyClientSession *
+ get_parent()
+ {
+ return parent;
+ }
+
+ // Attach this transaction to a session and copy the session's DNS
+ // resolution preference. Passing NULL detaches the transaction and leaves
+ // host_res_style unchanged rather than dereferencing a NULL pointer.
+ virtual void
+ set_parent(ProxyClientSession *new_parent)
+ {
+ parent = new_parent;
+ if (parent)
+ host_res_style = parent->host_res_style;
+ }
+ virtual void
+ set_h2c_upgrade_flag()
+ {
+ }
+
+ HttpServerSession *
+ get_server_session() const
+ {
+ return parent ? parent->get_server_session() : NULL;
+ }
+
+ HttpSM *
+ get_sm() const
+ {
+ return current_reader;
+ }
+
+ virtual bool allow_half_open() const = 0;
+
+ virtual const char *get_protocol_string() const = 0;
+
+ // Record whether the transaction should be restarted immediately.
+ // The flag now stores the caller's value; it was previously hard-coded to
+ // true, so set_restart_immediate(false) could never clear it.
+ void
+ set_restart_immediate(bool val)
+ {
+ restart_immediate = val;
+ }
+ bool
+ get_restart_immediate() const
+ {
+ return restart_immediate;
+ }
+
+
+protected:
+ ProxyClientSession *parent;
+ HttpSM *current_reader;
+ IOBufferReader *sm_reader;
+
+ /// DNS resolution preferences.
+ HostResStyle host_res_style;
+
+ bool restart_immediate;
+
+
+private:
+};
+
+#endif /* __PROXY_CLIENT_TRANSACTION_H__ */
diff --git a/proxy/http/HttpClientSession.cc b/proxy/http/Http1ClientSession.cc
similarity index 65%
rename from proxy/http/HttpClientSession.cc
rename to proxy/http/Http1ClientSession.cc
index 367b0fe..1175ceb 100644
--- a/proxy/http/HttpClientSession.cc
+++ b/proxy/http/Http1ClientSession.cc
@@ -23,21 +23,23 @@
/****************************************************************************
- HttpClientSession.cc
+ Http1ClientSession.cc
Description:
****************************************************************************/
-#include "ts/ink_config.h"
-#include "ts/Allocator.h"
-#include "HttpClientSession.h"
+#include <ts/ink_resolver.h>
+//#include "ink_config.h"
+//#include "Allocator.h"
+#include "Http1ClientSession.h"
+#include "Http1ClientTransaction.h"
#include "HttpSM.h"
#include "HttpDebugNames.h"
#include "HttpServerSession.h"
#include "Plugin.h"
-#include "Http2ClientSession.h"
+//#include "Http2ClientSession.h"
#define DebugHttpSsn(fmt, ...) DebugSsn(this, "http_cs", fmt, __VA_ARGS__)
@@ -54,26 +56,24 @@ enum {
// We have debugging list that we can use to find stuck
// client sessions
-DLL<HttpClientSession> debug_cs_list;
+DList(Http1ClientSession, debug_link) debug_cs_list;
ink_mutex debug_cs_list_mutex;
-ClassAllocator<HttpClientSession> httpClientSessionAllocator("httpClientSessionAllocator");
+ClassAllocator<Http1ClientSession> http1ClientSessionAllocator("http1ClientSessionAllocator");
-HttpClientSession::HttpClientSession()
+Http1ClientSession::Http1ClientSession()
: con_id(0), client_vc(NULL), magic(HTTP_CS_MAGIC_DEAD), transact_count(0), tcp_init_cwnd_set(false), half_close(false),
- conn_decrease(false), upgrade_to_h2c(false), bound_ss(NULL), read_buffer(NULL), current_reader(NULL), read_state(HCS_INIT),
- ka_vio(NULL), slave_ka_vio(NULL), outbound_port(0), f_outbound_transparent(false), host_res_style(HOST_RES_IPV4),
- acl_record(NULL), m_active(false)
+ conn_decrease(false), read_buffer(NULL), read_state(HCS_INIT), ka_vio(NULL), slave_ka_vio(NULL), outbound_port(0),
+ f_outbound_transparent(false), m_active(false)
{
}
void
-HttpClientSession::destroy()
+Http1ClientSession::destroy()
{
DebugHttpSsn("[%" PRId64 "] session destroy", con_id);
- ink_release_assert(upgrade_to_h2c || !client_vc);
- ink_release_assert(bound_ss == NULL);
+ ink_release_assert(!client_vc);
ink_assert(read_buffer);
magic = HTTP_CS_MAGIC_DEAD;
@@ -84,7 +84,7 @@ HttpClientSession::destroy()
#ifdef USE_HTTP_DEBUG_LISTS
ink_mutex_acquire(&debug_cs_list_mutex);
- debug_cs_list.remove(this, this->debug_link);
+ debug_cs_list.remove(this);
ink_mutex_release(&debug_cs_list_mutex);
#endif
@@ -93,68 +93,27 @@ HttpClientSession::destroy()
conn_decrease = false;
}
- super::destroy();
- THREAD_FREE(this, httpClientSessionAllocator, this_thread());
-}
-
-void
-HttpClientSession::ssn_hook_append(TSHttpHookID id, INKContInternal *cont)
-{
- ProxyClientSession::ssn_hook_append(id, cont);
- if (current_reader) {
- current_reader->hooks_set = 1;
- }
-}
-
-void
-HttpClientSession::ssn_hook_prepend(TSHttpHookID id, INKContInternal *cont)
-{
- ProxyClientSession::ssn_hook_prepend(id, cont);
- if (current_reader) {
- current_reader->hooks_set = 1;
- }
-}
-
-void
-HttpClientSession::new_transaction()
-{
- ink_assert(current_reader == NULL);
- PluginIdentity *pi = dynamic_cast<PluginIdentity *>(client_vc);
-
- if (!pi && client_vc->add_to_active_queue() == false) {
- // no room in the active queue close the connection
- this->do_io_close();
- return;
- }
-
+ // Clean up the write VIO in case of inactivity timeout
+ this->do_io_write(NULL, 0, NULL);
- // Defensive programming, make sure nothing persists across
- // connection re-use
- half_close = false;
+ // Free the transaction resources
+ this->trans.cleanup();
- read_state = HCS_ACTIVE_READER;
- current_reader = HttpSM::allocate();
- current_reader->init();
- transact_count++;
- DebugHttpSsn("[%" PRId64 "] Starting transaction %d using sm [%" PRId64 "]", con_id, transact_count, current_reader->sm_id);
-
- current_reader->attach_client_session(this, sm_reader);
- if (pi) {
- // it's a plugin VC of some sort with identify information.
- // copy it to the SM.
- current_reader->plugin_tag = pi->getPluginTag();
- current_reader->plugin_id = pi->getPluginId();
- }
+ super::destroy();
+ THREAD_FREE(this, http1ClientSessionAllocator, this_thread());
}
void
-HttpClientSession::new_connection(NetVConnection *new_vc, MIOBuffer *iobuf, IOBufferReader *reader, bool backdoor)
+Http1ClientSession::new_connection(NetVConnection *new_vc, MIOBuffer *iobuf, IOBufferReader *reader, bool backdoor)
{
ink_assert(new_vc != NULL);
ink_assert(client_vc == NULL);
client_vc = new_vc;
magic = HTTP_CS_MAGIC_ALIVE;
mutex = new_vc->mutex;
+ trans.mutex = mutex; // Share this mutex with the transaction
+ ssn_start_time = Thread::get_hrtime();
+
MUTEX_TRY_LOCK(lock, mutex, this_ethread());
ink_assert(lock.is_locked());
@@ -194,30 +153,15 @@ HttpClientSession::new_connection(NetVConnection *new_vc, MIOBuffer *iobuf, IOBu
#ifdef USE_HTTP_DEBUG_LISTS
ink_mutex_acquire(&debug_cs_list_mutex);
- debug_cs_list.push(this, this->debug_link);
+ debug_cs_list.push(this);
ink_mutex_release(&debug_cs_list_mutex);
#endif
DebugHttpSsn("[%" PRId64 "] session born, netvc %p", con_id, new_vc);
- RecString congestion_control_in;
- if (REC_ReadConfigStringAlloc(congestion_control_in, "proxy.config.net.tcp_congestion_control_in") == REC_ERR_OKAY) {
- int len = strlen(congestion_control_in);
- if (len > 0) {
- client_vc->set_tcp_congestion_control(congestion_control_in, len);
- }
- ats_free(congestion_control_in);
- }
- if (!iobuf) {
- SSLNetVConnection *ssl_vc = dynamic_cast<SSLNetVConnection *>(new_vc);
- if (ssl_vc) {
- iobuf = ssl_vc->get_ssl_iobuf();
- sm_reader = ssl_vc->get_ssl_reader();
- }
- }
-
read_buffer = iobuf ? iobuf : new_MIOBuffer(HTTP_HEADER_BUFFER_SIZE_INDEX);
sm_reader = reader ? reader : read_buffer->alloc_reader();
+ trans.set_reader(sm_reader);
// INKqa11186: Use a local pointer to the mutex as
// when we return from do_api_callout, the ClientSession may
@@ -231,29 +175,31 @@ HttpClientSession::new_connection(NetVConnection *new_vc, MIOBuffer *iobuf, IOBu
}
VIO *
-HttpClientSession::do_io_read(Continuation *c, int64_t nbytes, MIOBuffer *buf)
+Http1ClientSession::do_io_read(Continuation *c, int64_t nbytes, MIOBuffer *buf)
{
return client_vc->do_io_read(c, nbytes, buf);
}
VIO *
-HttpClientSession::do_io_write(Continuation *c, int64_t nbytes, IOBufferReader *buf, bool owner)
+Http1ClientSession::do_io_write(Continuation *c, int64_t nbytes, IOBufferReader *buf, bool owner)
{
/* conditionally set the tcp initial congestion window
before our first write. */
DebugHttpSsn("tcp_init_cwnd_set %d\n", (int)tcp_init_cwnd_set);
- // Checking c to avoid clang detected NULL derference path
- if (c && !tcp_init_cwnd_set) {
+ // Only configure the congestion window for a real write (c != NULL).
+ // Cleanup calls of the form do_io_write(NULL, 0, NULL) must not reach
+ // set_tcp_init_cwnd(), which dereferences trans.get_sm() unconditionally.
+ if (c && !tcp_init_cwnd_set) {
tcp_init_cwnd_set = true;
set_tcp_init_cwnd();
}
- return client_vc->do_io_write(c, nbytes, buf, owner);
+ // The net VC may already have been released; report "no VIO" in that case.
+ if (client_vc)
+ return client_vc->do_io_write(c, nbytes, buf, owner);
+ else
+ return NULL;
}
void
-HttpClientSession::set_tcp_init_cwnd()
+Http1ClientSession::set_tcp_init_cwnd()
{
- int desired_tcp_init_cwnd = current_reader->t_state.txn_conf->server_tcp_init_cwnd;
+ int desired_tcp_init_cwnd = trans.get_sm()->t_state.txn_conf->server_tcp_init_cwnd;
DebugHttpSsn("desired TCP congestion window is %d\n", desired_tcp_init_cwnd);
if (desired_tcp_init_cwnd == 0)
return;
@@ -262,13 +208,13 @@ HttpClientSession::set_tcp_init_cwnd()
}
void
-HttpClientSession::do_io_shutdown(ShutdownHowTo_t howto)
+Http1ClientSession::do_io_shutdown(ShutdownHowTo_t howto)
{
client_vc->do_io_shutdown(howto);
}
void
-HttpClientSession::do_io_close(int alerrno)
+Http1ClientSession::do_io_close(int alerrno)
{
if (read_state == HCS_ACTIVE_READER) {
HTTP_DECREMENT_DYN_STAT(http_current_client_transactions_stat);
@@ -289,9 +235,9 @@ HttpClientSession::do_io_close(int alerrno)
slave_ka_vio = NULL;
}
- if (half_close && this->current_reader) {
+ if (half_close && this->trans.get_sm()) {
read_state = HCS_HALF_CLOSED;
- SET_HANDLER(&HttpClientSession::state_wait_for_close);
+ SET_HANDLER(&Http1ClientSession::state_wait_for_close);
DebugHttpSsn("[%" PRId64 "] session half close", con_id);
// We want the client to know that that we're finished
@@ -313,25 +259,10 @@ HttpClientSession::do_io_close(int alerrno)
// Set the active timeout to the same as the inactive time so
// that this connection does not hang around forever if
// the ua hasn't closed
- client_vc->set_active_timeout(HRTIME_SECONDS(current_reader->t_state.txn_conf->keep_alive_no_activity_timeout_out));
+ client_vc->set_active_timeout(HRTIME_SECONDS(trans.get_sm()->t_state.txn_conf->keep_alive_no_activity_timeout_out));
} else {
read_state = HCS_CLOSED;
- // clean up ssl's first byte iobuf
- SSLNetVConnection *ssl_vc = dynamic_cast<SSLNetVConnection *>(client_vc);
- if (ssl_vc) {
- ssl_vc->set_ssl_iobuf(NULL);
- }
- if (upgrade_to_h2c && this->current_reader) {
- Http2ClientSession *h2_session = http2ClientSessionAllocator.alloc();
-
- h2_session->set_upgrade_context(¤t_reader->t_state.hdr_info.client_request);
- h2_session->new_connection(client_vc, NULL, NULL, false /* backdoor */);
- // Handed over control of the VC to the new H2 session, don't clean it up
- this->release_netvc();
- // TODO Consider about handling HTTP/1 hooks and stats
- } else {
- DebugHttpSsn("[%" PRId64 "] session closed", con_id);
- }
+ DebugHttpSsn("[%" PRId64 "] session closed", con_id);
HTTP_SUM_DYN_STAT(http_transactions_per_client_con, transact_count);
HTTP_DECREMENT_DYN_STAT(http_current_client_connections_stat);
conn_decrease = false;
@@ -340,9 +271,9 @@ HttpClientSession::do_io_close(int alerrno)
}
int
-HttpClientSession::state_wait_for_close(int event, void *data)
+Http1ClientSession::state_wait_for_close(int event, void *data)
{
- STATE_ENTER(&HttpClientSession::state_wait_for_close, event, data);
+ STATE_ENTER(&Http1ClientSession::state_wait_for_close, event, data);
ink_assert(data == ka_vio);
ink_assert(read_state == HCS_HALF_CLOSED);
@@ -368,12 +299,11 @@ HttpClientSession::state_wait_for_close(int event, void *data)
}
int
-HttpClientSession::state_slave_keep_alive(int event, void *data)
+Http1ClientSession::state_slave_keep_alive(int event, void *data)
{
- STATE_ENTER(&HttpClientSession::state_slave_keep_alive, event, data);
+ STATE_ENTER(&Http1ClientSession::state_slave_keep_alive, event, data);
ink_assert(data == slave_ka_vio);
- ink_assert(bound_ss != NULL);
switch (event) {
default:
@@ -403,7 +333,7 @@ HttpClientSession::state_slave_keep_alive(int event, void *data)
}
int
-HttpClientSession::state_keep_alive(int event, void *data)
+Http1ClientSession::state_keep_alive(int event, void *data)
{
// Route the event. It is either for client vc or
// the origin server slave vc
@@ -414,7 +344,7 @@ HttpClientSession::state_keep_alive(int event, void *data)
ink_assert(read_state == HCS_KEEP_ALIVE);
}
- STATE_ENTER(&HttpClientSession::state_keep_alive, event, data);
+ STATE_ENTER(&Http1ClientSession::state_keep_alive, event, data);
switch (event) {
case VC_EVENT_READ_READY:
@@ -426,11 +356,14 @@ HttpClientSession::state_keep_alive(int event, void *data)
case VC_EVENT_EOS:
// If there is data in the buffer, start a new
// transaction, otherwise the client gave up
- if (sm_reader->read_avail() > 0) {
- new_transaction();
- } else {
- this->do_io_close();
- }
+ // SKH - It is a bit odd to start a transaction when the client has already
+ // closed. At a minimum, doing so would require some half-open connection
+ // tracking, so for now always close the session on EOS.
+ // if (sm_reader->read_avail() > 0) {
+ // new_transaction();
+ //} else {
+ this->do_io_close();
+ //}
break;
case VC_EVENT_READ_COMPLETE:
@@ -449,13 +382,69 @@ HttpClientSession::state_keep_alive(int event, void *data)
return 0;
}
void
-HttpClientSession::reenable(VIO *vio)
+Http1ClientSession::reenable(VIO *vio)
{
client_vc->reenable(vio);
}
+// Called from Http1ClientTransaction::release when a transaction is done.
+// Either starts the next transaction immediately (request data is already
+// buffered) or parks the session in keep-alive, waiting for more client data.
+// Note: the parameter `trans` shadows the session's own `trans` member.
+void
+Http1ClientSession::release(ProxyClientTransaction *trans)
+{
+ ink_assert(read_state == HCS_ACTIVE_READER);
+
+ // Clean up the write VIO in case of inactivity timeout
+ this->do_io_write(NULL, 0, NULL);
+
+ // Check to see if there is remaining data in the
+ // buffer. If there is, spin up a new state
+ // machine to process it. Otherwise, issue an
+ // IO to wait for new data
+ bool more_to_read = this->sm_reader->is_read_avail_more_than(0);
+ if (more_to_read) {
+ // NOTE(review): trans is still used after destroy(). That is harmless for
+ // Http1ClientTransaction, whose destroy() is an empty override, but confirm
+ // it holds for any other ProxyClientTransaction passed in here.
+ trans->destroy();
+ trans->set_restart_immediate(true);
+ DebugHttpSsn("[%" PRId64 "] data already in buffer, starting new transaction", con_id);
+ new_transaction();
+ } else {
+ DebugHttpSsn("[%" PRId64 "] initiating io for next header", con_id);
+ trans->set_restart_immediate(false);
+ // Wait in keep-alive: the session itself handles the next read event
+ read_state = HCS_KEEP_ALIVE;
+ SET_HANDLER(&Http1ClientSession::state_keep_alive);
+ ka_vio = this->do_io_read(this, INT64_MAX, read_buffer);
+ ink_assert(slave_ka_vio != ka_vio);
+
+ // Y!?
+ // NOTE(review): the "Y!?" marker presumably flags a vendor-specific call
+ // that is intentionally left disabled -- confirm before re-enabling.
+ // client_vc->add_to_keep_alive_lru();
+ client_vc->cancel_active_timeout();
+ trans->destroy();
+ }
+}
+
+// Begin the next transaction on this session: reset the half-close flag, mark
+// the session as having an active reader, bump the transaction count and start
+// the embedded transaction object. Closes the session instead when the client
+// VC is already gone.
+void
+Http1ClientSession::new_transaction()
+{
+ // If the client connection terminated during API callouts we're done.
+ if (NULL == client_vc) {
+ this->do_io_close(); // calls the SSN_CLOSE hooks to match the SSN_START hooks.
+ return;
+ }
+
+ // Defensive programming, make sure nothing persists across
+ // connection re-use
+ half_close = false;
+
+ read_state = HCS_ACTIVE_READER;
+
+ // (Re)bind the embedded transaction to this session before starting it
+ trans.set_parent(this);
+ transact_count++;
+ // Y!?
+ // NOTE(review): the "Y!?" marker presumably flags a vendor-specific call
+ // that is intentionally left disabled -- confirm before re-enabling.
+ // client_vc->remove_from_keep_alive_lru();
+ trans.new_transaction();
+}
+
void
-HttpClientSession::attach_server_session(HttpServerSession *ssession, bool transaction_done)
+Http1ClientSession::attach_server_session(HttpServerSession *ssession, bool transaction_done)
{
if (ssession) {
ink_assert(bound_ss == NULL);
@@ -463,7 +452,7 @@ HttpClientSession::attach_server_session(HttpServerSession *ssession, bool trans
bound_ss = ssession;
DebugHttpSsn("[%" PRId64 "] attaching server session [%" PRId64 "] as slave", con_id, ssession->con_id);
ink_assert(ssession->get_reader()->read_avail() == 0);
- ink_assert(ssession->get_netvc() != client_vc);
+ ink_assert(ssession->get_netvc() != this->get_netvc());
// handling potential keep-alive here
if (m_active) {
@@ -473,7 +462,7 @@ HttpClientSession::attach_server_session(HttpServerSession *ssession, bool trans
// Since this our slave, issue an IO to detect a close and
// have it call the client session back. This IO also prevent
// the server net conneciton from calling back a dead sm
- SET_HANDLER(&HttpClientSession::state_keep_alive);
+ SET_HANDLER(&Http1ClientSession::state_keep_alive);
slave_ka_vio = ssession->do_io_read(this, INT64_MAX, ssession->read_buffer);
ink_assert(slave_ka_vio != ka_vio);
@@ -482,7 +471,7 @@ HttpClientSession::attach_server_session(HttpServerSession *ssession, bool trans
if (transaction_done) {
ssession->get_netvc()->set_inactivity_timeout(
- HRTIME_SECONDS(current_reader->t_state.txn_conf->keep_alive_no_activity_timeout_out));
+ HRTIME_SECONDS(trans.get_sm()->t_state.txn_conf->keep_alive_no_activity_timeout_out));
ssession->get_netvc()->cancel_active_timeout();
} else {
// we are serving from the cache - this could take a while.
@@ -495,56 +484,3 @@ HttpClientSession::attach_server_session(HttpServerSession *ssession, bool trans
slave_ka_vio = NULL;
}
}
-
-void
-HttpClientSession::release(IOBufferReader *r)
-{
- ink_assert(read_state == HCS_ACTIVE_READER);
- ink_assert(current_reader != NULL);
- MgmtInt ka_in = current_reader->t_state.txn_conf->keep_alive_no_activity_timeout_in;
-
- DebugHttpSsn("[%" PRId64 "] session released by sm [%" PRId64 "]", con_id, current_reader->sm_id);
- current_reader = NULL;
-
- // handling potential keep-alive here
- if (m_active) {
- m_active = false;
- HTTP_DECREMENT_DYN_STAT(http_current_active_client_connections_stat);
- }
- // Make sure that the state machine is returning
- // correct buffer reader
- ink_assert(r == sm_reader);
- if (r != sm_reader) {
- this->do_io_close();
- return;
- }
-
- HTTP_DECREMENT_DYN_STAT(http_current_client_transactions_stat);
-
- // Clean up the write VIO in case of inactivity timeout
- this->do_io_write(NULL, 0, NULL);
-
- // Check to see there is remaining data in the
- // buffer. If there is, spin up a new state
- // machine to process it. Otherwise, issue an
- // IO to wait for new data
- if (sm_reader->read_avail() > 0) {
- DebugHttpSsn("[%" PRId64 "] data already in buffer, starting new transaction", con_id);
- new_transaction();
- } else {
- DebugHttpSsn("[%" PRId64 "] initiating io for next header", con_id);
- read_state = HCS_KEEP_ALIVE;
- SET_HANDLER(&HttpClientSession::state_keep_alive);
- ka_vio = this->do_io_read(this, INT64_MAX, read_buffer);
- ink_assert(slave_ka_vio != ka_vio);
- client_vc->set_inactivity_timeout(HRTIME_SECONDS(ka_in));
- client_vc->cancel_active_timeout();
- client_vc->add_to_keep_alive_queue();
- }
-}
-
-HttpServerSession *
-HttpClientSession::get_bound_ss()
-{
- return bound_ss;
-}
diff --git a/proxy/http/HttpClientSession.h b/proxy/http/Http1ClientSession.h
similarity index 72%
rename from proxy/http/HttpClientSession.h
rename to proxy/http/Http1ClientSession.h
index 1f08ee0..3f76519 100644
--- a/proxy/http/HttpClientSession.h
+++ b/proxy/http/Http1ClientSession.h
@@ -23,23 +23,23 @@
/****************************************************************************
- HttpClientSession.h
+ Http1ClientSession.h
Description:
****************************************************************************/
-#ifndef _HTTP_CLIENT_SESSION_H_
-#define _HTTP_CLIENT_SESSION_H_
+#ifndef _HTTP1_CLIENT_SESSION_H_
+#define _HTTP1_CLIENT_SESSION_H_
-#include "ts/ink_platform.h"
-#include "ts/ink_resolver.h"
+//#include "libts.h"
#include "P_Net.h"
#include "InkAPIInternal.h"
#include "HTTP.h"
#include "HttpConfig.h"
#include "IPAllow.h"
#include "ProxyClientSession.h"
+#include "Http1ClientTransaction.h"
extern ink_mutex debug_cs_list_mutex;
@@ -48,11 +48,11 @@ class HttpServerSession;
class SecurityContext;
-class HttpClientSession : public ProxyClientSession
+class Http1ClientSession : public ProxyClientSession
{
public:
typedef ProxyClientSession super; ///< Parent type.
- HttpClientSession();
+ Http1ClientSession();
// Implement ProxyClientSession interface.
virtual void destroy();
@@ -60,7 +60,8 @@ public:
virtual void
start()
{
- new_transaction();
+ // Create a new transaction object and kick it off
+ this->new_transaction();
}
void new_connection(NetVConnection *new_vc, MIOBuffer *iobuf, IOBufferReader *reader, bool backdoor);
@@ -73,63 +74,89 @@ public:
virtual void do_io_shutdown(ShutdownHowTo_t howto);
virtual void reenable(VIO *vio);
- void new_transaction();
-
- void
- set_half_close_flag()
- {
- half_close = true;
- }
void
- clear_half_close_flag()
+ set_half_close_flag(bool flag)
{
- half_close = false;
- }
+ half_close = flag;
+ };
bool
get_half_close_flag() const
{
return half_close;
- }
- virtual void release(IOBufferReader *r);
+ };
virtual NetVConnection *
get_netvc() const
{
return client_vc;
- }
+ };
virtual void
release_netvc()
{
client_vc = NULL;
}
- virtual void attach_server_session(HttpServerSession *ssession, bool transaction_done = true);
- HttpServerSession *
- get_server_session() const
+ int
+ get_transact_count() const
{
- return bound_ss;
+ return transact_count;
}
- // Used for the cache authenticated HTTP content feature
- HttpServerSession *get_bound_ss();
+ virtual bool
+ is_outbound_transparent()
+ {
+ return f_outbound_transparent;
+ }
- // Functions for manipulating api hooks
- void ssn_hook_append(TSHttpHookID id, INKContInternal *cont);
- void ssn_hook_prepend(TSHttpHookID id, INKContInternal *cont);
+ // Indicate we are done with a transaction
+ virtual void release(ProxyClientTransaction *trans);
- int
- get_transact_count() const
+ virtual uint16_t
+ get_outbound_port() const
{
- return transact_count;
+ return outbound_port;
+ }
+ virtual IpAddr
+ get_outbound_ip4() const
+ {
+ return outbound_ip4;
+ }
+ virtual IpAddr
+ get_outbound_ip6() const
+ {
+ return outbound_ip6;
+ }
+
+ virtual void attach_server_session(HttpServerSession *ssession, bool transaction_done = true);
+
+ virtual HttpServerSession *
+ get_server_session() const
+ {
+ return bound_ss;
}
void
- set_h2c_upgrade_flag()
+ set_active_timeout(ink_hrtime timeout_in)
+ {
+ if (client_vc)
+ client_vc->set_active_timeout(timeout_in);
+ }
+ void
+ set_inactivity_timeout(ink_hrtime timeout_in)
{
- upgrade_to_h2c = true;
+ if (client_vc)
+ client_vc->set_inactivity_timeout(timeout_in);
+ }
+ void
+ cancel_inactivity_timeout()
+ {
+ if (client_vc)
+ client_vc->cancel_inactivity_timeout();
}
private:
- HttpClientSession(HttpClientSession &);
+ Http1ClientSession(Http1ClientSession &);
+
+ void new_transaction();
int state_keep_alive(int event, void *data);
int state_slave_keep_alive(int event, void *data);
@@ -151,21 +178,21 @@ private:
bool tcp_init_cwnd_set;
bool half_close;
bool conn_decrease;
- bool upgrade_to_h2c; // Switching to HTTP/2 with upgrade mechanism
-
- HttpServerSession *bound_ss;
MIOBuffer *read_buffer;
IOBufferReader *sm_reader;
- HttpSM *current_reader;
C_Read_State read_state;
VIO *ka_vio;
VIO *slave_ka_vio;
- Link<HttpClientSession> debug_link;
+
+ HttpServerSession *bound_ss;
public:
+ // Link<Http1ClientSession> debug_link;
+ LINK(Http1ClientSession, debug_link);
+
/// Local address for outbound connection.
IpAddr outbound_ip4;
/// Local address for outbound connection.
@@ -176,18 +203,16 @@ public:
bool f_outbound_transparent;
/// Transparently pass-through non-HTTP traffic.
bool f_transparent_passthrough;
- /// DNS resolution preferences.
- HostResStyle host_res_style;
- /// acl record - cache IpAllow::match() call
- const AclRecord *acl_record;
+
// for DI. An active connection is one that a request has
// been successfully parsed (PARSE_DONE) and it remains to
// be active until the transaction goes through or the client
// aborts.
bool m_active;
+ Http1ClientTransaction trans;
};
-extern ClassAllocator<HttpClientSession> httpClientSessionAllocator;
+extern ClassAllocator<Http1ClientSession> http1ClientSessionAllocator;
#endif
diff --git a/proxy/http/Http1ClientTransaction.cc b/proxy/http/Http1ClientTransaction.cc
new file mode 100644
index 0000000..22fb1d5
--- /dev/null
+++ b/proxy/http/Http1ClientTransaction.cc
@@ -0,0 +1,67 @@
+/** @file
+
+ Http1ClientTransaction.cc - The Transaction class for Http1*
+
+ @section license License
+
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+ */
+
+#include "Http1ClientTransaction.h"
+#include "Http1ClientSession.h"
+#include "HttpSM.h"
+
+// Finish the current HTTP/1 transaction and hand control back to the session.
+// r is the buffer reader returned by the state machine; it must be this
+// transaction's sm_reader, otherwise the connection is closed instead.
+void
+Http1ClientTransaction::release(IOBufferReader *r)
+{
+ // Must set the keep-alive inactivity timeout here rather than in the session
+ // because the state machine is not available then
+ MgmtInt ka_in = current_reader->t_state.txn_conf->keep_alive_no_activity_timeout_in;
+ // Use the transaction's own wrapper instead of dereferencing get_netvc():
+ // it forwards to the parent session (skipping a NULL parent), and the
+ // Http1ClientSession implementation ignores the call when the VC is gone.
+ set_inactivity_timeout(HRTIME_SECONDS(ka_in));
+
+ if (m_active) {
+ m_active = false;
+ HTTP_DECREMENT_DYN_STAT(http_current_active_client_connections_stat);
+ }
+
+ HTTP_DECREMENT_DYN_STAT(http_current_client_transactions_stat);
+
+ // Record when the session last completed a transaction
+ parent->ssn_last_txn_time = Thread::get_hrtime();
+
+ // Make sure that the state machine is returning
+ // correct buffer reader
+ ink_assert(r == sm_reader);
+ if (r != sm_reader) {
+ this->do_io_close();
+ } else {
+ super::release(r);
+ }
+}
+
+// Attach this transaction to its session. When the session is an
+// Http1ClientSession, its outbound port, addresses and transparency flag are
+// copied into the transaction; for any other session type the transaction's
+// outbound settings are left untouched.
+void
+Http1ClientTransaction::set_parent(ProxyClientSession *new_parent)
+{
+ parent = new_parent;
+ Http1ClientSession *http1_parent = dynamic_cast<Http1ClientSession *>(new_parent);
+ if (http1_parent) {
+ outbound_port = http1_parent->outbound_port;
+ outbound_ip4 = http1_parent->outbound_ip4;
+ outbound_ip6 = http1_parent->outbound_ip6;
+ outbound_transparent = http1_parent->f_outbound_transparent;
+ }
+ super::set_parent(new_parent);
+}
diff --git a/proxy/http/Http1ClientTransaction.h b/proxy/http/Http1ClientTransaction.h
new file mode 100644
index 0000000..4d1eba0
--- /dev/null
+++ b/proxy/http/Http1ClientTransaction.h
@@ -0,0 +1,181 @@
+/** @file
+
+ Http1ClientTransaction.h - The Transaction class for Http1*
+
+ @section license License
+
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+ */
+
+#ifndef __HTTP_CLIENT_TRANSACTION_H_
+#define __HTTP_CLIENT_TRANSACTION_H_
+
+#include "../ProxyClientTransaction.h"
+
+class Continuation;
+
+// Transaction object for HTTP/1.x. It is embedded in (and owned by) its
+// Http1ClientSession, forwards all I/O and timeout calls to that parent
+// session, and carries its own copy of the outbound connection settings.
+class Http1ClientTransaction : public ProxyClientTransaction
+{
+public:
+ typedef ProxyClientTransaction super;
+
+ // Give the plain-typed outbound settings defined values so the getters never
+ // return indeterminate data before set_parent() has copied the session's
+ // settings; the IpAddr members rely on their own default constructor.
+ Http1ClientTransaction() : super(), outbound_port(0), outbound_transparent(false) {}
+
+ // Implement VConnection interface.
+ virtual VIO *
+ do_io_read(Continuation *c, int64_t nbytes = INT64_MAX, MIOBuffer *buf = 0)
+ {
+ return parent->do_io_read(c, nbytes, buf);
+ }
+ virtual VIO *
+ do_io_write(Continuation *c = NULL, int64_t nbytes = INT64_MAX, IOBufferReader *buf = 0, bool owner = false)
+ {
+ return parent->do_io_write(c, nbytes, buf, owner);
+ }
+
+ virtual void
+ do_io_close(int lerrno = -1)
+ {
+ parent->do_io_close(lerrno);
+ // this->destroy(); Parent owns this data structure. No need for separate destroy.
+ }
+
+ // Don't destroy your elements. Rely on the Http1ClientSession to clean up the
+ // Http1ClientTransaction class as necessary
+ virtual void
+ destroy()
+ {
+ }
+
+ // Clean up the transaction elements when the ClientSession shuts down
+ void
+ cleanup()
+ {
+ super::destroy();
+ }
+
+ virtual void
+ do_io_shutdown(ShutdownHowTo_t howto)
+ {
+ parent->do_io_shutdown(howto);
+ }
+ virtual void
+ reenable(VIO *vio)
+ {
+ parent->reenable(vio);
+ }
+ // Set the buffer reader that release() later checks against
+ void
+ set_reader(IOBufferReader *reader)
+ {
+ sm_reader = reader;
+ }
+ void release(IOBufferReader *r);
+ virtual bool
+ ignore_keep_alive()
+ {
+ return false;
+ }
+
+ virtual bool
+ allow_half_open() const
+ {
+ return true;
+ }
+ virtual const char *
+ get_protocol_string() const
+ {
+ return "http";
+ }
+
+ void set_parent(ProxyClientSession *new_parent);
+
+ virtual uint16_t
+ get_outbound_port() const
+ {
+ return outbound_port;
+ }
+ virtual IpAddr
+ get_outbound_ip4() const
+ {
+ return outbound_ip4;
+ }
+ virtual IpAddr
+ get_outbound_ip6() const
+ {
+ return outbound_ip6;
+ }
+ virtual void
+ set_outbound_port(uint16_t new_port)
+ {
+ outbound_port = new_port;
+ }
+ // Store new_addr in the slot matching its family; an address that is neither
+ // IPv4 nor IPv6 clears both outbound addresses.
+ virtual void
+ set_outbound_ip(const IpAddr &new_addr)
+ {
+ if (new_addr.isIp4()) {
+ outbound_ip4 = new_addr;
+ } else if (new_addr.isIp6()) {
+ outbound_ip6 = new_addr;
+ } else {
+ clear_outbound_ip();
+ }
+ }
+ virtual void
+ clear_outbound_ip()
+ {
+ outbound_ip4.invalidate();
+ outbound_ip6.invalidate();
+ }
+ virtual bool
+ is_outbound_transparent() const
+ {
+ return outbound_transparent;
+ }
+ virtual void
+ set_outbound_transparent(bool flag)
+ {
+ outbound_transparent = flag;
+ }
+
+ // Pass on the timeouts to the netvc
+ virtual void
+ set_active_timeout(ink_hrtime timeout_in)
+ {
+ if (parent)
+ parent->set_active_timeout(timeout_in);
+ }
+ virtual void
+ set_inactivity_timeout(ink_hrtime timeout_in)
+ {
+ if (parent)
+ parent->set_inactivity_timeout(timeout_in);
+ }
+ virtual void
+ cancel_inactivity_timeout()
+ {
+ if (parent)
+ parent->cancel_inactivity_timeout();
+ }
+
+protected:
+ // Outbound connection settings, copied from the session in set_parent()
+ uint16_t outbound_port;
+ IpAddr outbound_ip4;
+ IpAddr outbound_ip6;
+ bool outbound_transparent;
+};
+
+#endif
diff --git a/proxy/http/HttpProxyServerMain.cc b/proxy/http/HttpProxyServerMain.cc
index 74b2a75..76c0925 100644
--- a/proxy/http/HttpProxyServerMain.cc
+++ b/proxy/http/HttpProxyServerMain.cc
@@ -30,7 +30,9 @@
#include "ReverseProxy.h"
#include "HttpSessionManager.h"
#include "HttpUpdateSM.h"
-#include "HttpClientSession.h"
+#ifdef USE_HTTP_DEBUG_LISTS
+#include "Http1ClientSession.h"
+#endif
#include "HttpPages.h"
#include "HttpTunnel.h"
#include "ts/Tokenizer.h"
@@ -242,8 +244,10 @@ init_HttpProxyServer(int n_accept_threads)
init_reverse_proxy();
httpSessionManager.init();
http_pages_init();
+#ifdef USE_HTTP_DEBUG_LISTS
ink_mutex_init(&debug_sm_list_mutex, "HttpSM Debug List");
ink_mutex_init(&debug_cs_list_mutex, "HttpCS Debug List");
+#endif
// DI's request to disable/reenable ICP on the fly
icp_dynamic_enabled = 1;
diff --git a/proxy/http/HttpSM.cc b/proxy/http/HttpSM.cc
index 2272053..f2e5f74 100644
--- a/proxy/http/HttpSM.cc
+++ b/proxy/http/HttpSM.cc
@@ -22,9 +22,9 @@
*/
+#include "../ProxyClientTransaction.h"
#include "HttpSM.h"
#include "ProxyConfig.h"
-#include "HttpClientSession.h"
#include "HttpServerSession.h"
#include "HttpDebugNames.h"
#include "HttpSessionManager.h"
@@ -69,7 +69,7 @@ extern int cache_config_read_while_writer;
// We have a debugging list that can use to find stuck
// state machines
-DLL<HttpSM> debug_sm_list;
+DList(HttpSM, debug_link) debug_sm_list;
ink_mutex debug_sm_list_mutex;
static const int sub_header_size = sizeof("Content-type: ") - 1 + 2 + sizeof("Content-range: bytes ") - 1 + 4;
@@ -370,7 +370,7 @@ HttpSM::init()
#ifdef USE_HTTP_DEBUG_LISTS
ink_mutex_acquire(&debug_sm_list_mutex);
- debug_sm_list.push(this, this->debug_link);
+ debug_sm_list.push(this);
ink_mutex_release(&debug_sm_list_mutex);
#endif
}
@@ -378,7 +378,7 @@ HttpSM::init()
void
HttpSM::set_ua_half_close_flag()
{
- ua_session->set_half_close_flag();
+ ua_session->set_half_close_flag(true);
}
inline void
@@ -466,16 +466,19 @@ HttpSM::start_sub_sm()
}
void
-HttpSM::attach_client_session(HttpClientSession *client_vc, IOBufferReader *buffer_reader)
+HttpSM::attach_client_session(ProxyClientTransaction *client_vc, IOBufferReader *buffer_reader)
{
milestones[TS_MILESTONE_UA_BEGIN] = Thread::get_hrtime();
ink_assert(client_vc != NULL);
+ NetVConnection *netvc = client_vc->get_netvc();
+ if (!netvc)
+ return;
ua_session = client_vc;
// Collect log & stats information
- client_tcp_reused = (1 < ua_session->get_transact_count()) ? true : false;
- SSLNetVConnection *ssl_vc = dynamic_cast<SSLNetVConnection *>(ua_session->get_netvc());
+ client_tcp_reused = (1 < ua_session->get_transact_count());
+ SSLNetVConnection *ssl_vc = dynamic_cast<SSLNetVConnection *>(netvc);
if (ssl_vc != NULL) {
client_connection_is_ssl = true;
client_ssl_reused = ssl_vc->getSSLSessionCacheHit();
@@ -498,8 +501,6 @@ HttpSM::attach_client_session(HttpClientSession *client_vc, IOBufferReader *buff
ua_entry->vc = client_vc;
ua_entry->vc_type = HTTP_UA_VC;
- NetVConnection *netvc = client_vc->get_netvc();
-
ats_ip_copy(&t_state.client_info.src_addr, netvc->get_remote_addr());
ats_ip_copy(&t_state.client_info.dst_addr, netvc->get_local_addr());
t_state.client_info.dst_addr.port() = netvc->get_local_port();
@@ -533,8 +534,8 @@ HttpSM::attach_client_session(HttpClientSession *client_vc, IOBufferReader *buff
/////////////////////////
// set up timeouts //
/////////////////////////
- client_vc->get_netvc()->set_inactivity_timeout(HRTIME_SECONDS(HttpConfig::m_master.accept_no_activity_timeout));
- client_vc->get_netvc()->set_active_timeout(HRTIME_SECONDS(HttpConfig::m_master.transaction_active_timeout_in));
+ client_vc->set_inactivity_timeout(HRTIME_SECONDS(HttpConfig::m_master.accept_no_activity_timeout));
+ client_vc->set_active_timeout(HRTIME_SECONDS(HttpConfig::m_master.transaction_active_timeout_in));
++reentrancy_count;
// Add our state sm to the sm list
@@ -582,6 +583,10 @@ HttpSM::state_read_client_request_header(int event, void *data)
int bytes_used = 0;
ink_assert(ua_entry->eos == false);
+ NetVConnection *netvc = ua_session->get_netvc();
+ if (!netvc && event != VC_EVENT_EOS)
+ return 0;
+
// check to see if there was an EOS received on the SSL connection
SSLNetVConnection *ssl_vc = dynamic_cast<SSLNetVConnection *>(ua_session->get_netvc());
if (ssl_vc && ssl_vc->isEosRcvd()) {
@@ -614,11 +619,11 @@ HttpSM::state_read_client_request_header(int event, void *data)
// Reset the inactivity timeout if this is the first
// time we've been called. The timeout had been set to
- // the accept timeout by the HttpClientSession
+ // the accept timeout by the ProxyClientTransaction
//
if ((ua_buffer_reader->read_avail() > 0) && (client_request_hdr_bytes == 0)) {
milestones[TS_MILESTONE_UA_FIRST_READ] = Thread::get_hrtime();
- ua_session->get_netvc()->set_inactivity_timeout(HRTIME_SECONDS(t_state.txn_conf->transaction_no_activity_timeout_in));
+ ua_session->set_inactivity_timeout(HRTIME_SECONDS(t_state.txn_conf->transaction_no_activity_timeout_in));
}
/////////////////////
// tokenize header //
@@ -659,6 +664,7 @@ HttpSM::state_read_client_request_header(int event, void *data)
// Turn off read eventing until we get the
// blind tunnel infrastructure set up
ua_session->get_netvc()->do_io_read(this, 0, NULL);
+ netvc->do_io_read(this, 0, NULL);
/* establish blind tunnel */
setup_blind_tunnel_port();
@@ -744,7 +750,7 @@ HttpSM::state_read_client_request_header(int event, void *data)
DebugSM("http_seq", "send 100 Continue response to client");
int64_t nbytes = ua_entry->write_buffer->write(str_100_continue_response, len_100_continue_response);
- ua_session->do_io_write(ua_session->get_netvc(), nbytes, buf_start);
+ ua_session->do_io_write(netvc, nbytes, buf_start);
}
}
@@ -784,6 +790,10 @@ HttpSM::state_drain_client_request_body(int event, void *data)
ink_assert(ua_entry->read_vio == (VIO *)data);
ink_assert(ua_entry->vc == ua_session);
+ NetVConnection *netvc = ua_session->get_netvc();
+ if (!netvc && event != VC_EVENT_EOS)
+ return 0;
+
switch (event) {
case VC_EVENT_EOS:
case VC_EVENT_ERROR:
@@ -843,11 +853,26 @@ HttpSM::state_watch_for_client_abort(int event, void *data)
* data that may still be sent from the server and send it to the
* client.
*/
- case VC_EVENT_EOS:
+ case VC_EVENT_EOS: {
// We got an early EOS.
- ua_session->get_netvc()->do_io_shutdown(IO_SHUTDOWN_READ);
- ua_entry->eos = true;
+ NetVConnection *netvc = ua_session->get_netvc();
+ if (ua_session->allow_half_open()) {
+ if (netvc)
+ netvc->do_io_shutdown(IO_SHUTDOWN_READ);
+ ua_entry->eos = true;
+ } else {
+ if (netvc)
+ netvc->do_io_close();
+ ua_session->do_io_close();
+ ua_session = NULL;
+ ua_buffer_reader = NULL;
+ vc_table.cleanup_entry(ua_entry);
+ ua_entry = NULL;
+ tunnel.kill_tunnel();
+ terminate_sm = true; // Just die already, the requester is gone
+ }
break;
+ }
case VC_EVENT_ERROR:
case VC_EVENT_ACTIVE_TIMEOUT:
case VC_EVENT_INACTIVITY_TIMEOUT: {
@@ -874,6 +899,7 @@ HttpSM::state_watch_for_client_abort(int event, void *data)
mark_server_down_on_client_abort();
milestones[TS_MILESTONE_UA_CLOSE] = Thread::get_hrtime();
set_ua_abort(HttpTransact::ABORTED, event);
+
terminate_sm = true;
break;
}
@@ -1540,7 +1566,7 @@ HttpSM::handle_api_return()
case HttpTransact::SM_ACTION_API_SEND_RESPONSE_HDR:
// Set back the inactivity timeout
if (ua_session) {
- ua_session->get_netvc()->set_inactivity_timeout(HRTIME_SECONDS(t_state.txn_conf->transaction_no_activity_timeout_in));
+ ua_session->set_inactivity_timeout(HRTIME_SECONDS(t_state.txn_conf->transaction_no_activity_timeout_in));
}
// we have further processing to do
// based on what t_state.next_action is
@@ -1668,8 +1694,10 @@ HttpSM::state_http_server_open(int event, void *data)
}
handle_http_server_open();
return 0;
- case EVENT_INTERVAL:
- do_http_server_open();
+ case EVENT_INTERVAL: // Delayed call from another thread
+ if (server_session == NULL) {
+ do_http_server_open();
+ }
break;
case VC_EVENT_ERROR:
case NET_EVENT_OPEN_FAILED:
@@ -1770,7 +1798,7 @@ HttpSM::state_read_server_response_header(int event, void *data)
// For requests that contain a body, we can cancel the ua inactivity timeout.
if (ua_session && t_state.hdr_info.request_content_length) {
- ua_session->get_netvc()->cancel_inactivity_timeout();
+ ua_session->cancel_inactivity_timeout();
}
}
/////////////////////
@@ -2147,7 +2175,7 @@ HttpSM::state_hostdb_lookup(int event, void *data)
opt.flags = (t_state.cache_info.directives.does_client_permit_dns_storing) ? HostDBProcessor::HOSTDB_DO_NOT_FORCE_DNS :
HostDBProcessor::HOSTDB_FORCE_DNS_RELOAD;
opt.timeout = (t_state.api_txn_dns_timeout_value != -1) ? t_state.api_txn_dns_timeout_value : 0;
- opt.host_res_style = ua_session->host_res_style;
+ opt.host_res_style = ua_session->get_host_res_style();
Action *dns_lookup_action_handle =
hostDBProcessor.getbyname_imm(this, (process_hostdb_info_pfn)&HttpSM::process_hostdb_info, host_name, 0, opt);
@@ -2372,6 +2400,16 @@ int
HttpSM::state_cache_open_write(int event, void *data)
{
STATE_ENTER(&HttpSM : state_cache_open_write, event);
+
+ // Make sure we are on the "right" thread
+ if (ua_session) {
+ NetVConnection *vc = ua_session->get_netvc();
+ if (vc && vc->thread != this_ethread()) {
+ pending_action = vc->thread->schedule_imm(this, event, data); // Stay on the same thread!
+ return 0;
+ }
+ }
+
milestones[TS_MILESTONE_CACHE_OPEN_WRITE_END] = Thread::get_hrtime();
pending_action = NULL;
@@ -3005,7 +3043,7 @@ HttpSM::tunnel_handler_server(int event, HttpTunnelProducer *p)
propagate server closes back to the client. Part of that is
disabling KeepAlive if the server closes.
*/
- if (ua_session && ua_session->f_outbound_transparent && t_state.http_config_param->use_client_source_port) {
+ if (ua_session && ua_session->is_outbound_transparent() && t_state.http_config_param->use_client_source_port) {
t_state.client_info.keep_alive = HTTP_NO_KEEPALIVE;
}
} else {
@@ -3117,14 +3155,17 @@ HttpSM::tunnel_handler_ua(int event, HttpTunnelConsumer *c)
switch (event) {
case VC_EVENT_EOS:
ua_entry->eos = true;
+
// FALL-THROUGH
case VC_EVENT_INACTIVITY_TIMEOUT:
case VC_EVENT_ACTIVE_TIMEOUT:
case VC_EVENT_ERROR:
- // The user agent died or aborted. Check to
- // see if we should setup a background fill
- set_ua_abort(HttpTransact::ABORTED, event);
+ if (ua_session != NULL) {
+ // The user agent died or aborted. Check to
+ // see if we should setup a background fill
+ set_ua_abort(HttpTransact::ABORTED, event);
+ }
if (is_bg_fill_necessary(c)) {
DebugSM("http", "[%" PRId64 "] Initiating background fill", sm_id);
@@ -3158,7 +3199,7 @@ HttpSM::tunnel_handler_ua(int event, HttpTunnelConsumer *c)
c->write_success = true;
t_state.client_info.abort = HttpTransact::DIDNOT_ABORT;
if (t_state.client_info.keep_alive == HTTP_KEEPALIVE) {
- if (t_state.www_auth_content != HttpTransact::CACHE_AUTH_SERVE || ua_session->get_bound_ss()) {
+ if (t_state.www_auth_content != HttpTransact::CACHE_AUTH_SERVE || ua_session->get_server_session()) {
// successful keep-alive
close_connection = false;
}
@@ -3220,7 +3261,7 @@ HttpSM::tunnel_handler_ua(int event, HttpTunnelConsumer *c)
}
if ((is_eligible_post_request || t_state.client_info.pipeline_possible == true) && c->producer->vc_type != HT_STATIC &&
event == VC_EVENT_WRITE_COMPLETE) {
- ua_session->set_half_close_flag();
+ ua_session->set_half_close_flag(true);
}
ua_session->do_io_close();
@@ -3998,7 +4039,7 @@ HttpSM::do_hostdb_lookup()
opt.flags = (t_state.cache_info.directives.does_client_permit_dns_storing) ? HostDBProcessor::HOSTDB_DO_NOT_FORCE_DNS :
HostDBProcessor::HOSTDB_FORCE_DNS_RELOAD;
opt.timeout = (t_state.api_txn_dns_timeout_value != -1) ? t_state.api_txn_dns_timeout_value : 0;
- opt.host_res_style = ua_session->host_res_style;
+ opt.host_res_style = ua_session->get_host_res_style();
Action *dns_lookup_action_handle =
hostDBProcessor.getbyname_imm(this, (process_hostdb_info_pfn)&HttpSM::process_hostdb_info, host_name, 0, opt);
@@ -4030,7 +4071,7 @@ HttpSM::do_hostdb_lookup()
opt.flags = (t_state.cache_info.directives.does_client_permit_dns_storing) ? HostDBProcessor::HOSTDB_DO_NOT_FORCE_DNS :
HostDBProcessor::HOSTDB_FORCE_DNS_RELOAD;
opt.timeout = (t_state.api_txn_dns_timeout_value != -1) ? t_state.api_txn_dns_timeout_value : 0;
- opt.host_res_style = ua_session->host_res_style;
+ opt.host_res_style = ua_session->get_host_res_style();
Action *dns_lookup_action_handle = hostDBProcessor.getbyname_imm(this, (process_hostdb_info_pfn)&HttpSM::process_hostdb_info,
t_state.dns_info.lookup_name, 0, opt);
@@ -4572,6 +4613,15 @@ HttpSM::do_http_server_open(bool raw)
int ip_family = t_state.current.server->dst_addr.sa.sa_family;
DebugSM("http_track", "entered inside do_http_server_open ][%s]", ats_ip_family_name(ip_family));
+ // Make sure we are on the "right" thread
+ if (ua_session) {
+ NetVConnection *vc = ua_session->get_netvc();
+ if (vc && vc->thread != this_ethread()) {
+ pending_action = vc->thread->schedule_imm(this, EVENT_INTERVAL);
+ return;
+ }
+ }
+ pending_action = NULL;
ink_assert(server_entry == NULL);
// ua_entry can be null if a scheduled update is also a reverse proxy
@@ -4710,6 +4760,7 @@ HttpSM::do_http_server_open(bool raw)
} else {
// As this is in the non-sharing configuration, we want to close
// the existing connection and call connect_re to get a new one
+ existing_ss->get_netvc()->set_inactivity_timeout(HRTIME_SECONDS(t_state.txn_conf->keep_alive_no_activity_timeout_out));
existing_ss->release();
ua_session->attach_server_session(NULL);
}
@@ -4721,6 +4772,7 @@ HttpSM::do_http_server_open(bool raw)
else if (ua_session != NULL) {
HttpServerSession *existing_ss = ua_session->get_server_session();
if (existing_ss) {
+ existing_ss->get_netvc()->set_inactivity_timeout(HRTIME_SECONDS(t_state.txn_conf->keep_alive_no_activity_timeout_out));
existing_ss->release();
ua_session->attach_server_session(NULL);
}
@@ -4803,13 +4855,13 @@ HttpSM::do_http_server_open(bool raw)
opt.ip_family = ip_family;
if (ua_session) {
- opt.local_port = ua_session->outbound_port;
+ opt.local_port = ua_session->get_outbound_port();
- IpAddr &outbound_ip = AF_INET6 == ip_family ? ua_session->outbound_ip6 : ua_session->outbound_ip4;
+ const IpAddr &outbound_ip = AF_INET6 == ip_family ? ua_session->get_outbound_ip6() : ua_session->get_outbound_ip4();
if (outbound_ip.isValid()) {
opt.addr_binding = NetVCOptions::INTF_ADDR;
opt.local_ip = outbound_ip;
- } else if (ua_session->f_outbound_transparent) {
+ } else if (ua_session->is_outbound_transparent()) {
opt.addr_binding = NetVCOptions::FOREIGN_ADDR;
opt.local_ip = t_state.client_info.src_addr;
/* If the connection is server side transparent, we can bind to the
@@ -4951,7 +5003,6 @@ HttpSM::do_api_callout_internal()
} else {
cur_hook_id = TS_HTTP_TXN_CLOSE_HOOK;
}
-
break;
default:
cur_hook_id = (TSHttpHookID)-1;
@@ -5119,6 +5170,8 @@ HttpSM::release_server_session(bool serve_from_cache)
server_session->server_trans_stat--;
server_session->attach_hostname(t_state.current.server->name);
if (t_state.www_auth_content == HttpTransact::CACHE_AUTH_NONE || serve_from_cache == false) {
+ // Must explicitly set the keep_alive_no_activity time before doing the release
+ server_session->get_netvc()->set_inactivity_timeout(HRTIME_SECONDS(t_state.txn_conf->keep_alive_no_activity_timeout_out));
server_session->release();
} else {
// an authenticated server connection - attach to the local client
@@ -5689,6 +5742,12 @@ HttpSM::attach_server_session(HttpServerSession *s)
return;
}
+ if (ua_session) {
+ NetVConnection *server_vc = s->get_netvc();
+ NetVConnection *ua_vc = ua_session->get_netvc();
+ ink_release_assert(server_vc->thread == ua_vc->thread);
+ }
+
// Set the mutex so that we have something to update
// stats with
server_session->mutex = this->mutex;
@@ -6001,7 +6060,7 @@ HttpSM::setup_100_continue_transfer()
"user agent");
// Make sure the half_close is not set.
- ua_session->clear_half_close_flag();
+ ua_session->set_half_close_flag(false);
ua_entry->in_tunnel = true;
tunnel.tunnel_run(p);
}
@@ -6727,7 +6786,7 @@ HttpSM::kill_this()
#ifdef USE_HTTP_DEBUG_LISTS
ink_mutex_acquire(&debug_sm_list_mutex);
- debug_sm_list.remove(this, this->debug_link);
+ debug_sm_list.remove(this);
ink_mutex_release(&debug_sm_list_mutex);
#endif
@@ -7010,9 +7069,9 @@ HttpSM::set_next_state()
// Use the returned "next action" code to set the next state handler //
///////////////////////////////////////////////////////////////////////
switch (t_state.next_action) {
- case HttpTransact::SM_ACTION_API_READ_REQUEST_HDR:
case HttpTransact::SM_ACTION_API_PRE_REMAP:
case HttpTransact::SM_ACTION_API_POST_REMAP:
+ case HttpTransact::SM_ACTION_API_READ_REQUEST_HDR:
case HttpTransact::SM_ACTION_API_OS_DNS:
case HttpTransact::SM_ACTION_API_SEND_REQUEST_HDR:
case HttpTransact::SM_ACTION_API_READ_CACHE_HDR:
@@ -7161,7 +7220,16 @@ HttpSM::set_next_state()
// sending its request and for this reason, the inactivity timeout
// cannot be cancelled.
if (ua_session && !t_state.hdr_info.request_content_length) {
- ua_session->get_netvc()->cancel_inactivity_timeout();
+ NetVConnection *vc = ua_session->get_netvc();
+ if (vc) {
+ ua_session->cancel_inactivity_timeout();
+ } else {
+ terminate_sm = true;
+ return; // Give up if there is no session netvc
+ }
+ } else if (!ua_session) {
+ terminate_sm = true;
+ return; // Give up if there is no session
}
}
diff --git a/proxy/http/HttpSM.h b/proxy/http/HttpSM.h
index 72e7c2b..e83fdfb 100644
--- a/proxy/http/HttpSM.h
+++ b/proxy/http/HttpSM.h
@@ -39,7 +39,7 @@
#include "HttpTransact.h"
#include "HttpTunnel.h"
#include "InkAPIInternal.h"
-#include "HttpClientSession.h"
+#include "../ProxyClientTransaction.h"
#include "HdrUtils.h"
//#include "AuthHttpAdapter.h"
@@ -193,7 +193,7 @@ public:
void init();
- void attach_client_session(HttpClientSession *client_vc_arg, IOBufferReader *buffer_reader);
+ void attach_client_session(ProxyClientTransaction *client_vc_arg, IOBufferReader *buffer_reader);
// Called by httpSessionManager so that we can reset
// the session timeouts and initiate a read while
@@ -301,7 +301,7 @@ protected:
void remove_ua_entry();
public:
- HttpClientSession *ua_session;
+ ProxyClientTransaction *ua_session;
BackgroundFill_t background_fill;
// AuthHttpAdapter authAdapter;
void set_http_schedule(Continuation *);
@@ -548,6 +548,11 @@ public:
public:
bool set_server_session_private(bool private_session);
+ bool
+ is_dying() const
+ {
+ return terminate_sm;
+ }
};
// Function to get the cache_sm object - YTS Team, yamsat
@@ -651,7 +656,8 @@ HttpSM::add_cache_sm()
inline bool
HttpSM::is_transparent_passthrough_allowed()
{
- return (t_state.client_info.is_transparent && ua_session->f_transparent_passthrough && ua_session->get_transact_count() == 1);
+ return (t_state.client_info.is_transparent && ua_session->is_transparent_passthrough_allowed() &&
+ ua_session->get_transact_count() == 1);
}
#endif
diff --git a/proxy/http/HttpSessionAccept.cc b/proxy/http/HttpSessionAccept.cc
index 9116a13..330e6da 100644
--- a/proxy/http/HttpSessionAccept.cc
+++ b/proxy/http/HttpSessionAccept.cc
@@ -23,7 +23,7 @@
#include "HttpSessionAccept.h"
#include "IPAllow.h"
-#include "HttpClientSession.h"
+#include "Http1ClientSession.h"
#include "I_Machine.h"
#include "Error.h"
@@ -60,7 +60,7 @@ HttpSessionAccept::accept(NetVConnection *netvc, MIOBuffer *iobuf, IOBufferReade
ats_ip_nptop(client_ip, ipb, sizeof(ipb)), netvc->attributes);
}
- HttpClientSession *new_session = THREAD_ALLOC_INIT(httpClientSessionAllocator, this_ethread());
+ Http1ClientSession *new_session = THREAD_ALLOC_INIT(http1ClientSessionAllocator, this_ethread());
// copy over session related data.
new_session->f_outbound_transparent = f_outbound_transparent;
diff --git a/proxy/http/HttpSessionManager.cc b/proxy/http/HttpSessionManager.cc
index 2f3bd63..e62e810 100644
--- a/proxy/http/HttpSessionManager.cc
+++ b/proxy/http/HttpSessionManager.cc
@@ -31,7 +31,7 @@
****************************************************************************/
#include "HttpSessionManager.h"
-#include "HttpClientSession.h"
+#include "../ProxyClientSession.h"
#include "HttpServerSession.h"
#include "HttpSM.h"
#include "HttpDebugNames.h"
@@ -239,7 +239,7 @@ HttpSessionManager::purge_keepalives()
HSMresult_t
HttpSessionManager::acquire_session(Continuation * /* cont ATS_UNUSED */, sockaddr const *ip, const char *hostname,
- HttpClientSession *ua_session, HttpSM *sm)
+ ProxyClientTransaction *ua_session, HttpSM *sm)
{
HttpServerSession *to_return = NULL;
TSServerSessionSharingMatchType match_style =
diff --git a/proxy/http/HttpSessionManager.h b/proxy/http/HttpSessionManager.h
index 4372240..01056ee 100644
--- a/proxy/http/HttpSessionManager.h
+++ b/proxy/http/HttpSessionManager.h
@@ -38,7 +38,7 @@
#include "HttpServerSession.h"
#include <ts/Map.h>
-class HttpClientSession;
+class ProxyClientTransaction;
class HttpSM;
void initialize_thread_for_http_sessions(EThread *thread, int thread_index);
@@ -158,7 +158,7 @@ public:
~HttpSessionManager() {}
- HSMresult_t acquire_session(Continuation *cont, sockaddr const *addr, const char *hostname, HttpClientSession *ua_session,
+ HSMresult_t acquire_session(Continuation *cont, sockaddr const *addr, const char *hostname, ProxyClientTransaction *ua_session,
HttpSM *sm);
HSMresult_t release_session(HttpServerSession *to_release);
void purge_keepalives();
diff --git a/proxy/http/HttpTransact.cc b/proxy/http/HttpTransact.cc
index 4a068a7..27cdb8f 100644
--- a/proxy/http/HttpTransact.cc
+++ b/proxy/http/HttpTransact.cc
@@ -43,7 +43,7 @@
#include "ReverseProxy.h"
#include "HttpBodyFactory.h"
#include "StatPages.h"
-#include "HttpClientSession.h"
+#include "../IPAllow.h"
#include "I_Machine.h"
static char range_type[] = "multipart/byteranges; boundary=RANGE_SEPARATOR";
@@ -916,7 +916,7 @@ HttpTransact::EndRemapRequest(State *s)
}
}
s->reverse_proxy = true;
- s->server_info.is_transparent = s->state_machine->ua_session ? s->state_machine->ua_session->f_outbound_transparent : false;
+ s->server_info.is_transparent = s->state_machine->ua_session ? s->state_machine->ua_session->is_outbound_transparent() : false;
done:
if (is_debug_tag_set("http_chdr_describe") || is_debug_tag_set("http_trans") || is_debug_tag_set("url_rewrite")) {
@@ -3778,7 +3778,7 @@ HttpTransact::handle_response_from_server(State *s)
// Force host resolution to have the same family as the client.
// Because this is a transparent connection, we can't switch address
// families - that is locked in by the client source address.
- s->state_machine->ua_session->host_res_style = ats_host_res_match(&s->current.server->dst_addr.sa);
+ s->state_machine->ua_session->set_host_res_style(ats_host_res_match(&s->current.server->dst_addr.sa));
TRANSACT_RETURN(SM_ACTION_DNS_LOOKUP, OSDNSLookup);
} else if ((s->dns_info.srv_lookup_success || s->host_db_info.is_rr_elt()) &&
(s->txn_conf->connect_attempts_rr_retries > 0) &&
@@ -5671,8 +5671,13 @@ HttpTransact::initialize_state_variables_from_request(State *s, HTTPHdr *obsolet
s->request_data.incoming_port = s->state_machine->ua_session->get_netvc()->get_local_port();
s->request_data.internal_txn = s->state_machine->ua_session->get_netvc()->get_is_internal_request();
}
+ NetVConnection *vc = NULL;
+ if (s->state_machine->ua_session) {
+ vc = s->state_machine->ua_session->get_netvc();
+ }
// If this is an internal request, never keep alive
- if (!s->txn_conf->keep_alive_enabled_in || s->request_data.internal_txn) {
+ if (!s->txn_conf->keep_alive_enabled_in || (vc && vc->get_is_internal_request()) ||
+ (s->state_machine->ua_session && s->state_machine->ua_session->ignore_keep_alive())) {
s->client_info.keep_alive = HTTP_NO_KEEPALIVE;
} else {
s->client_info.keep_alive = incoming_request->keep_alive_get();
@@ -5752,6 +5757,12 @@ HttpTransact::initialize_state_variables_from_request(State *s, HTTPHdr *obsolet
s->request_data.hostname_str = s->arena.str_store(host_name, host_len);
ats_ip_copy(&s->request_data.src_ip, &s->client_info.src_addr);
memset(&s->request_data.dest_ip, 0, sizeof(s->request_data.dest_ip));
+ if (s->state_machine->ua_session) {
+ NetVConnection *netvc = s->state_machine->ua_session->get_netvc();
+ if (netvc) {
+ s->request_data.incoming_port = netvc->get_local_port();
+ }
+ }
s->request_data.xact_start = s->client_request_time;
s->request_data.api_info = &s->api_info;
@@ -5789,7 +5800,7 @@ HttpTransact::initialize_state_variables_from_response(State *s, HTTPHdr *incomi
if (!s->cop_test_page)
DebugTxn("http_hdrs", "[initialize_state_variables_from_response]"
"Server is keep-alive.");
- } else if (s->state_machine->ua_session && s->state_machine->ua_session->f_outbound_transparent &&
+ } else if (s->state_machine->ua_session && s->state_machine->ua_session->is_outbound_transparent() &&
s->state_machine->t_state.http_config_param->use_client_source_port) {
/* If we are reusing the client<->ATS 4-tuple for ATS<->server then if the server side is closed, we can't
re-open it because the 4-tuple may still be in the processing of shutting down. So if the server isn't
@@ -6604,7 +6615,7 @@ HttpTransact::process_quick_http_filter(State *s, int method)
}
if (s->state_machine->ua_session) {
- const AclRecord *acl_record = s->state_machine->ua_session->acl_record;
+ const AclRecord *acl_record = s->state_machine->ua_session->get_acl_record();
bool deny_request = (acl_record == NULL);
if (acl_record && (acl_record->_method_mask != AclRecord::ALL_METHOD_MASK)) {
if (method != -1) {
@@ -8136,7 +8147,7 @@ HttpTransact::build_error_response(State *s, HTTPStatus status_code, const char
}
// If transparent and the forward server connection looks unhappy don't
// keep alive the ua connection.
- if ((s->state_machine->ua_session && s->state_machine->ua_session->f_outbound_transparent) &&
+ if ((s->state_machine->ua_session && s->state_machine->ua_session->is_outbound_transparent()) &&
(status_code == HTTP_STATUS_INTERNAL_SERVER_ERROR || status_code == HTTP_STATUS_GATEWAY_TIMEOUT ||
status_code == HTTP_STATUS_BAD_GATEWAY || status_code == HTTP_STATUS_SERVICE_UNAVAILABLE)) {
s->client_info.keep_alive = HTTP_NO_KEEPALIVE;
diff --git a/proxy/http/HttpTunnel.cc b/proxy/http/HttpTunnel.cc
index 961e225..51e2adf 100644
--- a/proxy/http/HttpTunnel.cc
+++ b/proxy/http/HttpTunnel.cc
@@ -71,9 +71,10 @@ VcTypeCode(HttpTunnelType_t t)
}
ChunkedHandler::ChunkedHandler()
- : chunked_reader(NULL), dechunked_buffer(NULL), dechunked_size(0), dechunked_reader(NULL), chunked_buffer(NULL), chunked_size(0),
- truncation(false), skip_bytes(0), state(CHUNK_READ_CHUNK), cur_chunk_size(0), bytes_left(0), last_server_event(VC_EVENT_NONE),
- running_sum(0), num_digits(0), max_chunk_size(DEFAULT_MAX_CHUNK_SIZE), max_chunk_header_len(0)
+ : action(ACTION_UNSET), chunked_reader(NULL), dechunked_buffer(NULL), dechunked_size(0), dechunked_reader(NULL),
+ chunked_buffer(NULL), chunked_size(0), truncation(false), skip_bytes(0), state(CHUNK_READ_CHUNK), cur_chunk_size(0),
+ bytes_left(0), last_server_event(VC_EVENT_NONE), running_sum(0), num_digits(0), max_chunk_size(DEFAULT_MAX_CHUNK_SIZE),
+ max_chunk_header_len(0)
{
}
@@ -476,7 +477,9 @@ HttpTunnelConsumer::HttpTunnelConsumer()
{
}
-HttpTunnel::HttpTunnel() : Continuation(NULL), num_producers(0), num_consumers(0), sm(NULL), active(false), postbuf(NULL)
+HttpTunnel::HttpTunnel()
+ : Continuation(NULL), num_producers(0), num_consumers(0), sm(NULL), active(false), postbuf(NULL), reentrancy_count(0),
+ call_sm(false)
{
}
@@ -487,6 +490,7 @@ HttpTunnel::init(HttpSM *sm_arg, ProxyMutex *amutex)
sm = sm_arg;
active = false;
mutex = amutex;
+ ink_release_assert(reentrancy_count == 0);
SET_HANDLER(&HttpTunnel::main_handler);
flow_state.enabled_p = params->oride.flow_control_enabled;
if (params->oride.flow_low_water_mark > 0)
@@ -905,7 +909,7 @@ HttpTunnel::producer_run(HttpTunnelProducer *p)
// the amount to read since we know it. We will forward the FIN
// to the server on VC_EVENT_WRITE_COMPLETE.
if (p->vc_type == HT_HTTP_CLIENT) {
- HttpClientSession *ua_vc = static_cast<HttpClientSession *>(p->vc);
+ ProxyClientTransaction *ua_vc = static_cast<ProxyClientTransaction *>(p->vc);
if (ua_vc->get_half_close_flag()) {
c_write = c->buffer_reader->read_avail();
p->alive = false;
@@ -947,7 +951,7 @@ HttpTunnel::producer_run(HttpTunnelProducer *p)
producer_handler(VC_EVENT_READ_READY, p);
} else if (p->do_dechunking || p->do_chunked_passthru) {
// remove the dechunked reader marker so that it doesn't act like a buffer guard
- if (p->do_dechunking)
+ if (p->do_dechunking && dechunked_buffer_start)
p->chunked_handler.dechunked_buffer->dealloc_reader(dechunked_buffer_start);
// bz57413
@@ -998,7 +1002,8 @@ HttpTunnel::producer_run(HttpTunnelProducer *p)
// Now that the tunnel has started, we must remove producer's reader so
// that it doesn't act like a buffer guard
- p->read_buffer->dealloc_reader(p->buffer_start);
+ if (p->read_buffer && p->buffer_start)
+ p->read_buffer->dealloc_reader(p->buffer_start);
p->buffer_start = NULL;
}
@@ -1151,25 +1156,16 @@ HttpTunnel::producer_handler(int event, HttpTunnelProducer *p)
case VC_EVENT_READ_READY:
// Data read from producer, reenable consumers
for (c = p->consumer_list.head; c; c = c->link.next) {
- if (c->alive) {
+ if (c->alive && c->write_vio) {
c->write_vio->reenable();
}
}
break;
case HTTP_TUNNEL_EVENT_PRECOMPLETE:
- // the producer had finished before the tunnel
- // started so just call the state machine back
- // We don't need to reenable since the consumers
- // were just activated. Likewise, we can't be
- // done because the consumer couldn't have
- // called us back yet
- p->bytes_read = 0;
- jump_point = p->vc_handler;
- (sm->*jump_point)(event, p);
- sm_callback = true;
- p->update_state_if_not_set(HTTP_SM_POST_SUCCESS);
- break;
+ // If the write completes on the stack (as it can for http2), then
+ // consumer could have called back by this point. Must treat this as
+ // a regular read complete (falling through to the following cases).
case VC_EVENT_READ_COMPLETE:
case VC_EVENT_EOS:
@@ -1200,7 +1196,7 @@ HttpTunnel::producer_handler(int event, HttpTunnelProducer *p)
// Data read from producer, reenable consumers
for (c = p->consumer_list.head; c; c = c->link.next) {
- if (c->alive) {
+ if (c->alive && c->write_vio) {
c->write_vio->reenable();
}
}
@@ -1410,7 +1406,8 @@ HttpTunnel::chain_abort_all(HttpTunnelProducer *p)
if (p->alive) {
p->alive = false;
- p->bytes_read = p->read_vio->ndone;
+ if (p->read_vio)
+ p->bytes_read = p->read_vio->ndone;
if (p->self_consumer) {
p->self_consumer->alive = false;
}
@@ -1456,13 +1453,15 @@ HttpTunnel::finish_all_internal(HttpTunnelProducer *p, bool chain)
} else
total_bytes = p->bytes_read + p->init_bytes_done;
- c->write_vio->nbytes = total_bytes - c->skip_bytes;
- ink_assert(c->write_vio->nbytes >= 0);
+ if (c->write_vio) {
+ c->write_vio->nbytes = total_bytes - c->skip_bytes;
+ ink_assert(c->write_vio->nbytes >= 0);
- if (c->write_vio->nbytes < 0) {
- // TODO: Wtf, printf?
- fprintf(stderr, "[HttpTunnel::finish_all_internal] ERROR: Incorrect total_bytes - c->skip_bytes = %" PRId64 "\n",
- (int64_t)(total_bytes - c->skip_bytes));
+ if (c->write_vio->nbytes < 0) {
+ // TODO: Wtf, printf?
+ fprintf(stderr, "[HttpTunnel::finish_all_internal] ERROR: Incorrect total_bytes - c->skip_bytes = %" PRId64 "\n",
+ (int64_t)(total_bytes - c->skip_bytes));
+ }
}
if (chain == true && c->self_producer) {
@@ -1472,7 +1471,7 @@ HttpTunnel::finish_all_internal(HttpTunnelProducer *p, bool chain)
// is nothing to do. Check to see if there is
// nothing to do and take the appripriate
// action
- if (c->write_vio->nbytes == c->write_vio->ndone) {
+ if (c->write_vio && c->write_vio->nbytes == c->write_vio->ndone) {
consumer_handler(VC_EVENT_WRITE_COMPLETE, c);
}
}
@@ -1562,6 +1561,8 @@ HttpTunnel::main_handler(int event, void *data)
HttpTunnelConsumer *c = NULL;
bool sm_callback = false;
+ ++reentrancy_count;
+
ink_assert(sm->magic == HTTP_SM_MAGIC_ALIVE);
// Find the appropriate entry
@@ -1580,11 +1581,18 @@ HttpTunnel::main_handler(int event, void *data)
// finished. Check to see if there are any remaining
// VConnections alive. If not, notifiy the state machine
//
- if (sm_callback && !is_tunnel_alive()) {
- active = false;
- sm->handleEvent(HTTP_TUNNEL_EVENT_DONE, this);
- return EVENT_DONE;
+ // Don't call out if we are nested
+ if (call_sm || (sm_callback && !is_tunnel_alive())) {
+ if (reentrancy_count == 1) {
+ reentrancy_count = 0;
+ active = false;
+ sm->handleEvent(HTTP_TUNNEL_EVENT_DONE, this);
+ return EVENT_DONE;
+ } else {
+ call_sm = true;
+ }
}
+ --reentrancy_count;
return EVENT_CONT;
}
diff --git a/proxy/http/HttpTunnel.h b/proxy/http/HttpTunnel.h
index 2534665..a075dd6 100644
--- a/proxy/http/HttpTunnel.h
+++ b/proxy/http/HttpTunnel.h
@@ -100,11 +100,7 @@ struct ChunkedHandler {
static int const DEFAULT_MAX_CHUNK_SIZE = 4096;
- enum Action {
- ACTION_DOCHUNK = 0,
- ACTION_DECHUNK,
- ACTION_PASSTHRU,
- };
+ enum Action { ACTION_DOCHUNK = 0, ACTION_DECHUNK, ACTION_PASSTHRU, ACTION_UNSET };
Action action;
@@ -386,6 +382,10 @@ private:
public:
PostDataBuffers *postbuf;
+
+private:
+ int reentrancy_count;
+ bool call_sm;
};
// void HttpTunnel::abort_cache_write_finish_others
@@ -482,9 +482,11 @@ HttpTunnel::get_producer(VIO *vio)
inline HttpTunnelConsumer *
HttpTunnel::get_consumer(VIO *vio)
{
- for (int i = 0; i < MAX_CONSUMERS; i++) {
- if (consumers[i].write_vio == vio) {
- return consumers + i;
+ if (vio) {
+ for (int i = 0; i < MAX_CONSUMERS; i++) {
+ if (consumers[i].write_vio == vio || consumers[i].vc == vio->vc_server) {
+ return consumers + i;
+ }
}
}
return NULL;
diff --git a/proxy/http/Makefile.am b/proxy/http/Makefile.am
index 039f78e..499eded 100644
--- a/proxy/http/Makefile.am
+++ b/proxy/http/Makefile.am
@@ -43,8 +43,10 @@ libhttp_a_SOURCES = \
HttpBodyFactory.h \
HttpCacheSM.cc \
HttpCacheSM.h \
- HttpClientSession.cc \
- HttpClientSession.h \
+ Http1ClientSession.cc \
+ Http1ClientSession.h \
+ Http1ClientTransaction.cc \
+ Http1ClientTransaction.h \
HttpConfig.cc \
HttpConfig.h \
HttpConnectionCount.cc \
diff --git a/proxy/http2/HTTP2.cc b/proxy/http2/HTTP2.cc
index 3557cec..92b2742 100644
--- a/proxy/http2/HTTP2.cc
+++ b/proxy/http2/HTTP2.cc
@@ -54,6 +54,12 @@ static char const *const HTTP2_STAT_TOTAL_TRANSACTIONS_TIME_NAME = "proxy.proces
static char const *const HTTP2_STAT_TOTAL_CLIENT_CONNECTION_NAME = "proxy.process.http2.total_client_connections";
static char const *const HTTP2_STAT_CONNECTION_ERRORS_NAME = "proxy.process.http2.connection_errors";
static char const *const HTTP2_STAT_STREAM_ERRORS_NAME = "proxy.process.http2.stream_errors";
+static char const *const HTTP2_STAT_SESSION_DIE_DEFAULT_NAME = "proxy.process.http2.session_die_default";
+static char const *const HTTP2_STAT_SESSION_DIE_OTHER_NAME = "proxy.process.http2.session_die_other";
+static char const *const HTTP2_STAT_SESSION_DIE_ACTIVE_NAME = "proxy.process.http2.session_die_active";
+static char const *const HTTP2_STAT_SESSION_DIE_INACTIVE_NAME = "proxy.process.http2.session_die_inactive";
+static char const *const HTTP2_STAT_SESSION_DIE_EOS_NAME = "proxy.process.http2.session_die_eos";
+static char const *const HTTP2_STAT_SESSION_DIE_ERROR_NAME = "proxy.process.http2.session_die_error";
union byte_pointer {
byte_pointer(void *p) : ptr(p) {}
@@ -710,6 +716,18 @@ Http2::init()
static_cast<int>(HTTP2_STAT_CONNECTION_ERRORS_COUNT), RecRawStatSyncSum);
RecRegisterRawStat(http2_rsb, RECT_PROCESS, HTTP2_STAT_STREAM_ERRORS_NAME, RECD_INT, RECP_PERSISTENT,
static_cast<int>(HTTP2_STAT_STREAM_ERRORS_COUNT), RecRawStatSyncSum);
+ RecRegisterRawStat(http2_rsb, RECT_PROCESS, HTTP2_STAT_SESSION_DIE_DEFAULT_NAME, RECD_INT, RECP_PERSISTENT,
+ static_cast<int>(HTTP2_STAT_SESSION_DIE_DEFAULT), RecRawStatSyncSum);
+ RecRegisterRawStat(http2_rsb, RECT_PROCESS, HTTP2_STAT_SESSION_DIE_OTHER_NAME, RECD_INT, RECP_PERSISTENT,
+ static_cast<int>(HTTP2_STAT_SESSION_DIE_OTHER), RecRawStatSyncSum);
+ RecRegisterRawStat(http2_rsb, RECT_PROCESS, HTTP2_STAT_SESSION_DIE_EOS_NAME, RECD_INT, RECP_PERSISTENT,
+ static_cast<int>(HTTP2_STAT_SESSION_DIE_EOS), RecRawStatSyncSum);
+ RecRegisterRawStat(http2_rsb, RECT_PROCESS, HTTP2_STAT_SESSION_DIE_ACTIVE_NAME, RECD_INT, RECP_PERSISTENT,
+ static_cast<int>(HTTP2_STAT_SESSION_DIE_ACTIVE), RecRawStatSyncSum);
+ RecRegisterRawStat(http2_rsb, RECT_PROCESS, HTTP2_STAT_SESSION_DIE_INACTIVE_NAME, RECD_INT, RECP_PERSISTENT,
+ static_cast<int>(HTTP2_STAT_SESSION_DIE_INACTIVE), RecRawStatSyncSum);
+ RecRegisterRawStat(http2_rsb, RECT_PROCESS, HTTP2_STAT_SESSION_DIE_ERROR_NAME, RECD_INT, RECP_PERSISTENT,
+ static_cast<int>(HTTP2_STAT_SESSION_DIE_ERROR), RecRawStatSyncSum);
}
#if TS_HAS_TESTS
diff --git a/proxy/http2/HTTP2.h b/proxy/http2/HTTP2.h
index 7448ed6..537f555 100644
--- a/proxy/http2/HTTP2.h
+++ b/proxy/http2/HTTP2.h
@@ -69,6 +69,12 @@ enum {
HTTP2_STAT_TOTAL_CLIENT_CONNECTION_COUNT, // Total connections running http2
HTTP2_STAT_STREAM_ERRORS_COUNT,
HTTP2_STAT_CONNECTION_ERRORS_COUNT,
+ HTTP2_STAT_SESSION_DIE_DEFAULT,
+ HTTP2_STAT_SESSION_DIE_OTHER,
+ HTTP2_STAT_SESSION_DIE_ACTIVE,
+ HTTP2_STAT_SESSION_DIE_INACTIVE,
+ HTTP2_STAT_SESSION_DIE_EOS,
+ HTTP2_STAT_SESSION_DIE_ERROR,
HTTP2_N_STATS // Terminal counter, NOT A STAT INDEX.
};
diff --git a/proxy/http2/Http2ClientSession.cc b/proxy/http2/Http2ClientSession.cc
index 44f3249..8b8427e 100644
--- a/proxy/http2/Http2ClientSession.cc
+++ b/proxy/http2/Http2ClientSession.cc
@@ -24,6 +24,7 @@
#include "Http2ClientSession.h"
#include "HttpDebugNames.h"
#include "ts/ink_base64.h"
+#include "../IPAllow.h"
#define STATE_ENTER(state_name, event) \
do { \
@@ -69,6 +70,30 @@ Http2ClientSession::destroy()
{
DebugHttp2Ssn("session destroy");
+ // Update stats on how we died. May want to eliminate this. Was useful for
+ // tracking down which cases we were having problems cleaning up. But for general
+ // use probably not worth the effort
+ switch (dying_event) {
+ case VC_EVENT_NONE:
+ HTTP2_INCREMENT_THREAD_DYN_STAT(HTTP2_STAT_SESSION_DIE_DEFAULT, this_ethread());
+ break;
+ case VC_EVENT_ACTIVE_TIMEOUT:
+ HTTP2_INCREMENT_THREAD_DYN_STAT(HTTP2_STAT_SESSION_DIE_ACTIVE, this_ethread());
+ break;
+ case VC_EVENT_INACTIVITY_TIMEOUT:
+ HTTP2_INCREMENT_THREAD_DYN_STAT(HTTP2_STAT_SESSION_DIE_INACTIVE, this_ethread());
+ break;
+ case VC_EVENT_ERROR:
+ HTTP2_INCREMENT_THREAD_DYN_STAT(HTTP2_STAT_SESSION_DIE_ERROR, this_ethread());
+ break;
+ case VC_EVENT_EOS:
+ HTTP2_INCREMENT_THREAD_DYN_STAT(HTTP2_STAT_SESSION_DIE_EOS, this_ethread());
+ break;
+ default:
+ HTTP2_INCREMENT_THREAD_DYN_STAT(HTTP2_STAT_SESSION_DIE_OTHER, this_ethread());
+ break;
+ }
+
ink_release_assert(this->client_vc == NULL);
this->connection_state.destroy();
@@ -77,7 +102,7 @@ Http2ClientSession::destroy()
free_MIOBuffer(this->read_buffer);
free_MIOBuffer(this->write_buffer);
- http2ClientSessionAllocator.free(this);
+ THREAD_FREE(this, http2ClientSessionAllocator, this_ethread());
}
void
@@ -107,6 +132,17 @@ Http2ClientSession::start()
void
Http2ClientSession::new_connection(NetVConnection *new_vc, MIOBuffer *iobuf, IOBufferReader *reader, bool backdoor)
{
+ acl_record = NULL;
+ sockaddr const *client_ip = new_vc->get_remote_addr();
+ IpAllow::scoped_config ipallow;
+ if (ipallow && (((acl_record = ipallow->match(client_ip)) == NULL) || (acl_record->isEmpty()))) {
+ ip_port_text_buffer ipb;
+ Warning("http2 client '%s' prohibited by ip-allow policy", ats_ip_ntop(client_ip, ipb, sizeof(ipb)));
+ } else if (!acl_record) {
+ ip_port_text_buffer ipb;
+ Warning("http2 client '%s' no ip-allow policy specified", ats_ip_ntop(client_ip, ipb, sizeof(ipb)));
+ }
+
ink_assert(new_vc->mutex->thread_holding == this_ethread());
HTTP2_INCREMENT_THREAD_DYN_STAT(HTTP2_STAT_CURRENT_CLIENT_SESSION_COUNT, new_vc->mutex->thread_holding);
HTTP2_INCREMENT_THREAD_DYN_STAT(HTTP2_STAT_TOTAL_CLIENT_CONNECTION_COUNT, new_vc->mutex->thread_holding);
@@ -120,6 +156,10 @@ Http2ClientSession::new_connection(NetVConnection *new_vc, MIOBuffer *iobuf, IOB
client_vc->set_inactivity_timeout(HRTIME_SECONDS(Http2::accept_no_activity_timeout));
this->mutex = new_vc->mutex;
+ // These macros must have the mutex set.
+ HTTP_INCREMENT_DYN_STAT(http_current_client_connections_stat);
+ HTTP_INCREMENT_DYN_STAT(http_total_client_connections_stat);
+
this->connection_state.mutex = new_ProxyMutex();
DebugHttp2Ssn("session born, netvc %p", this->client_vc);
@@ -201,14 +241,8 @@ Http2ClientSession::do_io_close(int alerrno)
DebugHttp2Ssn("session closed");
ink_assert(this->mutex->thread_holding == this_ethread());
- if (client_vc) {
- // clean up ssl's first byte iobuf
- SSLNetVConnection *ssl_vc = dynamic_cast<SSLNetVConnection *>(client_vc);
- if (ssl_vc) {
- ssl_vc->set_ssl_iobuf(NULL);
- }
- }
HTTP2_DECREMENT_THREAD_DYN_STAT(HTTP2_STAT_CURRENT_CLIENT_SESSION_COUNT, this->mutex->thread_holding);
+ HTTP_DECREMENT_DYN_STAT(http_current_client_connections_stat);
send_connection_event(&this->connection_state, HTTP2_SESSION_EVENT_FINI, this);
do_api_callout(TS_HTTP_SSN_CLOSE_HOOK);
}
diff --git a/proxy/http2/Http2ClientSession.h b/proxy/http2/Http2ClientSession.h
index d0608d5..672ca76 100644
--- a/proxy/http2/Http2ClientSession.h
+++ b/proxy/http2/Http2ClientSession.h
@@ -40,7 +40,7 @@
#define HTTP2_SESSION_EVENT_RECV (HTTP2_SESSION_EVENTS_START + 3)
#define HTTP2_SESSION_EVENT_XMIT (HTTP2_SESSION_EVENTS_START + 4)
-static size_t const HTTP2_HEADER_BUFFER_SIZE_INDEX = CLIENT_CONNECTION_FIRST_READ_BUFFER_SIZE_INDEX;
+size_t const HTTP2_HEADER_BUFFER_SIZE_INDEX = CLIENT_CONNECTION_FIRST_READ_BUFFER_SIZE_INDEX;
// To support Upgrade: h2c
struct Http2UpgradeContext {
@@ -184,12 +184,6 @@ public:
client_vc = NULL;
}
- int64_t
- connection_id() const
- {
- return this->con_id;
- }
-
sockaddr const *
get_client_addr()
{
@@ -212,6 +206,28 @@ public:
virtual char const *getPluginTag() const;
virtual int64_t getPluginId() const;
+ // Reports this session's con_id, narrowed to int, as its transaction count.
+ virtual int
+ get_transact_count() const
+ {
+   return static_cast<int>(this->con_id);
+ }
+ virtual void /* Intentionally empty: this session does nothing with `trans` here. */
+ release(ProxyClientTransaction *trans)
+ {
+ }
+
+ Http2ConnectionState connection_state;
+ // Stores `event` in dying_event so it can be read back through get_dying_event().
+ void
+ set_dying_event(int event)
+ {
+   this->dying_event = event;
+ }
+ // Returns the value last stored by set_dying_event().
+ int
+ get_dying_event() const
+ {
+   return this->dying_event;
+ }
+
private:
Http2ClientSession(Http2ClientSession &); // noncopyable
Http2ClientSession &operator=(const Http2ClientSession &); // noncopyable
@@ -231,12 +247,12 @@ private:
MIOBuffer *write_buffer;
IOBufferReader *sm_writer;
Http2FrameHeader current_hdr;
- Http2ConnectionState connection_state;
// For Upgrade: h2c
Http2UpgradeContext upgrade_context;
VIO *write_vio;
+ int dying_event;
};
extern ClassAllocator<Http2ClientSession> http2ClientSessionAllocator;
diff --git a/proxy/http2/Http2ConnectionState.cc b/proxy/http2/Http2ConnectionState.cc
index 1059d4f..4da397b 100644
--- a/proxy/http2/Http2ConnectionState.cc
+++ b/proxy/http2/Http2ConnectionState.cc
@@ -90,11 +90,6 @@ rcv_data_frame(Http2ConnectionState &cstate, const Http2Frame &frame)
}
}
- // Check to see if FetchSM is NULL
- if (stream->get_fetcher() == NULL) {
- return Http2Error(HTTP2_ERROR_CLASS_STREAM, HTTP2_ERROR_STREAM_CLOSED);
- }
-
// If a DATA frame is received whose stream is not in "open" or "half closed
// (local)" state,
// the recipient MUST respond with a stream error of type STREAM_CLOSED.
@@ -142,13 +137,20 @@ rcv_data_frame(Http2ConnectionState &cstate, const Http2Frame &frame)
stream->server_rwnd -= payload_length;
const uint32_t unpadded_length = payload_length - pad_length;
+ // If we call write() multiple times, we must keep the same reader, so we can
+ // update its offset via consume. Otherwise, we will read the same data on the
+ // second time through
+ IOBufferReader *myreader = frame.reader()->clone();
while (nbytes < payload_length - pad_length) {
size_t read_len = sizeof(buf);
if (nbytes + read_len > unpadded_length)
read_len -= nbytes + read_len - unpadded_length;
- unsigned read_bytes = read_rcv_buffer(buf, read_len, nbytes, frame);
- stream->set_body_to_fetcher(buf, read_bytes);
+ nbytes += stream->request_buffer.write(myreader, read_len);
+ myreader->consume(nbytes);
+ // If there is an outstanding read, update the buffer
+ stream->update_read_request(INT64_MAX, true);
}
+ myreader->writer()->dealloc_reader(myreader);
uint32_t initial_rwnd = cstate.server_settings.get(HTTP2_SETTINGS_INITIAL_WINDOW_SIZE);
uint32_t min_rwnd = min(initial_rwnd, cstate.server_settings.get(HTTP2_SETTINGS_MAX_FRAME_SIZE));
@@ -265,7 +267,7 @@ rcv_headers_frame(Http2ConnectionState &cstate, const Http2Frame &frame)
return Http2Error(HTTP2_ERROR_CLASS_CONNECTION, HTTP2_ERROR_PROTOCOL_ERROR);
}
- bool skip_fetcher = false;
+ bool empty_request = false;
if (stream->has_trailing_header()) {
if (!(frame.header().flags & HTTP2_FLAGS_HEADERS_END_STREAM)) {
return Http2Error(HTTP2_ERROR_CLASS_STREAM, HTTP2_ERROR_PROTOCOL_ERROR);
@@ -274,7 +276,7 @@ rcv_headers_frame(Http2ConnectionState &cstate, const Http2Frame &frame)
// Set a flag to avoid initializing fetcher for now.
// Decoding header blocks is stil needed to maintain a HPACK dynamic table.
// TODO: TS-3812
- skip_fetcher = true;
+ empty_request = true;
}
Http2ErrorCode result = stream->decode_header_blocks(*cstate.local_hpack_handle);
@@ -287,10 +289,11 @@ rcv_headers_frame(Http2ConnectionState &cstate, const Http2Frame &frame)
}
}
- if (!skip_fetcher) {
- if (!stream->init_fetcher(cstate)) {
- return Http2Error(HTTP2_ERROR_CLASS_STREAM, HTTP2_ERROR_PROTOCOL_ERROR);
- }
+ // Set up the State Machine
+ if (!empty_request) {
+ stream->new_transaction();
+ // Send request header to SM
+ stream->send_request(cstate);
}
} else {
// NOTE: Expect CONTINUATION Frame. Do NOT change state of stream or decode
@@ -610,7 +613,7 @@ rcv_window_update_frame(Http2ConnectionState &cstate, const Http2Frame &frame)
stream->client_rwnd += size;
ssize_t wnd = min(cstate.client_rwnd, stream->client_rwnd);
if (stream->get_state() == HTTP2_STREAM_STATE_HALF_CLOSED_REMOTE && wnd > 0) {
- cstate.send_data_frame(stream->get_fetcher());
+ cstate.send_data_frame(stream);
}
}
@@ -690,7 +693,10 @@ rcv_continuation_frame(Http2ConnectionState &cstate, const Http2Frame &frame)
}
}
- stream->init_fetcher(cstate);
+ // Set up the State Machine
+ stream->new_transaction();
+ // Send request header to SM
+ stream->send_request(cstate);
} else {
// NOTE: Expect another CONTINUATION Frame. Do nothing.
DebugHttp2Stream(cstate.ua_session, stream_id, "No END_HEADERS flag, expecting CONTINUATION frame");
@@ -772,7 +778,9 @@ Http2ConnectionState::main_event_handler(int event, void *edata)
if (error.cls != HTTP2_ERROR_CLASS_NONE) {
if (error.cls == HTTP2_ERROR_CLASS_CONNECTION) {
this->send_goaway_frame(stream_id, error.code);
- cleanup_streams();
+ // The streams will be cleaned up by the HTTP2_SESSION_EVENT_FINI event
+ // The Http2ClientSession will shutdown because connection_state.is_state_closed() will be true
+
// XXX We need to think a bit harder about how to coordinate the client
// session and the
// protocol connection. At this point, the protocol is shutting down,
@@ -788,30 +796,6 @@ Http2ConnectionState::main_event_handler(int event, void *edata)
return 0;
}
-
- // Process response headers from origin server
- case TS_FETCH_EVENT_EXT_HEAD_DONE: {
- FetchSM *fetch_sm = reinterpret_cast<FetchSM *>(edata);
- this->send_headers_frame(fetch_sm);
- return 0;
- }
-
- // Process a part of response body from origin server
- case TS_FETCH_EVENT_EXT_BODY_READY: {
- FetchSM *fetch_sm = reinterpret_cast<FetchSM *>(edata);
- this->send_data_frame(fetch_sm);
- return 0;
- }
-
- // Process final part of response body from origin server
- case TS_FETCH_EVENT_EXT_BODY_DONE: {
- FetchSM *fetch_sm = reinterpret_cast<FetchSM *>(edata);
- Http2Stream *stream = static_cast<Http2Stream *>(fetch_sm->ext_get_user_data());
- stream->mark_body_done();
- this->send_data_frame(fetch_sm);
- return 0;
- }
-
default:
DebugHttp2Con(ua_session, "unexpected event=%d edata=%p", event, edata);
ink_release_assert(0);
@@ -844,12 +828,15 @@ Http2ConnectionState::create_stream(Http2StreamId new_id)
return NULL;
}
- Http2Stream *new_stream = new Http2Stream(new_id, client_settings.get(HTTP2_SETTINGS_INITIAL_WINDOW_SIZE));
+ Http2Stream *new_stream = THREAD_ALLOC_INIT(http2StreamAllocator, this_ethread());
+ new_stream->init(new_id, client_settings.get(HTTP2_SETTINGS_INITIAL_WINDOW_SIZE));
stream_list.push(new_stream);
latest_streamid = new_id;
ink_assert(client_streams_count < UINT32_MAX);
++client_streams_count;
+ new_stream->set_parent(ua_session);
+ new_stream->mutex = ua_session->mutex;
ua_session->get_netvc()->add_to_active_queue();
return new_stream;
@@ -874,7 +861,7 @@ Http2ConnectionState::restart_streams()
while (s) {
Http2Stream *next = s->link.next;
if (s->get_state() == HTTP2_STREAM_STATE_HALF_CLOSED_REMOTE && min(this->client_rwnd, s->client_rwnd) > 0) {
- this->send_data_frame(s->get_fetcher());
+ this->send_data_frame(s);
}
s = next;
}
@@ -886,8 +873,7 @@ Http2ConnectionState::cleanup_streams()
Http2Stream *s = stream_list.head;
while (s) {
Http2Stream *next = s->link.next;
- stream_list.remove(s);
- delete s;
+ this->delete_stream(s);
s = next;
}
client_streams_count = 0;
@@ -900,12 +886,12 @@ void
Http2ConnectionState::delete_stream(Http2Stream *stream)
{
stream_list.remove(stream);
- delete stream;
+ stream->initiating_close();
ink_assert(client_streams_count > 0);
--client_streams_count;
- if (client_streams_count == 0) {
+ if (client_streams_count == 0 && ua_session) {
ua_session->get_netvc()->add_to_keep_alive_queue();
}
}
@@ -920,17 +906,11 @@ Http2ConnectionState::update_initial_rwnd(Http2WindowSize new_size)
}
void
-Http2ConnectionState::send_data_frame(FetchSM *fetch_sm)
+Http2ConnectionState::send_data_frame(Http2Stream *stream)
{
- if (fetch_sm == NULL) {
- return;
- }
-
size_t buf_len = BUFFER_SIZE_FOR_INDEX(buffer_size_index[HTTP2_FRAME_TYPE_DATA]) - HTTP2_FRAME_HEADER_LEN;
uint8_t payload_buffer[buf_len];
- Http2Stream *stream = static_cast<Http2Stream *>(fetch_sm->ext_get_user_data());
-
if (stream->get_state() == HTTP2_STREAM_STATE_CLOSED) {
return;
}
@@ -938,27 +918,38 @@ Http2ConnectionState::send_data_frame(FetchSM *fetch_sm)
for (;;) {
uint8_t flags = 0x00;
- // Select appropriate payload size
- if (this->client_rwnd <= 0 || stream->client_rwnd <= 0)
- break;
size_t window_size = min(this->client_rwnd, stream->client_rwnd);
size_t send_size = min(buf_len, window_size);
+ size_t payload_length;
+ IOBufferReader *current_reader = stream->response_get_data_reader();
- size_t payload_length = fetch_sm->ext_read_data(reinterpret_cast<char *>(payload_buffer), send_size);
-
+ // Are we at the end?
// If we break here, we never send the END_STREAM in the case of a
// early terminating OS. Ok if there is no body yet. Otherwise
// continue on to delete the stream
- if (payload_length == 0 && !stream->is_body_done()) {
- break;
- }
+ if (stream->is_body_done() && current_reader && !current_reader->is_read_avail_more_than(0)) {
+ Debug("http2_con", "End of Stream id=%d no more data and body done", stream->get_id());
+ flags |= HTTP2_FLAGS_DATA_END_STREAM;
+ payload_length = 0;
+ } else {
+ // Select appropriate payload size
+ if (this->client_rwnd <= 0 || stream->client_rwnd <= 0)
+ break;
+ // Copy into the payload buffer. Seems like we should be able to skip this
+ // copy step
+ payload_length = current_reader ? current_reader->read(payload_buffer, send_size) : 0;
+
+ if (payload_length == 0 && !stream->is_body_done()) {
+ break;
+ }
- // Update window size
- this->client_rwnd -= payload_length;
- stream->client_rwnd -= payload_length;
+ // Update window size
+ this->client_rwnd -= payload_length;
+ stream->client_rwnd -= payload_length;
- if (stream->is_body_done() && payload_length < send_size) {
- flags |= HTTP2_FLAGS_DATA_END_STREAM;
+ if (stream->is_body_done() && payload_length < send_size) {
+ flags |= HTTP2_FLAGS_DATA_END_STREAM;
+ }
}
// Create frame
@@ -969,12 +960,14 @@ Http2ConnectionState::send_data_frame(FetchSM *fetch_sm)
http2_write_data(payload_buffer, payload_length, data.write());
data.finalize(payload_length);
+ stream->update_sent_count(payload_length);
+
+
// Change state to 'closed' if its end of DATAs.
if (flags & HTTP2_FLAGS_DATA_END_STREAM) {
DebugHttp2Stream(ua_session, stream->get_id(), "End of DATA frame");
- if (!stream->change_state(data.header().type, data.header().flags)) {
- this->send_goaway_frame(stream->get_id(), HTTP2_ERROR_PROTOCOL_ERROR);
- }
+ // Setting to the same state shouldn't be erroneous
+ stream->change_state(data.header().type, data.header().flags);
}
// xmit event
@@ -986,6 +979,7 @@ Http2ConnectionState::send_data_frame(FetchSM *fetch_sm)
// TODO its should not be deleted for a several time to handling
// RST_STREAM and WINDOW_UPDATE.
// See 'closed' state written at [RFC 7540] 5.1.
+ DebugSsn(this->ua_session, "http2_cs", "Shutdown stream %d", stream->get_id());
this->delete_stream(stream);
break;
}
@@ -993,7 +987,7 @@ Http2ConnectionState::send_data_frame(FetchSM *fetch_sm)
}
void
-Http2ConnectionState::send_headers_frame(FetchSM *fetch_sm)
+Http2ConnectionState::send_headers_frame(Http2Stream *stream)
{
uint8_t *buf = NULL;
uint32_t buf_len = 0;
@@ -1002,14 +996,14 @@ Http2ConnectionState::send_headers_frame(FetchSM *fetch_sm)
uint64_t sent = 0;
uint8_t flags = 0x00;
- Http2Stream *stream = static_cast<Http2Stream *>(fetch_sm->ext_get_user_data());
- HTTPHdr *resp_header = reinterpret_cast<HTTPHdr *>(fetch_sm->resp_hdr_bufp());
+ HTTPHdr *resp_header = &stream->response_header;
DebugHttp2Stream(ua_session, stream->get_id(), "Send HEADERS frame");
HTTPHdr h2_hdr;
http2_generate_h2_header_from_1_1(resp_header, &h2_hdr);
- buf_len = h2_hdr.length_get() * 2; // Make it double just in case
+
+ buf_len = resp_header->length_get() * 2; // Make it double just in case
buf = (uint8_t *)ats_malloc(buf_len);
if (buf == NULL) {
h2_hdr.destroy();
@@ -1131,6 +1125,7 @@ Http2ConnectionState::send_settings_frame(const Http2ConnectionSettings &new_set
}
settings.finalize(settings_length);
+ SCOPED_MUTEX_LOCK(lock, this->ua_session->mutex, this_ethread());
this->ua_session->handleEvent(HTTP2_SESSION_EVENT_XMIT, &settings);
}
diff --git a/proxy/http2/Http2ConnectionState.h b/proxy/http2/Http2ConnectionState.h
index a80fc9d..9d6502c 100644
--- a/proxy/http2/Http2ConnectionState.h
+++ b/proxy/http2/Http2ConnectionState.h
@@ -26,7 +26,6 @@
#include "HTTP2.h"
#include "HPACK.h"
-#include "FetchSM.h"
#include "Http2Stream.h"
class Http2ClientSession;
@@ -181,8 +180,8 @@ public:
ssize_t client_rwnd, server_rwnd;
// HTTP/2 frame sender
- void send_data_frame(FetchSM *fetch_sm);
- void send_headers_frame(FetchSM *fetch_sm);
+ void send_data_frame(Http2Stream *stream);
+ void send_headers_frame(Http2Stream *stream);
void send_rst_stream_frame(Http2StreamId id, Http2ErrorCode ec);
void send_settings_frame(const Http2ConnectionSettings &new_settings);
void send_ping_frame(Http2StreamId id, uint8_t flag, const uint8_t *opaque_data);
diff --git a/proxy/http2/Http2SessionAccept.cc b/proxy/http2/Http2SessionAccept.cc
index ca3139f..c32c9d9 100644
--- a/proxy/http2/Http2SessionAccept.cc
+++ b/proxy/http2/Http2SessionAccept.cc
@@ -44,8 +44,8 @@ Http2SessionAccept::accept(NetVConnection *netvc, MIOBuffer *iobuf, IOBufferRead
netvc->attributes = this->options.transport_type;
+ const sockaddr *client_ip = netvc->get_remote_addr();
if (is_debug_tag_set("http2_seq")) {
- const sockaddr *client_ip = netvc->get_remote_addr();
ip_port_text_buffer ipb;
Debug("http2_seq", "[HttpSessionAccept2:mainEvent %p] accepted connection from %s transport type = %d", netvc,
@@ -53,7 +53,7 @@ Http2SessionAccept::accept(NetVConnection *netvc, MIOBuffer *iobuf, IOBufferRead
}
// XXX Allocate a Http2ClientSession
- Http2ClientSession *new_session = http2ClientSessionAllocator.alloc();
+ Http2ClientSession *new_session = THREAD_ALLOC_INIT(http2ClientSessionAllocator, this_ethread());
new_session->new_connection(netvc, iobuf, reader, false /* backdoor */);
}
@@ -65,15 +65,7 @@ Http2SessionAccept::mainEvent(int event, void *data)
ink_release_assert((event == NET_EVENT_ACCEPT) ? (data != 0) : (1));
if (event == NET_EVENT_ACCEPT) {
- NetVConnection *netvc = static_cast<NetVConnection *>(data);
- SSLNetVConnection *ssl_vc = dynamic_cast<SSLNetVConnection *>(netvc);
- MIOBuffer *iobuf = NULL;
- IOBufferReader *reader = NULL;
- if (ssl_vc) {
- iobuf = ssl_vc->get_ssl_iobuf();
- reader = ssl_vc->get_ssl_reader();
- }
- this->accept(static_cast<NetVConnection *>(data), iobuf, reader);
+ this->accept(static_cast<NetVConnection *>(data), NULL, NULL);
return EVENT_CONT;
}
diff --git a/proxy/http2/Http2Stream.cc b/proxy/http2/Http2Stream.cc
index b998be7..93def8b 100644
--- a/proxy/http2/Http2Stream.cc
+++ b/proxy/http2/Http2Stream.cc
@@ -1,6 +1,6 @@
/** @file
- Http2Stream
+ Http2Stream.cc
@section license License
@@ -21,57 +21,110 @@
limitations under the License.
*/
+#include "HTTP2.h"
#include "Http2Stream.h"
-#include "Http2ConnectionState.h"
#include "Http2ClientSession.h"
+#include "../http/HttpSM.h"
-// Currently use only HTTP/1.1 for requesting to origin server
-const static char *HTTP2_FETCHING_HTTP_VERSION = "HTTP/1.1";
+ClassAllocator<Http2Stream> http2StreamAllocator("http2StreamAllocator");
-bool
-Http2Stream::init_fetcher(Http2ConnectionState &cstate)
+int
+Http2Stream::main_event_handler(int event, void *edata)
{
- extern ClassAllocator<FetchSM> FetchSMAllocator;
+ Event *e = static_cast<Event *>(edata);
- // Convert header to HTTP/1.1 format
- if (http2_convert_header_from_2_to_1_1(&_req_header) == PARSE_ERROR) {
- return false;
+ SCOPED_MUTEX_LOCK(lock, this->mutex, this_ethread());
+ if (e == cross_thread_event) {
+ cross_thread_event = NULL;
}
- // Get null-terminated URL and method
- Arena arena;
- int url_len, method_len;
- const char *url_ref = _req_header.url_get()->string_get_ref(&url_len);
- const char *url = arena.str_store(url_ref, url_len);
- const char *method_ref = _req_header.method_get(&method_len);
- const char *method = arena.str_store(method_ref, method_len);
-
- // Initialize FetchSM
- _fetch_sm = FetchSMAllocator.alloc();
- _fetch_sm->ext_init((Continuation *)cstate.ua_session, method, url, HTTP2_FETCHING_HTTP_VERSION,
- cstate.ua_session->get_client_addr(), (TS_FETCH_FLAGS_DECHUNK | TS_FETCH_FLAGS_NOT_INTERNAL_REQUEST));
-
- // Set request header
- MIMEFieldIter fiter;
- for (const MIMEField *field = _req_header.iter_get_first(&fiter); field != NULL; field = _req_header.iter_get_next(&fiter)) {
- int name_len, value_len;
- const char *name = field->name_get(&name_len);
- const char *value = field->value_get(&value_len);
-
- _fetch_sm->ext_add_header(name, name_len, value, value_len);
+ if (e == active_event) {
+ event = VC_EVENT_ACTIVE_TIMEOUT;
+ active_event = NULL;
+ } else if (e == inactive_event) {
+ if (inactive_timeout_at && inactive_timeout_at < Thread::get_hrtime()) {
+ event = VC_EVENT_INACTIVITY_TIMEOUT;
+ clear_inactive_timer();
+ }
}
-
- _fetch_sm->ext_set_user_data(this);
- _fetch_sm->ext_launch();
- return true;
+ switch (event) {
+ case VC_EVENT_ACTIVE_TIMEOUT:
+ case VC_EVENT_INACTIVITY_TIMEOUT:
+ if (current_reader && read_vio.ntodo() > 0) {
+ SCOPED_MUTEX_LOCK(lock, read_vio.mutex, this_ethread());
+ read_vio._cont->handleEvent(event, &read_vio);
+ } else if (current_reader && write_vio.ntodo() > 0) {
+ SCOPED_MUTEX_LOCK(lock, write_vio.mutex, this_ethread());
+ write_vio._cont->handleEvent(event, &write_vio);
+ }
+ break;
+ case VC_EVENT_WRITE_READY:
+ case VC_EVENT_WRITE_COMPLETE:
+ inactive_timeout_at = Thread::get_hrtime() + inactive_timeout;
+ if (e->cookie == &write_vio) {
+ if (write_vio.mutex) {
+ SCOPED_MUTEX_LOCK(lock, write_vio.mutex, this_ethread());
+ if (write_vio._cont && this->current_reader)
+ write_vio._cont->handleEvent(event, &write_vio);
+ }
+ } else {
+ update_write_request(write_vio.get_reader(), INT64_MAX, true);
+ }
+ break;
+ case VC_EVENT_READ_COMPLETE:
+ case VC_EVENT_READ_READY:
+ inactive_timeout_at = Thread::get_hrtime() + inactive_timeout;
+ if (e->cookie == &read_vio) {
+ if (read_vio.mutex) {
+ SCOPED_MUTEX_LOCK(lock, read_vio.mutex, this_ethread());
+ if (read_vio._cont && this->current_reader)
+ read_vio._cont->handleEvent(event, &read_vio);
+ }
+ } else {
+ this->update_read_request(INT64_MAX, true);
+ }
+ break;
+ case VC_EVENT_EOS: {
+ SCOPED_MUTEX_LOCK(lock, this->mutex, this_ethread());
+ // Clean up after yourself if this was an EOS
+ ink_release_assert(this->closed);
+ this->destroy();
+ break;
+ }
+ }
+ return 0;
}
void
-Http2Stream::set_body_to_fetcher(const void *data, size_t len)
+Http2Stream::send_request(Http2ConnectionState &cstate)
{
- ink_assert(_fetch_sm != NULL);
+ // Convert header to HTTP/1.1 format
+ http2_convert_header_from_2_to_1_1(&_req_header);
+
+ // Write header to a buffer. Borrowing logic from HttpSM::write_header_into_buffer.
+ // Seems like a function like this ought to be in HTTPHdr directly
+ int bufindex;
+ int dumpoffset = 0;
+ int done, tmp;
+ IOBufferBlock *block;
+ do {
+ bufindex = 0;
+ tmp = dumpoffset;
+ block = request_buffer.get_current_block();
+ if (!block) {
+ request_buffer.add_block();
+ block = request_buffer.get_current_block();
+ }
+ done = _req_header.print(block->start(), block->write_avail(), &bufindex, &tmp);
+ dumpoffset += bufindex;
+ request_buffer.fill(bufindex);
+ if (!done) {
+ request_buffer.add_block();
+ }
+ } while (!done);
- _fetch_sm->ext_write_data(data, len);
+ // Is there a read_vio request waiting?
+ this->update_read_request(INT64_MAX, true);
}
bool
@@ -140,3 +193,448 @@ Http2Stream::change_state(uint8_t type, uint8_t flags)
return true;
}
+
+// Arms the read VIO of this stream for continuation `c`.
+//
+// c      - continuation to be signalled for read events (may be NULL).
+// nbytes - number of bytes the caller wants to read.
+// buf    - destination buffer; when NULL the VIO's buffer accessor is reset instead.
+//
+// Returns a pointer to the stream's read VIO.
+VIO *
+Http2Stream::do_io_read(Continuation *c, int64_t nbytes, MIOBuffer *buf)
+{
+  if (buf == NULL) {
+    read_vio.buffer.clear();
+  } else {
+    read_vio.buffer.writer_for(buf);
+  }
+
+  read_vio.op        = VIO::READ;
+  read_vio.vc_server = this;
+  read_vio._cont     = c;
+  // Use the caller's mutex when a continuation is supplied, otherwise the stream's own.
+  read_vio.mutex  = c ? c->mutex : this->mutex;
+  read_vio.nbytes = nbytes;
+  read_vio.ndone  = 0;
+
+  // Request data may already be sitting in request_buffer; let update_read_request()
+  // copy it over and schedule the READ_READY / READ_COMPLETE event for after we return.
+  this->update_read_request(nbytes, true);
+
+  return &read_vio;
+}
+
+// Arms the write VIO of this stream for continuation `c` and pushes any data already
+// available in `abuffer` through update_write_request().
+//
+// c       - continuation to be signalled for write events (may be NULL).
+// nbytes  - number of bytes the caller intends to write.
+// abuffer - reader over the caller's data; when NULL the VIO's buffer accessor is reset.
+// owner   - not used by this implementation.
+//
+// Returns the stream's write VIO, or NULL when update_write_request() returns false.
+VIO *
+Http2Stream::do_io_write(Continuation *c, int64_t nbytes, IOBufferReader *abuffer, bool owner)
+{
+  if (abuffer == NULL) {
+    write_vio.buffer.clear();
+  } else {
+    write_vio.buffer.reader_for(abuffer);
+  }
+
+  write_vio.op        = VIO::WRITE;
+  write_vio.vc_server = this;
+  write_vio._cont     = c;
+  // Use the caller's mutex when a continuation is supplied, otherwise the stream's own.
+  write_vio.mutex  = c ? c->mutex : this->mutex;
+  write_vio.nbytes = nbytes;
+  write_vio.ndone  = 0;
+
+  // NOTE(review): a fresh reader on response_buffer is allocated on every call and replaces
+  // whatever response_reader held before (destroy() also comes through here) -- confirm that
+  // repeated calls cannot leak or exhaust the buffer's readers.
+  response_reader = response_buffer.alloc_reader();
+
+  if (!update_write_request(abuffer, nbytes, false)) {
+    return NULL;
+  }
+  return &write_vio;
+}
+
+/* Initiated from SM.
+ *
+ * Drops the reference to the state machine (current_reader) and, unless the teardown has
+ * already been started (sent_delete), either:
+ *  - postpones the close while the stream is still attached to a session and the response
+ *    body is not done or still has data to send (closed is reset and the write side is
+ *    re-enabled), or
+ *  - marks the stream closed, lets the session's connection state send any final DATA frame
+ *    and delete the stream from its list, detaches from the session, cancels the timers and
+ *    any pending cross-thread event, and schedules VC_EVENT_EOS to this stream. The
+ *    VC_EVENT_EOS case of main_event_handler() is what finally calls destroy().
+ *
+ * The flags argument is ignored.
+ */
+void
+Http2Stream::do_io_close(int /* flags */)
+{
+ current_reader = NULL; // SM on the way out
+ if (!sent_delete) {
+ Debug("http2_stream", "do_io_close stream %d", this->get_id());
+
+ // Only close if we are done sending data back to the client
+ if (parent && (!this->is_body_done() || this->response_is_data_available())) {
+ Debug("http2_stream", "%d: Undo close to pass data", this->get_id());
+ closed = false; // "unclose" so this gets picked up later when the netvc side is done
+ this->reenable(&write_vio); // Kick the mechanism to get any remaining data pushed out
+ return;
+ }
+ closed = true;
+ sent_delete = true; // from here on a repeated do_io_close() only clears current_reader
+
+ if (parent) {
+ // Make sure any trailing end of stream frames are sent
+ static_cast<Http2ClientSession *>(parent)->connection_state.send_data_frame(this);
+
+ // Remove ourselves from the stream list
+ // NOTE(review): send_data_frame() can itself call delete_stream() -- confirm the stream
+ // cannot be removed from the list (and counted down) twice on this path.
+ static_cast<Http2ClientSession *>(parent)->connection_state.delete_stream(this);
+ }
+ parent = NULL; // detached from the session
+
+ SCOPED_MUTEX_LOCK(lock, this->mutex, this_ethread());
+
+ clear_timers();
+
+ if (cross_thread_event != NULL)
+ cross_thread_event->cancel();
+ cross_thread_event = NULL;
+
+ // Send an event to get the stream to kill itself
+ // Thus if any events for the stream are in the queue, they will be handled first.
+ // We have marked the stream closed, so no new events should be queued
+ cross_thread_event = this_ethread()->schedule_imm(this, VC_EVENT_EOS);
+ }
+}
+
+/* Initiated from the Http2 side.
+ *
+ * Does nothing when the stream is already closed. Otherwise marks it closed, detaches it
+ * from the session (parent), cancels its timers and notifies whoever is attached:
+ *  - with a state machine and a write continuation: VC_EVENT_WRITE_COMPLETE when the write
+ *    VIO is fully done (nbytes == ndone), VC_EVENT_EOS otherwise;
+ *  - with a state machine and a read continuation: VC_EVENT_EOS on the read VIO, but only
+ *    when no write-side notification was delivered;
+ *  - with a state machine but no read continuation: VC_EVENT_EOS straight to current_reader;
+ *  - with no state machine and no write-side notification: VC_EVENT_EOS is scheduled to the
+ *    stream itself so that main_event_handler() destroys it.
+ *
+ * current_reader is tested again after the write callback because do_io_close() clears it.
+ */
+void
+Http2Stream::initiating_close()
+{
+ if (!closed) {
+ SCOPED_MUTEX_LOCK(lock, this->mutex, this_ethread());
+ Debug("http2_stream", "initiating_close stream %d", this->get_id());
+ closed = true;
+ // leaving the reference to the SM, so we can detach from the SM
+ // when we actually destroy
+ // current_reader = NULL;
+
+ parent = NULL;
+ clear_timers();
+
+ // This should result in do_io_close or release being called. That will schedule the final
+ // kill yourself signal
+ // Send the SM the EOS signal
+ bool sent_write_complete = false; // set after either WRITE_COMPLETE or EOS went to the write continuation
+ if (current_reader) {
+ // Push out any last IO events
+ if (write_vio._cont) {
+ SCOPED_MUTEX_LOCK(lock, write_vio.mutex, this_ethread());
+ // Are we done?
+ if (write_vio.nbytes == write_vio.ndone) {
+ Debug("http2_stream", "handle write from destroy stream=%d event=%d", this->_id, VC_EVENT_WRITE_COMPLETE);
+ write_vio._cont->handleEvent(VC_EVENT_WRITE_COMPLETE, &write_vio);
+ } else {
+ write_vio._cont->handleEvent(VC_EVENT_EOS, &write_vio);
+ Debug("http2_stream", "handle write from destroy stream=%d event=%d", this->_id, VC_EVENT_EOS);
+ }
+ sent_write_complete = true;
+ }
+ }
+ // Send EOS to let SM know that we aren't sticking around
+ if (current_reader && read_vio._cont) {
+ // Only bother with the EOS if we haven't sent the write complete
+ if (!sent_write_complete) {
+ SCOPED_MUTEX_LOCK(lock, read_vio.mutex, this_ethread());
+ Debug("http2_stream", "send EOS to read cont stream=%d", this->_id);
+ read_vio._cont->handleEvent(VC_EVENT_EOS, &read_vio);
+ }
+ } else if (current_reader) {
+ SCOPED_MUTEX_LOCK(lock, current_reader->mutex, this_ethread());
+ current_reader->handleEvent(VC_EVENT_EOS);
+ } else if (!sent_write_complete) {
+ // Send an event to get the stream to kill itself
+ // Thus if any events for the stream are in the queue, they will be handled first.
+ // We have marked the stream closed, so no new events should be queued
+ if (cross_thread_event != NULL)
+ cross_thread_event->cancel();
+ cross_thread_event = this_ethread()->schedule_imm(this, VC_EVENT_EOS);
+ }
+ }
+}
+
+// Hands buffered request data to the reader of read_vio and schedules the matching read event.
+//
+// read_len    - upper bound on the number of bytes copied by this call.
+// send_update - when false, nothing is copied and no event is scheduled.
+//
+// Nothing happens once the stream is closed or while no state machine is attached
+// (current_reader == NULL). When called on a thread other than the stream's own, the work is
+// deferred: a VC_EVENT_READ_READY is scheduled on the stream's thread (at most one such
+// cross-thread event is kept outstanding) and the call returns.
+//
+// Events are scheduled to this stream rather than delivered inline; main_event_handler()
+// forwards them to the VIO's continuation.
+void
+Http2Stream::update_read_request(int64_t read_len, bool send_update)
+{
+  if (closed || this->current_reader == NULL)
+    return;
+  if (this->get_thread() != this_ethread()) {
+    SCOPED_MUTEX_LOCK(stream_lock, this->mutex, this_ethread());
+    if (cross_thread_event == NULL) {
+      // Send to the right thread
+      cross_thread_event = this->get_thread()->schedule_imm(this, VC_EVENT_READ_READY, NULL);
+    }
+    return;
+  }
+  ink_release_assert(this->get_thread() == this_ethread());
+  if (send_update) {
+    SCOPED_MUTEX_LOCK(lock, read_vio.mutex, this_ethread());
+    if (read_vio.nbytes > 0 && read_vio.ndone <= read_vio.nbytes) {
+      // If this vio has a different buffer, we must copy
+      ink_release_assert(this_ethread() == this->_thread);
+      if (read_vio.buffer.writer() != (&request_buffer)) {
+        int64_t num_to_read = read_vio.nbytes - read_vio.ndone;
+        if (num_to_read > read_len)
+          num_to_read = read_len;
+        if (num_to_read > 0) {
+          // Keep the byte count 64 bits wide: it is compared against and added to int64_t
+          // counters, so it must not be narrowed to int on the way.
+          int64_t bytes_added = read_vio.buffer.writer()->write(request_reader, num_to_read);
+          if (bytes_added > 0) {
+            request_reader->consume(bytes_added);
+            read_vio.ndone += bytes_added;
+            int send_event = (read_vio.nbytes == read_vio.ndone) ? VC_EVENT_READ_COMPLETE : VC_EVENT_READ_READY;
+            this_ethread()->schedule_imm(this, send_event, &read_vio);
+            // this->handleEvent(send_event, &read_vio);
+          }
+          ink_release_assert(!this->closed);
+        }
+      } else {
+        // The VIO reads straight out of request_buffer, so there is nothing to copy.
+        // Try to be smart and only signal if there was additional data
+        if (request_reader->read_avail() > 0) {
+          int send_event = (read_vio.nbytes == read_vio.ndone) ? VC_EVENT_READ_COMPLETE : VC_EVENT_READ_READY;
+          this_ethread()->schedule_imm(this, send_event, &read_vio);
+          // this->handleEvent(send_event, &read_vio);
+          ink_release_assert(!this->closed);
+        }
+      }
+    }
+  }
+}
+
+// Moves response data from `buf_reader` into response_buffer, parses the response header the
+// first time enough data is present, and drives the session's connection state to emit
+// HEADERS / DATA frames.
+//
+// buf_reader  - reader over the state machine's response data; may be NULL, in which case
+//               nothing is copied.
+// write_len   - upper bound on the number of bytes copied by this call.
+// send_update - not consulted by this implementation.
+//
+// Returns false once the end of the response has been handed to send_data_frame() (the write
+// is complete); true otherwise, including when the stream is closed, detached from its
+// session, or the work was deferred to the stream's own thread via a VC_EVENT_WRITE_READY.
+bool
+Http2Stream::update_write_request(IOBufferReader *buf_reader, int64_t write_len, bool send_update)
+{
+  bool retval = true;
+  if (closed || parent == NULL)
+    return retval;
+  if (this->get_thread() != this_ethread()) {
+    SCOPED_MUTEX_LOCK(stream_lock, this->mutex, this_ethread());
+    if (cross_thread_event == NULL) {
+      // Send to the right thread
+      cross_thread_event = this->get_thread()->schedule_imm(this, VC_EVENT_WRITE_READY, NULL);
+    }
+    return retval;
+  }
+  ink_release_assert(this->get_thread() == this_ethread());
+  // Named `session` so that it does not shadow the `parent` member tested above.
+  Http2ClientSession *session = static_cast<Http2ClientSession *>(this->get_parent());
+  // Copy over data in the abuffer into resp_buffer.  Then schedule a WRITE_READY or
+  // WRITE_COMPLETE event
+  SCOPED_MUTEX_LOCK(lock, write_vio.mutex, this_ethread());
+  int64_t total_added = 0;
+  // Callers pass vio->get_reader() or NULL; without a reader there is nothing to copy.
+  if (buf_reader != NULL && write_vio.nbytes > 0 && write_vio.ndone < write_vio.nbytes) {
+    int64_t num_to_write = write_vio.nbytes - write_vio.ndone;
+    if (num_to_write > write_len)
+      num_to_write = write_len;
+    int64_t bytes_avail = buf_reader->read_avail();
+    if (bytes_avail > num_to_write)
+      bytes_avail = num_to_write;
+    while (total_added < bytes_avail) {
+      // Ask only for what is still missing so a short write cannot push us past the cap.
+      int64_t bytes_added = response_buffer.write(buf_reader, bytes_avail - total_added);
+      if (bytes_added <= 0) {
+        // No progress; bail out rather than spin.
+        break;
+      }
+      buf_reader->consume(bytes_added);
+      total_added += bytes_added;
+    }
+  }
+  bool is_done = (this->response_process_data());
+  if (total_added > 0 || is_done) {
+    write_vio.ndone += total_added;
+    int send_event = (write_vio.nbytes == write_vio.ndone || is_done) ? VC_EVENT_WRITE_COMPLETE : VC_EVENT_WRITE_READY;
+
+    // Process the new data
+    if (!this->response_header_done) {
+      // Still parsing the response_header
+      int bytes_used = 0;
+      int state      = this->response_header.parse_resp(&http_parser, this->response_reader, &bytes_used, false);
+      // this->response_reader->consume(bytes_used);
+      switch (state) {
+      case PARSE_DONE: {
+        this->response_header_done = true;
+
+        // Send the response header back
+        session->connection_state.send_headers_frame(this);
+
+        // See if the response is chunked.  Set up the dechunking logic if it is
+        is_done = this->response_initialize_data_handling();
+
+        // If there is additional data, send it along in a data frame.  Or if this was header only
+        // make sure to send the end of stream
+        if (this->response_is_data_available() || send_event == VC_EVENT_WRITE_COMPLETE) {
+          if (send_event != VC_EVENT_WRITE_COMPLETE) {
+            this_ethread()->schedule_imm(this, VC_EVENT_WRITE_READY, &write_vio);
+          } else {
+            this->mark_body_done();
+            retval = false;
+          }
+          // Send the data frame
+          session->connection_state.send_data_frame(this);
+        }
+        break;
+      }
+      case PARSE_CONT:
+        // Let it ride for next time
+        break;
+      default:
+        break;
+      }
+    } else {
+      if (send_event == VC_EVENT_WRITE_COMPLETE) {
+        // Defer sending the write complete until the send_data_frame has sent it all
+        // this_ethread()->schedule_imm(this, send_event, &write_vio);
+        this->mark_body_done();
+        session->connection_state.send_data_frame(this);
+        retval = false;
+      } else {
+        this_ethread()->schedule_imm(this, VC_EVENT_WRITE_READY, &write_vio);
+        session->connection_state.send_data_frame(this);
+        // write_vio._cont->handleEvent(send_event, &write_vio);
+      }
+    }
+
+    Debug("http2_stream", "write update stream_id=%d event=%d", this->get_id(), send_event);
+  }
+  return retval;
+}
+
+// Re-drives the read or write side of the stream for `vio`, under the stream's mutex.
+// Ignored once the stream is no longer attached to a session; a VIO whose op is neither
+// WRITE nor READ is ignored as well.
+void
+Http2Stream::reenable(VIO *vio)
+{
+  if (!this->parent) {
+    return;
+  }
+
+  if (vio->op == VIO::WRITE) {
+    SCOPED_MUTEX_LOCK(lock, this->mutex, this_ethread());
+    this->update_write_request(vio->get_reader(), INT64_MAX, true);
+    return;
+  }
+
+  if (vio->op == VIO::READ) {
+    SCOPED_MUTEX_LOCK(lock, this->mutex, this_ethread());
+    this->update_read_request(INT64_MAX, true);
+  }
+}
+
+// Final teardown of the stream: resets the write VIO, updates the stream statistics,
+// releases headers, buffers, VIO mutexes and header blocks, then returns the object to
+// http2StreamAllocator. The object must not be touched after this returns.
+void
+Http2Stream::destroy()
+{
+  Debug("http2_stream", "Destroy stream %d. Sent %d bytes", this->_id, this->bytes_sent);
+
+  // Clean up the write VIO in case of inactivity timeout
+  this->do_io_write(NULL, 0, NULL);
+
+  // Statistics: one fewer live stream, plus the lifetime of this one.
+  HTTP2_DECREMENT_THREAD_DYN_STAT(HTTP2_STAT_CURRENT_CLIENT_STREAM_COUNT, _thread);
+  ink_hrtime end_time = Thread::get_hrtime();
+  HTTP2_SUM_THREAD_DYN_STAT(HTTP2_STAT_TOTAL_TRANSACTIONS_TIME, _thread, end_time - _start_time);
+
+  this->_req_header.destroy();
+  this->response_header.destroy();
+
+  // Drop references to all buffer data
+  this->request_buffer.clear();
+  this->response_buffer.clear();
+
+  // Free the mutexes in the VIO
+  this->read_vio.mutex.clear();
+  this->write_vio.mutex.clear();
+
+  if (this->header_blocks) {
+    ats_free(this->header_blocks);
+  }
+  this->chunked_handler.clear();
+  super::destroy();
+  THREAD_FREE(this, http2StreamAllocator, this_ethread());
+}
+
+// True when `cont` is not an Http2Stream, or is one whose thread is the calling thread.
+bool
+check_stream_thread(Continuation *cont)
+{
+  Http2Stream *stream = dynamic_cast<Http2Stream *>(cont);
+  if (!stream) {
+    return true;
+  }
+  return stream->get_thread() == this_ethread();
+}
+// True when `cont` is NOT an Http2Stream (the dynamic_cast yields NULL).
+bool
+check_continuation(Continuation *cont)
+{
+  return dynamic_cast<Http2Stream *>(cont) == NULL;
+}
+
+// Inspects the parsed response header for "transfer-encoding: chunked" and, when present,
+// switches the stream over to the dechunking handler.
+//
+// Returns the result of response_process_data() when chunked data is already waiting;
+// false in every other case (including non-chunked responses).
+bool
+Http2Stream::response_initialize_data_handling()
+{
+  const char *field_name  = "transfer-encoding";
+  const char *field_value = "chunked";
+  int chunked_index       = response_header.value_get_index(field_name, strlen(field_name), field_value, strlen(field_value));
+
+  // A negative index means this value was not found for this field
+  if (chunked_index < 0) {
+    return false;
+  }
+
+  Debug("http2_stream", "Response is chunked");
+  chunked = true;
+  this->chunked_handler.init_by_action(this->response_reader, ChunkedHandler::ACTION_DECHUNK);
+  this->chunked_handler.state            = ChunkedHandler::CHUNK_READ_SIZE;
+  this->chunked_handler.dechunked_reader = this->chunked_handler.dechunked_buffer->alloc_reader();
+  // From here on data is read through the handler's readers, so the raw reader is released.
+  this->response_reader->dealloc();
+  this->response_reader = NULL;
+
+  // Get things going if there is already data waiting
+  if (!this->chunked_handler.chunked_reader->is_read_avail_more_than(0)) {
+    return false;
+  }
+  return response_process_data();
+}
+
+// Runs the dechunking handler over newly buffered response data.
+// Returns what process_chunked_content() reported on its last pass; always false for
+// responses that are not chunked.
+bool
+Http2Stream::response_process_data()
+{
+  if (!chunked) {
+    return false;
+  }
+
+  bool finished = false;
+  do {
+    // Restart the handler if it stopped in the flow-control state.
+    if (chunked_handler.state == ChunkedHandler::CHUNK_FLOW_CONTROL) {
+      chunked_handler.state = ChunkedHandler::CHUNK_READ_SIZE_START;
+    }
+    finished = this->chunked_handler.process_chunked_content();
+  } while (chunked_handler.state == ChunkedHandler::CHUNK_FLOW_CONTROL);
+
+  return finished;
+}
+
+// True when the response data reader exists and has at least one byte to read.
+bool
+Http2Stream::response_is_data_available() const
+{
+  IOBufferReader *data_reader = this->response_get_data_reader();
+  if (!data_reader) {
+    return false;
+  }
+  return data_reader->is_read_avail_more_than(0);
+}
+
+// Returns the reader response body data should be taken from: the dechunked reader for
+// chunked responses, response_reader otherwise.
+IOBufferReader *
+Http2Stream::response_get_data_reader() const
+{
+  if (chunked) {
+    return chunked_handler.dechunked_reader;
+  }
+  return response_reader;
+}
+
+// Records the active timeout and replaces any pending active-timeout event.
+// A timeout that is not positive only cancels the pending event.
+void
+Http2Stream::set_active_timeout(ink_hrtime timeout_in)
+{
+  active_timeout = timeout_in;
+  clear_active_timer();
+
+  if (active_timeout <= 0) {
+    return;
+  }
+  active_event = this_ethread()->schedule_in(this, active_timeout);
+}
+
+// Records the inactivity timeout. A positive value moves the deadline to now + timeout and
+// makes sure the once-per-second check event exists; any other value clears the timer.
+void
+Http2Stream::set_inactivity_timeout(ink_hrtime timeout_in)
+{
+  inactive_timeout = timeout_in;
+
+  if (inactive_timeout <= 0) {
+    clear_inactive_timer();
+    return;
+  }
+
+  inactive_timeout_at = Thread::get_hrtime() + inactive_timeout;
+  if (inactive_event) {
+    // The periodic check is already running.
+    return;
+  }
+  inactive_event = this_ethread()->schedule_every(this, HRTIME_SECONDS(1));
+}
+
+// Turns the inactivity timeout off; shorthand for set_inactivity_timeout(0).
+void
+Http2Stream::cancel_inactivity_timeout()
+{
+  this->set_inactivity_timeout(0);
+}
+
+// Resets the inactivity deadline and cancels the periodic check event, if one is pending.
+void
+Http2Stream::clear_inactive_timer()
+{
+  inactive_timeout_at = 0;
+  if (!inactive_event) {
+    return;
+  }
+  inactive_event->cancel();
+  inactive_event = NULL;
+}
+// Cancels the active-timeout event, if one is pending.
+void
+Http2Stream::clear_active_timer()
+{
+  if (!active_event) {
+    return;
+  }
+  active_event->cancel();
+  active_event = NULL;
+}
+// Cancels both stream timers: inactivity first, then active.
+void
+Http2Stream::clear_timers()
+{
+  this->clear_inactive_timer();
+  this->clear_active_timer();
+}
diff --git a/proxy/http2/Http2Stream.h b/proxy/http2/Http2Stream.h
index 311733c..b7a5514 100644
--- a/proxy/http2/Http2Stream.h
+++ b/proxy/http2/Http2Stream.h
@@ -1,6 +1,6 @@
/** @file
- Http2Stream
+ Http2Stream.h
@section license License
@@ -25,51 +25,47 @@
#define __HTTP2_STREAM_H__
#include "HTTP2.h"
-#include "FetchSM.h"
+#include "../ProxyClientTransaction.h"
#include "Http2DebugNames.h"
+#include "../http/HttpTunnel.h" // To get ChunkedHandler
class Http2ConnectionState;
-class Http2Stream
+class Http2Stream : public ProxyClientTransaction
{
public:
+ typedef ProxyClientTransaction super; ///< Parent type.
+ // Constructs an idle stream and installs main_event_handler as its handler.
+ // Every scalar member visible in this class gets a defined starting value:
+ // the sent_request_header / response_header_done / request_sent flags, the
+ // timeout bookkeeping (active_timeout, inactive_timeout, inactive_timeout_at)
+ // and _start_time / _thread would otherwise hold indeterminate values until
+ // some later call assigns them. Initializers are listed in the order the
+ // members are declared in the visible part of the class.
Http2Stream(Http2StreamId sid = 0, ssize_t initial_rwnd = Http2::initial_window_size)
: client_rwnd(initial_rwnd), server_rwnd(Http2::initial_window_size), header_blocks(NULL), header_blocks_length(0),
- request_header_length(0), end_stream(false), _id(sid), _state(HTTP2_STREAM_STATE_IDLE), _fetch_sm(NULL),
- trailing_header(false), body_done(false), data_length(0)
+ request_header_length(0), end_stream(false), sent_request_header(false), response_header_done(false), request_sent(false),
+ response_reader(NULL), request_reader(NULL), request_buffer(CLIENT_CONNECTION_FIRST_READ_BUFFER_SIZE_INDEX), _start_time(0),
+ _thread(NULL), _id(sid), _state(HTTP2_STREAM_STATE_IDLE), trailing_header(false), body_done(false), data_length(0),
+ closed(false), sent_delete(false), bytes_sent(0), chunked(false), cross_thread_event(NULL), active_timeout(0),
+ active_event(NULL), inactive_timeout(0), inactive_timeout_at(0), inactive_event(NULL)
{
+ SET_HANDLER(&Http2Stream::main_event_handler);
+ }
+
+ void
+ init(Http2StreamId sid, ssize_t initial_rwnd) // per-stream setup: id, start time, thread, client window, readers, parser, headers
+ {
+ _id = sid;
+ _start_time = Thread::get_hrtime();
_thread = this_ethread();
+ this->client_rwnd = initial_rwnd;
HTTP2_INCREMENT_THREAD_DYN_STAT(HTTP2_STAT_CURRENT_CLIENT_STREAM_COUNT, _thread);
- _start_time = Thread::get_hrtime();
- // FIXME: Are you sure? every "stream" needs _req_header?
+ sm_reader = request_reader = request_buffer.alloc_reader(); // one reader allocated on request_buffer, stored in both pointers
+ http_parser_init(&http_parser);
+ // FIXME: Are you sure? every "stream" needs request_header?
_req_header.create(HTTP_TYPE_REQUEST);
+ response_header.create(HTTP_TYPE_RESPONSE);
}
- ~Http2Stream()
- {
- Debug("http2_stream", "[%d] state: %s", _id, Http2DebugNames::get_state_name(_state));
- HTTP2_DECREMENT_THREAD_DYN_STAT(HTTP2_STAT_CURRENT_CLIENT_STREAM_COUNT, _thread);
- ink_hrtime end_time = Thread::get_hrtime();
- HTTP2_SUM_THREAD_DYN_STAT(HTTP2_STAT_TOTAL_TRANSACTIONS_TIME, _thread, end_time - _start_time);
- _req_header.destroy();
+ ~Http2Stream() { this->destroy(); } // the destructor does nothing itself; it only calls destroy()
- if (_fetch_sm) {
- _fetch_sm->ext_destroy();
- _fetch_sm = NULL;
- }
- if (header_blocks) {
- ats_free(header_blocks);
- }
- }
+ int main_event_handler(int event, void *edata);
+
+ void destroy();
- // Operate FetchSM
- bool init_fetcher(Http2ConnectionState &cstate);
- void set_body_to_fetcher(const void *data, size_t len);
- FetchSM *
- get_fetcher()
- {
- return _fetch_sm;
- }
bool
is_body_done() const
{
@@ -81,6 +77,12 @@ public:
body_done = true;
}
+ // Adds num_bytes to the running bytes_sent total.
+ void
+ update_sent_count(int num_bytes)
+ {
+   bytes_sent = bytes_sent + num_bytes;
+ }
+
const Http2StreamId
get_id() const
{
@@ -92,6 +94,18 @@ public:
return _state;
}
bool change_state(uint8_t type, uint8_t flags);
+
+ // Overwrites the stream id.
+ void
+ set_id(Http2StreamId sid)
+ {
+   this->_id = sid;
+ }
+ // Replaces the client-side stream window (client_rwnd) with new_size.
+ void
+ update_initial_rwnd(Http2WindowSize new_size)
+ {
+   this->client_rwnd = new_size;
+ }
+
bool
has_trailing_header() const
{
@@ -118,6 +132,16 @@ public:
return content_length == 0 || content_length == data_length;
}
+ void send_request(Http2ConnectionState &cstate);
+ VIO *do_io_read(Continuation *c, int64_t nbytes, MIOBuffer *buf);
+ VIO *do_io_write(Continuation *c, int64_t nbytes, IOBufferReader *abuffer, bool owner = false);
+ void do_io_close(int lerrno = -1);
+ void initiating_close();
+ void do_io_shutdown(ShutdownHowTo_t) {} // empty body: the shutdown argument is ignored
+ void update_read_request(int64_t read_len, bool send_update);
+ bool update_write_request(IOBufferReader *buf_reader, int64_t write_len, bool send_update);
+ void reenable(VIO *vio);
+
// Stream level window size
ssize_t client_rwnd, server_rwnd;
@@ -130,17 +154,90 @@ public:
// and other fields)
bool end_stream;
+ bool sent_request_header;
+ bool response_header_done;
+ bool request_sent;
+
+ HTTPHdr response_header;
+ IOBufferReader *response_reader;
+ IOBufferReader *request_reader;
+ MIOBuffer request_buffer;
+
+ // Accessor for the thread pointer stored in _thread.
+ EThread *
+ get_thread()
+ {
+   return this->_thread;
+ }
+
+ IOBufferReader *response_get_data_reader() const;
+ // Reports the current value of the `chunked` flag.
+ bool
+ response_is_chunked() const
+ {
+   return this->chunked;
+ }
+ bool response_initialize_data_handling();
+ bool response_process_data();
+ bool response_is_data_available() const;
+ // For Http2, releasing the transaction goes ahead and closes it via
+ // do_io_close(). The reader argument is not used.
+ void
+ release(IOBufferReader *r)
+ {
+   // Clear current_reader: the state machine is on its own way down.
+   this->current_reader = NULL;
+   do_io_close();
+ }
+
+ virtual bool
+ allow_half_open() const
+ {
+ return false; // this transaction type always answers "no" to half-open
+ }
+
+ virtual const char *
+ get_protocol_string() const
+ {
+ return "http2"; // fixed string literal; the same pointer is returned on every call
+ }
+
+ virtual void set_active_timeout(ink_hrtime timeout_in);
+ virtual void set_inactivity_timeout(ink_hrtime timeout_in);
+ virtual void cancel_inactivity_timeout();
+ void clear_inactive_timer();
+ void clear_active_timer();
+ void clear_timers();
+
private:
+ HTTPParser http_parser;
ink_hrtime _start_time;
EThread *_thread;
Http2StreamId _id;
Http2StreamState _state;
+ MIOBuffer response_buffer;
HTTPHdr _req_header;
- FetchSM *_fetch_sm;
+ VIO read_vio;
+ VIO write_vio;
bool trailing_header;
bool body_done;
uint64_t data_length;
+ bool closed;
+ bool sent_delete;
+ int bytes_sent;
+ ChunkedHandler chunked_handler;
+ bool chunked;
+ Event *cross_thread_event;
+
+ // Support stream-specific timeouts
+ ink_hrtime active_timeout;
+ Event *active_event;
+
+ ink_hrtime inactive_timeout;
+ ink_hrtime inactive_timeout_at;
+ Event *inactive_event;
};
-#endif // __HTTP2_STREAM_H__
+extern ClassAllocator<Http2Stream> http2StreamAllocator; // declaration only; the allocator object is defined outside this header
+
+extern bool check_continuation(Continuation *cont); // NOTE(review): defined elsewhere; semantics not visible from this header
+extern bool check_stream_thread(Continuation *cont); // NOTE(review): defined elsewhere; presumably a thread check on cont -- confirm
+
+#endif
diff --git a/proxy/spdy/SpdyCallbacks.cc b/proxy/spdy/SpdyCallbacks.cc
index 6b3c02b..347e959 100644
--- a/proxy/spdy/SpdyCallbacks.cc
+++ b/proxy/spdy/SpdyCallbacks.cc
@@ -315,6 +315,7 @@ spdy_on_ctrl_recv_callback(spdylay_session *session, spdylay_frame_type type, sp
sm->req_map[stream_id] = req;
sm->vc->add_to_active_queue();
spdy_process_syn_stream_frame(sm, req);
+ sm->transact_count++;
break;
case SPDYLAY_HEADERS:
diff --git a/proxy/spdy/SpdyClientSession.h b/proxy/spdy/SpdyClientSession.h
index 5e5d183..ecf68cd 100644
--- a/proxy/spdy/SpdyClientSession.h
+++ b/proxy/spdy/SpdyClientSession.h
@@ -146,6 +146,16 @@ public:
}
void new_connection(NetVConnection *new_vc, MIOBuffer *iobuf, IOBufferReader *reader, bool backdoor);
+ // Returns the session's transact_count counter.
+ int
+ get_transact_count() const
+ {
+   return transact_count;
+ }
+ void
+ release(ProxyClientTransaction *)
+ { /* TBD: currently a no-op; the transaction argument is ignored */
+ }
+
int64_t sm_id;
spdy::SessionVersion version;
uint64_t total_size;
@@ -164,6 +174,7 @@ public:
int event;
spdylay_session *session;
+ int transact_count;
map<int32_t, SpdyRequest *> req_map;
--
To stop receiving notification emails like this one, please contact
['"commits@trafficserver.apache.org" <co...@trafficserver.apache.org>'].