You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@trafficserver.apache.org by ma...@apache.org on 2022/08/17 01:37:58 UTC
[trafficserver] branch 10-Dev updated: Add support for sendmmsg and UDP GSO (#9037)
This is an automated email from the ASF dual-hosted git repository.
maskit pushed a commit to branch 10-Dev
in repository https://gitbox.apache.org/repos/asf/trafficserver.git
The following commit(s) were added to refs/heads/10-Dev by this push:
new 76c1dad74 Add support for sendmmsg and UDP GSO (#9037)
76c1dad74 is described below
commit 76c1dad747ae8c79e391b20f93bcb563edab88c3
Author: Masakazu Kitajo <ma...@apache.org>
AuthorDate: Wed Aug 17 10:37:52 2022 +0900
Add support for sendmmsg and UDP GSO (#9037)
* Use sendmmsg and UDP GSO if available
If sendmmsg is unavailable, the code just calls sendmsg multiple times.
There is a case where UDP GSO is supported by the kernel but it's administratively
disabled or a network driver doesn't support it. The code detects such a situation
on the first call to send(m)msg, and builds multiple msghdrs on the application side.
* Add a setting for UDP GSO
* Add documentation for UDP settings
* Fix compile error on FreeBSD
* Fix more compile errors on FreeBSD
* Explain things a bit more in the warning message and the documentation
---
configure.ac | 1 +
doc/admin-guide/files/records.config.en.rst | 13 ++
iocore/net/P_QUICPacketHandler_quiche.h | 2 +-
iocore/net/P_UDPNet.h | 14 +-
iocore/net/P_UDPPacket.h | 8 +-
iocore/net/QUICNetVConnection_quiche.cc | 40 +++-
iocore/net/QUICPacketHandler_quiche.cc | 4 +-
iocore/net/UnixUDPNet.cc | 351 ++++++++++++++++++++++++----
mgmt/RecordsConfig.cc | 2 +
9 files changed, 372 insertions(+), 63 deletions(-)
diff --git a/configure.ac b/configure.ac
index 234cd2ae1..7b25d9a90 100644
--- a/configure.ac
+++ b/configure.ac
@@ -1401,6 +1401,7 @@ AC_CHECK_FUNCS([clock_gettime kqueue epoll_ctl posix_fadvise posix_madvise posix
AC_CHECK_FUNCS([port_create strlcpy strlcat sysconf sysctlbyname getpagesize])
AC_CHECK_FUNCS([getreuid getresuid getresgid setreuid setresuid getpeereid getpeerucred])
AC_CHECK_FUNCS([strsignal psignal psiginfo accept4])
+AC_CHECK_FUNCS([sendmmsg])
# Check for eventfd() and sys/eventfd.h (both must exist ...)
AC_CHECK_HEADERS([sys/eventfd.h], [
diff --git a/doc/admin-guide/files/records.config.en.rst b/doc/admin-guide/files/records.config.en.rst
index 23500c422..affdec175 100644
--- a/doc/admin-guide/files/records.config.en.rst
+++ b/doc/admin-guide/files/records.config.en.rst
@@ -4536,6 +4536,19 @@ removed in the future without prior notice.
This is just for debugging. Do not change it from the default value unless
you really understand what this is.
+UDP Configuration
+=====================
+
+.. ts:cv:: CONFIG proxy.config.udp.threads INT 0
+
+ Specifies the number of UDP threads to run. By default 0 threads are dedicated to UDP,
+ which results in effectively disabling UDP support.
+
+.. ts:cv:: CONFIG proxy.config.udp.enable_gso INT 0
+
+ Enables (``1``) or disables (``0``) UDP GSO. When enabled, |TS| tries to use UDP GSO,
+ and disables it automatically if it causes send errors.
+
Plug-in Configuration
=====================
diff --git a/iocore/net/P_QUICPacketHandler_quiche.h b/iocore/net/P_QUICPacketHandler_quiche.h
index 7bb016738..d6b9194b0 100644
--- a/iocore/net/P_QUICPacketHandler_quiche.h
+++ b/iocore/net/P_QUICPacketHandler_quiche.h
@@ -38,7 +38,7 @@ public:
QUICPacketHandler();
virtual ~QUICPacketHandler();
- void send_packet(UDPConnection *udp_con, IpEndpoint &addr, Ptr<IOBufferBlock> udp_payload);
+ void send_packet(UDPConnection *udp_con, IpEndpoint &addr, Ptr<IOBufferBlock> udp_payload, uint16_t segment_size = 0);
void close_connection(QUICNetVConnection *conn);
protected:
diff --git a/iocore/net/P_UDPNet.h b/iocore/net/P_UDPNet.h
index a7de9647b..34fcdd070 100644
--- a/iocore/net/P_UDPNet.h
+++ b/iocore/net/P_UDPNet.h
@@ -40,6 +40,8 @@ static inline PollCont *get_UDPPollCont(EThread *);
#include "P_UnixUDPConnection.h"
#include "P_UDPIOEvent.h"
+#include "netinet/udp.h"
+
class UDPNetHandler;
struct UDPNetProcessorInternal : public UDPNetProcessor {
@@ -278,6 +280,9 @@ class UDPQueue
ink_hrtime last_service = 0;
int packets = 0;
int added = 0;
+#ifdef SOL_UDP
+ bool use_udp_gso = false;
+#endif
public:
// Outgoing UDP Packet Queue
@@ -286,12 +291,13 @@ public:
void service(UDPNetHandler *);
void SendPackets();
- void SendUDPPacket(UDPPacketInternal *p, int32_t pktLen);
+ void SendUDPPacket(UDPPacketInternal *p);
+ int SendMultipleUDPPackets(UDPPacketInternal **p, uint16_t n);
// Interface exported to the outside world
void send(UDPPacket *p);
- UDPQueue();
+ UDPQueue(bool enable_gso);
~UDPQueue();
};
@@ -301,7 +307,7 @@ class UDPNetHandler : public Continuation, public EThread::LoopTailHandler
{
public:
// engine for outgoing packets
- UDPQueue udpOutQueue{};
+ UDPQueue udpOutQueue;
// New UDPConnections
// to hold the newly created descriptors before scheduling them on the servicing buckets.
@@ -323,7 +329,7 @@ public:
int waitForActivity(ink_hrtime timeout) override;
void signalActivity() override;
- UDPNetHandler();
+ UDPNetHandler(bool enable_gso);
};
struct PollCont;
diff --git a/iocore/net/P_UDPPacket.h b/iocore/net/P_UDPPacket.h
index 140037324..22b432f0e 100644
--- a/iocore/net/P_UDPPacket.h
+++ b/iocore/net/P_UDPPacket.h
@@ -42,7 +42,8 @@ public:
SLINK(UDPPacketInternal, alink); // atomic link
// packet scheduling stuff: keep it a doubly linked list
- uint64_t pktLength = 0;
+ uint64_t pktLength = 0;
+ uint16_t segment_size = 0;
int reqGenerationNum = 0;
ink_hrtime delivery_time = 0; // when to deliver packet
@@ -162,7 +163,7 @@ UDPPacket::getConnection()
}
TS_INLINE UDPPacket *
-new_UDPPacket(struct sockaddr const *to, ink_hrtime when, Ptr<IOBufferBlock> &buf)
+new_UDPPacket(struct sockaddr const *to, ink_hrtime when, Ptr<IOBufferBlock> &buf, uint16_t segment_size = 0)
{
UDPPacketInternal *p = udpPacketAllocator.alloc();
@@ -171,7 +172,8 @@ new_UDPPacket(struct sockaddr const *to, ink_hrtime when, Ptr<IOBufferBlock> &bu
p->delivery_time = when;
if (to)
ats_ip_copy(&p->to, to);
- p->chain = buf;
+ p->chain = buf;
+ p->segment_size = segment_size;
return p;
}
diff --git a/iocore/net/QUICNetVConnection_quiche.cc b/iocore/net/QUICNetVConnection_quiche.cc
index c92dbbc4c..6f9edb8ac 100644
--- a/iocore/net/QUICNetVConnection_quiche.cc
+++ b/iocore/net/QUICNetVConnection_quiche.cc
@@ -548,19 +548,35 @@ QUICNetVConnection::_handle_write_ready()
Ptr<IOBufferBlock> udp_payload;
quiche_send_info send_info;
- ssize_t written;
-
- do {
- udp_payload = (new_IOBufferBlock());
- udp_payload->alloc(iobuffer_size_to_index(quiche_conn_max_send_udp_payload_size(this->_quiche_con), BUFFER_SIZE_INDEX_16K));
- written =
- quiche_conn_send(this->_quiche_con, reinterpret_cast<uint8_t *>(udp_payload->end()), udp_payload->write_avail(), &send_info);
- if (written > 0) {
- udp_payload->fill(written);
- this->_packet_handler->send_packet(this->_udp_con, this->con.addr, udp_payload);
- net_activity(this, this_ethread());
+ ssize_t res;
+ ssize_t written = 0;
+
+ size_t quantum = quiche_conn_send_quantum(this->_quiche_con);
+ size_t max_udp_payload_size = quiche_conn_max_send_udp_payload_size(this->_quiche_con);
+
+ // This buffer size must be less than 64KB because it can be used for UDP GSO (UDP_SEGMENT)
+ udp_payload = new_IOBufferBlock();
+ udp_payload->alloc(buffer_size_to_index(quantum, BUFFER_SIZE_INDEX_32K));
+ quantum = std::min(static_cast<int64_t>(quantum), udp_payload->write_avail());
+ while (written + max_udp_payload_size <= quantum) {
+ res = quiche_conn_send(this->_quiche_con, reinterpret_cast<uint8_t *>(udp_payload->end()) + written, max_udp_payload_size,
+ &send_info);
+ if (res > 0) {
+ written += res;
}
- } while (written > 0);
+ if (static_cast<size_t>(res) != max_udp_payload_size) {
+ break;
+ }
+ }
+ if (written > 0) {
+ udp_payload->fill(written);
+ int segment_size = 0;
+ if (static_cast<size_t>(written) > max_udp_payload_size) {
+ segment_size = max_udp_payload_size;
+ }
+ this->_packet_handler->send_packet(this->_udp_con, this->con.addr, udp_payload, segment_size);
+ net_activity(this, this_ethread());
+ }
}
void
diff --git a/iocore/net/QUICPacketHandler_quiche.cc b/iocore/net/QUICPacketHandler_quiche.cc
index 27cb45a9e..eb5ca2252 100644
--- a/iocore/net/QUICPacketHandler_quiche.cc
+++ b/iocore/net/QUICPacketHandler_quiche.cc
@@ -69,9 +69,9 @@ QUICPacketHandler::close_connection(QUICNetVConnection *conn)
}
void
-QUICPacketHandler::send_packet(UDPConnection *udp_con, IpEndpoint &addr, Ptr<IOBufferBlock> udp_payload)
+QUICPacketHandler::send_packet(UDPConnection *udp_con, IpEndpoint &addr, Ptr<IOBufferBlock> udp_payload, uint16_t segment_size)
{
- UDPPacket *udp_packet = new_UDPPacket(addr, 0, udp_payload);
+ UDPPacket *udp_packet = new_UDPPacket(addr, 0, udp_payload, segment_size);
if (is_debug_tag_set(v_debug_tag)) {
ip_port_text_buffer ipb;
diff --git a/iocore/net/UnixUDPNet.cc b/iocore/net/UnixUDPNet.cc
index dd30f279e..b211bc59c 100644
--- a/iocore/net/UnixUDPNet.cc
+++ b/iocore/net/UnixUDPNet.cc
@@ -37,6 +37,12 @@
#include "P_Net.h"
#include "P_UDPNet.h"
+#include "netinet/udp.h"
+#ifndef UDP_SEGMENT
+// This is needed because old glibc may not have the constant even if Kernel supports it.
+#define UDP_SEGMENT 103
+#endif
+
using UDPNetContHandler = int (UDPNetHandler::*)(int, void *);
ClassAllocator<UDPPacketInternal> udpPacketAllocator("udpPacketAllocator");
@@ -63,9 +69,12 @@ sockaddr_in6 G_bwGrapherLoc;
void
initialize_thread_for_udp_net(EThread *thread)
{
+ int enable_gso;
+ REC_ReadConfigInteger(enable_gso, "proxy.config.udp.enable_gso");
+
UDPNetHandler *nh = get_UDPNetHandler(thread);
- new (reinterpret_cast<ink_dummy_for_new *>(nh)) UDPNetHandler;
+ new (reinterpret_cast<ink_dummy_for_new *>(nh)) UDPNetHandler(enable_gso);
new (reinterpret_cast<ink_dummy_for_new *>(get_UDPPollCont(thread))) PollCont(thread->mutex);
// The UDPNetHandler cannot be accessed across EThreads.
// Because the UDPNetHandler should be called back immediately after UDPPollCont.
@@ -860,7 +869,16 @@ Lerror:
}
// send out all packets that need to be sent out as of time=now
-UDPQueue::UDPQueue() {}
+#ifdef SOL_UDP
+UDPQueue::UDPQueue(bool enable_gso) : use_udp_gso(enable_gso) {}
+#else
+UDPQueue::UDPQueue(bool enable_gso)
+{
+ if (enable_gso) {
+ Warning("Attempted to use UDP GSO per configuration, but it is unavailable");
+ }
+}
+#endif
UDPQueue::~UDPQueue() {}
@@ -921,13 +939,23 @@ UDPQueue::SendPackets()
ink_hrtime now = Thread::get_hrtime_updated();
ink_hrtime send_threshold_time = now + SLOT_TIME;
int32_t bytesThisSlot = INT_MAX, bytesUsed = 0;
- int32_t bytesThisPipe, sentOne;
+ int32_t bytesThisPipe;
int64_t pktLen;
bytesThisSlot = INT_MAX;
+#ifdef UIO_MAXIOV
+ constexpr int N_MAX_PACKETS = UIO_MAXIOV; // The limit comes from sendmmsg
+#else
+ constexpr int N_MAX_PACKETS = 1024;
+#endif
+ UDPPacketInternal *packets[N_MAX_PACKETS];
+ int nsent;
+ int npackets;
+
sendPackets:
- sentOne = false;
+ nsent = 0;
+ npackets = 0;
bytesThisPipe = bytesThisSlot;
while ((bytesThisPipe > 0) && (pipeInfo.firstPacket(send_threshold_time))) {
@@ -941,21 +969,25 @@ sendPackets:
goto next_pkt;
}
- SendUDPPacket(p, pktLen);
bytesUsed += pktLen;
bytesThisPipe -= pktLen;
+ packets[npackets++] = p;
next_pkt:
- sentOne = true;
- p->free();
-
- if (bytesThisPipe < 0) {
+ if (bytesThisPipe < 0 && npackets == N_MAX_PACKETS) {
break;
}
}
+ if (npackets > 0) {
+ nsent = SendMultipleUDPPackets(packets, npackets);
+ }
+ for (int i = 0; i < nsent; ++i) {
+ packets[i]->free();
+ }
+
bytesThisSlot -= bytesUsed;
- if ((bytesThisSlot > 0) && sentOne) {
+ if ((bytesThisSlot > 0) && nsent) {
// redistribute the slack...
now = Thread::get_hrtime_updated();
if (pipeInfo.firstPacket(now) == nullptr) {
@@ -971,51 +1003,133 @@ sendPackets:
}
void
-UDPQueue::SendUDPPacket(UDPPacketInternal *p, int32_t /* pktLen ATS_UNUSED */)
+UDPQueue::SendUDPPacket(UDPPacketInternal *p)
{
struct msghdr msg;
struct iovec iov[32];
- int n, count, iov_len = 0;
+ int n, count = 0;
p->conn->lastSentPktStartTime = p->delivery_time;
Debug("udp-send", "Sending %p", p);
-#if !defined(solaris)
msg.msg_control = nullptr;
msg.msg_controllen = 0;
msg.msg_flags = 0;
-#endif
- msg.msg_name = reinterpret_cast<caddr_t>(&p->to.sa);
- msg.msg_namelen = ats_ip_size(p->to);
- iov_len = 0;
-
- for (IOBufferBlock *b = p->chain.get(); b != nullptr; b = b->next.get()) {
- iov[iov_len].iov_base = static_cast<caddr_t>(b->start());
- iov[iov_len].iov_len = b->size();
- iov_len++;
- }
- msg.msg_iov = iov;
- msg.msg_iovlen = iov_len;
-
- count = 0;
- while (true) {
- // stupid Linux problem: sendmsg can return EAGAIN
- n = ::sendmsg(p->conn->getFd(), &msg, 0);
- if ((n >= 0) || (errno != EAGAIN)) {
- // send succeeded or some random error happened.
- if (n < 0) {
- Debug("udp-send", "Error: %s (%d)", strerror(errno), errno);
+ msg.msg_name = reinterpret_cast<caddr_t>(&p->to.sa);
+ msg.msg_namelen = ats_ip_size(p->to);
+
+ if (p->segment_size > 0) {
+ ink_assert(p->chain->next == nullptr);
+ msg.msg_iov = iov;
+ msg.msg_iovlen = 1;
+#ifdef SOL_UDP
+ if (use_udp_gso) {
+ iov[0].iov_base = p->chain.get()->start();
+ iov[0].iov_len = p->chain.get()->size();
+
+ union udp_segment_hdr {
+ char buf[CMSG_SPACE(sizeof(uint16_t))];
+ struct cmsghdr align;
+ } u;
+ msg.msg_control = u.buf;
+ msg.msg_controllen = sizeof(u.buf);
+
+ struct cmsghdr *cm = CMSG_FIRSTHDR(&msg);
+ cm->cmsg_level = SOL_UDP;
+ cm->cmsg_type = UDP_SEGMENT;
+ cm->cmsg_len = CMSG_LEN(sizeof(uint16_t));
+ *((uint16_t *)CMSG_DATA(cm)) = p->segment_size;
+
+ count = 0;
+ while (true) {
+ // stupid Linux problem: sendmsg can return EAGAIN
+ n = ::sendmsg(p->conn->getFd(), &msg, 0);
+ if (n >= 0) {
+ break;
+ }
+ if (errno == EIO && use_udp_gso) {
+ Warning("Disabling UDP GSO due to an error");
+ use_udp_gso = false;
+ SendUDPPacket(p);
+ return;
+ }
+ if (errno == EAGAIN) {
+ ++count;
+ if ((g_udp_numSendRetries > 0) && (count >= g_udp_numSendRetries)) {
+ // tried too many times; give up
+ Debug("udpnet", "Send failed: too many retries");
+ return;
+ }
+ } else {
+ Debug("udp-send", "Error: %s (%d)", strerror(errno), errno);
+ return;
+ }
}
+ } else {
+#endif
+ // Send segments seprately if UDP_SEGMENT is not supported
+ int offset = 0;
+ while (offset < p->chain.get()->size()) {
+ iov[0].iov_base = p->chain.get()->start() + offset;
+ iov[0].iov_len = std::min(static_cast<long>(p->segment_size), p->chain.get()->end() - static_cast<char *>(iov[0].iov_base));
+
+ count = 0;
+ while (true) {
+ // stupid Linux problem: sendmsg can return EAGAIN
+ n = ::sendmsg(p->conn->getFd(), &msg, 0);
+ if (n >= 0) {
+ break;
+ }
+ if (errno == EAGAIN) {
+ ++count;
+ if ((g_udp_numSendRetries > 0) && (count >= g_udp_numSendRetries)) {
+ // tried too many times; give up
+ Debug("udpnet", "Send failed: too many retries");
+ return;
+ }
+ } else {
+ Debug("udp-send", "Error: %s (%d)", strerror(errno), errno);
+ return;
+ }
+ }
- break;
+ offset += iov[0].iov_len;
+ }
+ ink_assert(offset == p->chain.get()->size());
+#ifdef SOL_UDP
+ } // use_udp_segment
+#endif
+ } else {
+ // Nothing is special
+ int iov_len = 0;
+ for (IOBufferBlock *b = p->chain.get(); b != nullptr; b = b->next.get()) {
+ iov[iov_len].iov_base = static_cast<caddr_t>(b->start());
+ iov[iov_len].iov_len = b->size();
+ iov_len++;
}
- if (errno == EAGAIN) {
- ++count;
- if ((g_udp_numSendRetries > 0) && (count >= g_udp_numSendRetries)) {
- // tried too many times; give up
- Debug("udpnet", "Send failed: too many retries");
+ msg.msg_iov = iov;
+ msg.msg_iovlen = iov_len;
+
+ count = 0;
+ while (true) {
+ // stupid Linux problem: sendmsg can return EAGAIN
+ n = ::sendmsg(p->conn->getFd(), &msg, 0);
+ if ((n >= 0) || (errno != EAGAIN)) {
+ // send succeeded or some random error happened.
+ if (n < 0) {
+ Debug("udp-send", "Error: %s (%d)", strerror(errno), errno);
+ }
+
break;
}
+ if (errno == EAGAIN) {
+ ++count;
+ if ((g_udp_numSendRetries > 0) && (count >= g_udp_numSendRetries)) {
+ // tried too many times; give up
+ Debug("udpnet", "Send failed: too many retries");
+ break;
+ }
+ }
}
}
}
@@ -1027,6 +1141,161 @@ UDPQueue::send(UDPPacket *p)
outQueue.push((UDPPacketInternal *)p);
}
+int
+UDPQueue::SendMultipleUDPPackets(UDPPacketInternal **p, uint16_t n)
+{
+#ifdef HAVE_SENDMMSG
+ struct mmsghdr *msgvec;
+ int msgvec_size;
+
+#ifdef SOL_UDP
+ union udp_segment_hdr {
+ char buf[CMSG_SPACE(sizeof(uint16_t))];
+ struct cmsghdr align;
+ };
+ if (use_udp_gso) {
+ msgvec_size = sizeof(struct mmsghdr) * n;
+ } else {
+ msgvec_size = sizeof(struct mmsghdr) * n * 64;
+ }
+#else
+ msgvec_size = sizeof(struct mmsghdr) * n * 64;
+#endif
+ msgvec = static_cast<struct mmsghdr *>(alloca(msgvec_size));
+ memset(msgvec, 0, msgvec_size);
+
+ int vlen = 0;
+ int fd = p[0]->conn->getFd();
+ for (int i = 0; i < n; ++i) {
+ UDPPacketInternal *packet;
+ struct msghdr *msg;
+ struct iovec *iov;
+ int iov_len;
+
+ packet = p[i];
+ packet->conn->lastSentPktStartTime = packet->delivery_time;
+ ink_assert(packet->conn->getFd() == fd);
+ if (packet->segment_size > 0) {
+ // Presumes one big super buffer is given
+ ink_assert(packet->chain->next == nullptr);
+#ifdef SOL_UDP
+ if (use_udp_gso) {
+ msg = &msgvec[vlen].msg_hdr;
+ msg->msg_name = reinterpret_cast<caddr_t>(&packet->to.sa);
+ msg->msg_namelen = ats_ip_size(packet->to);
+
+ union udp_segment_hdr *u;
+ u = static_cast<union udp_segment_hdr *>(alloca(sizeof(union udp_segment_hdr)));
+ msg->msg_control = u->buf;
+ msg->msg_controllen = sizeof(u->buf);
+ iov = static_cast<struct iovec *>(alloca(sizeof(struct iovec)));
+ iov_len = 1;
+ iov->iov_base = packet->chain.get()->start();
+ iov->iov_len = packet->chain.get()->size();
+ msg->msg_iov = iov;
+ msg->msg_iovlen = iov_len;
+
+ struct cmsghdr *cm = CMSG_FIRSTHDR(msg);
+ cm->cmsg_level = SOL_UDP;
+ cm->cmsg_type = UDP_SEGMENT;
+ cm->cmsg_len = CMSG_LEN(sizeof(uint16_t));
+ *((uint16_t *)CMSG_DATA(cm)) = packet->segment_size;
+ vlen++;
+ } else {
+#endif
+ // UDP_SEGMENT is unavailable
+ // Send the given data as multiple messages
+ int offset = 0;
+ while (offset < packet->chain.get()->size()) {
+ msg = &msgvec[vlen].msg_hdr;
+ msg->msg_name = reinterpret_cast<caddr_t>(&packet->to.sa);
+ msg->msg_namelen = ats_ip_size(packet->to);
+ iov = static_cast<struct iovec *>(alloca(sizeof(struct iovec)));
+ iov_len = 1;
+ iov->iov_base = packet->chain.get()->start() + offset;
+ iov->iov_len =
+ std::min(packet->segment_size, static_cast<uint16_t>(packet->chain.get()->end() - static_cast<char *>(iov->iov_base)));
+ msg->msg_iov = iov;
+ msg->msg_iovlen = iov_len;
+ offset += iov->iov_len;
+ vlen++;
+ }
+ ink_assert(offset == packet->chain.get()->size());
+#ifdef SOL_UDP
+ } // use_udp_gso
+#endif
+ } else {
+ // Nothing is special
+ msg = &msgvec[vlen].msg_hdr;
+ msg->msg_name = reinterpret_cast<caddr_t>(&packet->to.sa);
+ msg->msg_namelen = ats_ip_size(packet->to);
+ iov = static_cast<struct iovec *>(alloca(sizeof(struct iovec) * 64));
+ iov_len = 0;
+ for (IOBufferBlock *b = packet->chain.get(); b != nullptr; b = b->next.get()) {
+ iov[iov_len].iov_base = static_cast<caddr_t>(b->start());
+ iov[iov_len].iov_len = b->size();
+ iov_len++;
+ }
+ msg->msg_iov = iov;
+ msg->msg_iovlen = iov_len;
+ vlen++;
+ }
+ }
+
+ if (vlen == 0) {
+ return 0;
+ }
+
+ int res = ::sendmmsg(fd, msgvec, vlen, 0);
+ if (res < 0) {
+#ifdef SOL_UDP
+ if (use_udp_gso && errno == EIO) {
+ Warning("Disabling UDP GSO due to an error");
+ Debug("udp-send", "Disabling UDP GSO due to an error");
+ use_udp_gso = false;
+ return SendMultipleUDPPackets(p, n);
+ } else {
+ Debug("udp-send", "udp_gso=%d res=%d errno=%d", use_udp_gso, res, errno);
+ return res;
+ }
+#else
+ Debug("udp-send", "res=%d errno=%d", res, errno);
+ return res;
+#endif
+ }
+
+ if (res > 0) {
+#ifdef SOL_UDP
+ if (use_udp_gso) {
+ Debug("udp-send", "Sent %d messages by processing %d UDPPackets (GSO)", res, n);
+ } else {
+#endif
+ int i = 0;
+ int nmsg = res;
+ for (i = 0; i < n && res > 0; ++i) {
+ if (p[i]->segment_size == 0) {
+ res -= 1;
+ } else {
+ res -= (p[i]->chain.get()->size() / p[i]->segment_size) + ((p[i]->chain.get()->size() % p[i]->segment_size) != 0);
+ }
+ }
+ Debug("udp-send", "Sent %d messages by processing %d UDPPackets", nmsg, i);
+ res = i;
+#ifdef SOL_UDP
+ }
+#endif
+ }
+
+ return res;
+#else
+ // sendmmsg is unavailable
+ for (int i = 0; i < n; ++i) {
+ SendUDPPacket(p[i]);
+ }
+ return n;
+#endif
+}
+
#undef LINK
static void
@@ -1043,7 +1312,7 @@ net_signal_hook_callback(EThread *thread)
#endif
}
-UDPNetHandler::UDPNetHandler()
+UDPNetHandler::UDPNetHandler(bool enable_gso) : udpOutQueue(enable_gso)
{
nextCheck = Thread::get_hrtime_updated() + HRTIME_MSECONDS(1000);
lastCheck = 0;
diff --git a/mgmt/RecordsConfig.cc b/mgmt/RecordsConfig.cc
index bb80a5745..aa787e857 100644
--- a/mgmt/RecordsConfig.cc
+++ b/mgmt/RecordsConfig.cc
@@ -258,6 +258,8 @@ static const RecordElement RecordsConfig[] =
,
{RECT_CONFIG, "proxy.config.udp.threads", RECD_INT, "0", RECU_NULL, RR_NULL, RECC_NULL, nullptr, RECA_NULL}
,
+ {RECT_CONFIG, "proxy.config.udp.enable_gso", RECD_INT, "0", RECU_NULL, RR_NULL, RECC_NULL, nullptr, RECA_NULL}
+ ,
//##############################################################################
//#