You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@trafficserver.apache.org by ch...@apache.org on 2013/09/17 05:51:47 UTC
git commit: TS-2201: split drainIncomingChannel two thread,
one handle Broadcast message and other handle Reliable(TCP) request.
Updated Branches:
refs/heads/master f47c6be30 -> 3a903f2bc
TS-2201: split drainIncomingChannel into two threads: one handles Broadcast (multicast) messages and the other handles Reliable (TCP) requests.
Project: http://git-wip-us.apache.org/repos/asf/trafficserver/repo
Commit: http://git-wip-us.apache.org/repos/asf/trafficserver/commit/3a903f2b
Tree: http://git-wip-us.apache.org/repos/asf/trafficserver/tree/3a903f2b
Diff: http://git-wip-us.apache.org/repos/asf/trafficserver/diff/3a903f2b
Branch: refs/heads/master
Commit: 3a903f2bc69c0fb5623fa212f2f1ef64b5455c8b
Parents: f47c6be
Author: Chen Bin <ku...@taobao.com>
Authored: Tue Sep 17 11:51:11 2013 +0800
Committer: Chen Bin <ku...@taobao.com>
Committed: Tue Sep 17 11:51:11 2013 +0800
----------------------------------------------------------------------
CHANGES | 3 ++
mgmt/cluster/ClusterCom.cc | 109 +++++++++++++++++++++++++++-------------
2 files changed, 77 insertions(+), 35 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/trafficserver/blob/3a903f2b/CHANGES
----------------------------------------------------------------------
diff --git a/CHANGES b/CHANGES
index e9c056d..f2e9fd4 100644
--- a/CHANGES
+++ b/CHANGES
@@ -1,6 +1,9 @@
-*- coding: utf-8 -*-
Changes with Apache Traffic Server 4.1.0
+ *) [TS-2201] Split drainIncomingChannel into two threads, one handling Broadcast messages and the other handling Reliable (TCP)
+ requests, to support large clusters.
+
*) [TS-2144] Avoid race on e.g. "traffic_server -Cclear" which would crash
the process intermittently.
http://git-wip-us.apache.org/repos/asf/trafficserver/blob/3a903f2b/mgmt/cluster/ClusterCom.cc
----------------------------------------------------------------------
diff --git a/mgmt/cluster/ClusterCom.cc b/mgmt/cluster/ClusterCom.cc
index 430e01b..fa173cc 100644
--- a/mgmt/cluster/ClusterCom.cc
+++ b/mgmt/cluster/ClusterCom.cc
@@ -48,6 +48,76 @@
int MultiCastMessages = 0;
long LastHighestDelta = -1L;
+
+/*
+ * drainIncomingChannel_broadcast
+ *   Thread entry point that drains the multicast (broadcast) channel; it
+ *   never returns. After lmgmt->ccom has finished initializing it loops:
+ *   wait up to ccom->mc_poll_timeout seconds for ccom->receive_fd to become
+ *   readable, pass any received datagram to ccom->handleMultiCastMessage(),
+ *   and, if no message has arrived for a full poll interval, close and
+ *   re-establish the receive channel (receive_fd stays -1 while that fails,
+ *   and the attempt is repeated on the next interval).
+ *   Reliable (TCP) requests are serviced by drainIncomingChannel instead.
+ *
+ *   arg - unused; returned unchanged to satisfy the thread-entry signature.
+ */
+void *
+drainIncomingChannel_broadcast(void *arg)
+{
+  char message[61440];
+  fd_set fdlist;
+  void *ret = arg;
+
+  time_t last_multicast_receive_time = time(NULL);
+  time_t poll_timeout;
+  struct timeval tv;
+
+  /* Avert race condition, thread spun during constructor */
+  while (!lmgmt->ccom || !lmgmt->ccom->init) {
+    mgmt_sleep_sec(1);
+  }
+
+  lmgmt->syslogThrInit();
+
+  for (;;) {                    /* Loop draining the multicast channel */
+    // tv must be re-armed on every pass and must not be read back after
+    // mgmt_select(): Linux's select() overwrites it with the time not
+    // slept (most other implementations do not). The configured interval
+    // is kept in poll_timeout for the timeout check below.
+    poll_timeout = lmgmt->ccom->mc_poll_timeout; // interface not-responding timeout
+    tv.tv_sec = poll_timeout;
+    tv.tv_usec = 0;
+
+    FD_ZERO(&fdlist);
+    if (lmgmt->ccom->cluster_type != NO_CLUSTER && lmgmt->ccom->receive_fd > 0) {
+      FD_SET(lmgmt->ccom->receive_fd, &fdlist); /* Multicast fd */
+    }
+
+    if (mgmt_select(FD_SETSIZE, &fdlist, NULL, NULL, &tv) < 0) {
+      // On failure select() leaves the fd sets unmodified, so the bit set
+      // above would still look "ready". Treat an error like a timeout
+      // rather than reading from a socket select() never reported ready.
+      FD_ZERO(&fdlist);
+    }
+
+    if (lmgmt->ccom->cluster_type != NO_CLUSTER) {
+      // Multicast timeout considerations
+      if ((lmgmt->ccom->receive_fd < 0) || !FD_ISSET(lmgmt->ccom->receive_fd, &fdlist)) {
+        // Nothing readable: reset the channel once a whole poll interval has
+        // passed without a multicast message (the -1 presumably absorbs
+        // time()'s one-second granularity).
+        time_t t = time(NULL);
+        if ((t - last_multicast_receive_time) > (poll_timeout - 1)) {
+          // Timeout on multicast receive channel, reset channel.
+          if (lmgmt->ccom->receive_fd > 0) {
+            close(lmgmt->ccom->receive_fd);
+          }
+          lmgmt->ccom->receive_fd = -1;
+          Debug("ccom", "Timeout, resetting multicast receive channel");
+          if (lmgmt->ccom->establishReceiveChannel(0)) {
+            Debug("ccom", "establishReceiveChannel failed");
+            lmgmt->ccom->receive_fd = -1;
+          }
+          last_multicast_receive_time = t; // next action at next interval
+        }
+      } else {
+        last_multicast_receive_time = time(NULL); // valid multicast msg
+      }
+    }
+
+    /* Broadcast message */
+    if (lmgmt->ccom->cluster_type != NO_CLUSTER &&
+        lmgmt->ccom->receive_fd > 0 &&
+        FD_ISSET(lmgmt->ccom->receive_fd, &fdlist)) {
+      // Clear the buffer only when a datagram is actually about to be read;
+      // sizeof() keeps the buffer length in one place.
+      memset(message, 0, sizeof(message));
+      if (lmgmt->ccom->receiveIncomingMessage(message, sizeof(message)) > 0) {
+        lmgmt->ccom->handleMultiCastMessage(message);
+      }
+    }
+  }
+  return ret;
+}                               /* End drainIncomingChannel_broadcast */
+
/*
* drainIncomingChannel
* This function is blocking, it never returns. It is meant to allow for
@@ -89,8 +159,6 @@ drainIncomingChannel(void *arg)
// to reopen the channel (e.g. opening the socket would fail if the
// interface was down). In this case, the ccom->receive_fd is set
// to '-1' and the open is retried until it succeeds.
- time_t t;
- time_t last_multicast_receive_time = time(NULL);
struct timeval tv;
/* Avert race condition, thread spun during constructor */
@@ -111,43 +179,12 @@ drainIncomingChannel(void *arg)
FD_ZERO(&fdlist);
if (lmgmt->ccom->cluster_type != NO_CLUSTER) {
- if (lmgmt->ccom->receive_fd > 0) {
- FD_SET(lmgmt->ccom->receive_fd, &fdlist); /* Multicast fd */
- }
FD_SET(lmgmt->ccom->reliable_server_fd, &fdlist); /* TCP Server fd */
}
mgmt_select(FD_SETSIZE, &fdlist, NULL, NULL, &tv);
- if (lmgmt->ccom->cluster_type != NO_CLUSTER) {
- // Multicast timeout considerations
- if ((lmgmt->ccom->receive_fd < 0) || !FD_ISSET(lmgmt->ccom->receive_fd, &fdlist)) {
- t = time(NULL);
- if ((t - last_multicast_receive_time) > (tv.tv_sec - 1)) {
- // Timeout on multicast receive channel, reset channel.
- if (lmgmt->ccom->receive_fd > 0) {
- close(lmgmt->ccom->receive_fd);
- }
- lmgmt->ccom->receive_fd = -1;
- Debug("ccom", "Timeout, resetting multicast receive channel");
- if (lmgmt->ccom->establishReceiveChannel(0)) {
- Debug("ccom", "establishReceiveChannel failed");
- lmgmt->ccom->receive_fd = -1;
- }
- last_multicast_receive_time = t; // next action at next interval
- }
- } else {
- last_multicast_receive_time = time(NULL); // valid multicast msg
- }
- }
-
- /* Broadcast message */
- if (lmgmt->ccom->cluster_type != NO_CLUSTER &&
- lmgmt->ccom->receive_fd > 0 &&
- FD_ISSET(lmgmt->ccom->receive_fd, &fdlist) &&
- (lmgmt->ccom->receiveIncomingMessage(message, 61440) > 0)) {
- lmgmt->ccom->handleMultiCastMessage(message);
- } else if (FD_ISSET(lmgmt->ccom->reliable_server_fd, &fdlist)) {
+ if (FD_ISSET(lmgmt->ccom->reliable_server_fd, &fdlist)) {
/* Reliable(TCP) request */
int clilen = sizeof(cli_addr);
int req_fd = mgmt_accept(lmgmt->ccom->reliable_server_fd, (struct sockaddr *) &cli_addr, &clilen);
@@ -442,8 +479,10 @@ ClusterCom::ClusterCom(unsigned long oip, char *host, int mcport, char *group, i
peers = ink_hash_table_create(InkHashTableKeyType_String);
mismatchLog = ink_hash_table_create(InkHashTableKeyType_String);
- if (cluster_type != NO_CLUSTER)
+ if (cluster_type != NO_CLUSTER) {
+ ink_thread_create(drainIncomingChannel_broadcast, 0); /* Spin drainer thread */
ink_thread_create(drainIncomingChannel, 0); /* Spin drainer thread */
+ }
return;
} /* End ClusterCom::ClusterCom */
Re: git commit: TS-2201: split drainIncomingChannel two thread,
one handle Broadcast message and other handle Reliable(TCP) request.
Posted by James Peach <jp...@apache.org>.
On Sep 17, 2013, at 4:09 AM, Igor Galić <i....@brainsware.org> wrote:
>
>
> ----- Original Message -----
>> Updated Branches:
>> refs/heads/master f47c6be30 -> 3a903f2bc
>>
>>
>> TS-2201: split drainIncomingChannel two thread, one handle Broadcast message
>> and other handle Reliable(TCP) request.
>>
>>
>> Project: http://git-wip-us.apache.org/repos/asf/trafficserver/repo
>> Commit: http://git-wip-us.apache.org/repos/asf/trafficserver/commit/3a903f2b
>> Tree: http://git-wip-us.apache.org/repos/asf/trafficserver/tree/3a903f2b
>> Diff: http://git-wip-us.apache.org/repos/asf/trafficserver/diff/3a903f2b
>>
>> Branch: refs/heads/master
>> Commit: 3a903f2bc69c0fb5623fa212f2f1ef64b5455c8b
>> Parents: f47c6be
>> Author: Chen Bin <ku...@taobao.com>
>> Authored: Tue Sep 17 11:51:11 2013 +0800
>> Committer: Chen Bin <ku...@taobao.com>
>> Committed: Tue Sep 17 11:51:11 2013 +0800
>>
>> ----------------------------------------------------------------------
>> CHANGES | 3 ++
>> mgmt/cluster/ClusterCom.cc | 109 +++++++++++++++++++++++++++-------------
>> 2 files changed, 77 insertions(+), 35 deletions(-)
>> ----------------------------------------------------------------------
>>
>>
>> http://git-wip-us.apache.org/repos/asf/trafficserver/blob/3a903f2b/CHANGES
>> ----------------------------------------------------------------------
>> diff --git a/CHANGES b/CHANGES
>> index e9c056d..f2e9fd4 100644
>> --- a/CHANGES
>> +++ b/CHANGES
>> @@ -1,6 +1,9 @@
>> -*- coding: utf-8
>> -*-
>> Changes with Apache Traffic Server 4.1.0
>>
>> + *) [TS-2201] split drainIncomingChannel two thread, one handle Broadcast
>> message and other handle Reliable(TCP)
>> + request for supporing large cluster.
>> +
>> *) [TS-2144] Avoid race on e.g. "traffic_server -Cclear" which would crash
>> the process intermittently.
>>
>>
>> http://git-wip-us.apache.org/repos/asf/trafficserver/blob/3a903f2b/mgmt/cluster/ClusterCom.cc
>> ----------------------------------------------------------------------
>> diff --git a/mgmt/cluster/ClusterCom.cc b/mgmt/cluster/ClusterCom.cc
>> index 430e01b..fa173cc 100644
>> --- a/mgmt/cluster/ClusterCom.cc
>> +++ b/mgmt/cluster/ClusterCom.cc
>> @@ -48,6 +48,76 @@
>> int MultiCastMessages = 0;
>> long LastHighestDelta = -1L;
>>
>> +
>> +void *
>> +drainIncomingChannel_broadcast(void *arg)
>> +{
>> + char message[61440];
>
> This number pops up
>
>> + fd_set fdlist;
>> + void *ret = arg;
>> +
>> + time_t t;
>> + time_t last_multicast_receive_time = time(NULL);
>> + struct timeval tv;
>> +
>> + /* Avert race condition, thread spun during constructor */
>> + while (!lmgmt->ccom || !lmgmt->ccom->init) {
>> + mgmt_sleep_sec(1);
>> + }
>> +
>> + lmgmt->syslogThrInit();
>> +
>> + for (;;) { /* Loop draining mgmt network channels */
>> + // linux: set tv.tv_set in select() loop, since linux's select()
>> + // will update tv with the amount of time not slept (most other
>> + // implementations do not do this)
>> + tv.tv_sec = lmgmt->ccom->mc_poll_timeout; // interface
>> not-responding timeout
>> + tv.tv_usec = 0;
>> +
>> + memset(message, 0, 61440);
>
> numerous times. You may want to
>
>> + FD_ZERO(&fdlist);
>> +
>> + if (lmgmt->ccom->cluster_type != NO_CLUSTER) {
>> + if (lmgmt->ccom->receive_fd > 0) {
>> + FD_SET(lmgmt->ccom->receive_fd, &fdlist); /* Multicast fd */
>> + }
>> + }
>> +
>> + mgmt_select(FD_SETSIZE, &fdlist, NULL, NULL, &tv);
>> +
>> + if (lmgmt->ccom->cluster_type != NO_CLUSTER) {
>> + // Multicast timeout considerations
>> + if ((lmgmt->ccom->receive_fd < 0) ||
>> !FD_ISSET(lmgmt->ccom->receive_fd, &fdlist)) {
>> + t = time(NULL);
>> + if ((t - last_multicast_receive_time) > (tv.tv_sec - 1)) {
>> + // Timeout on multicast receive channel, reset channel.
>> + if (lmgmt->ccom->receive_fd > 0) {
>> + close(lmgmt->ccom->receive_fd);
>> + }
>> + lmgmt->ccom->receive_fd = -1;
>> + Debug("ccom", "Timeout, resetting multicast receive channel");
>> + if (lmgmt->ccom->establishReceiveChannel(0)) {
>> + Debug("ccom", "establishReceiveChannel failed");
>> + lmgmt->ccom->receive_fd = -1;
>> + }
>> + last_multicast_receive_time = t; // next action at next
>> interval
>> + }
>> + } else {
>> + last_multicast_receive_time = time(NULL); // valid multicast
>> msg
>> + }
>> + }
>> +
>> + /* Broadcast message */
>> + if (lmgmt->ccom->cluster_type != NO_CLUSTER &&
>> + lmgmt->ccom->receive_fd > 0 &&
>> + FD_ISSET(lmgmt->ccom->receive_fd, &fdlist) &&
>> + (lmgmt->ccom->receiveIncomingMessage(message, 61440) > 0)) {
>
>
> Consider replacing it with a constant.
or at least use sizeof() a lot ...
J
Re: git commit: TS-2201: split drainIncomingChannel two thread, one
handle Broadcast message and other handle Reliable(TCP) request.
Posted by Igor Galić <i....@brainsware.org>.
----- Original Message -----
> Updated Branches:
> refs/heads/master f47c6be30 -> 3a903f2bc
>
>
> TS-2201: split drainIncomingChannel two thread, one handle Broadcast message
> and other handle Reliable(TCP) request.
>
>
> Project: http://git-wip-us.apache.org/repos/asf/trafficserver/repo
> Commit: http://git-wip-us.apache.org/repos/asf/trafficserver/commit/3a903f2b
> Tree: http://git-wip-us.apache.org/repos/asf/trafficserver/tree/3a903f2b
> Diff: http://git-wip-us.apache.org/repos/asf/trafficserver/diff/3a903f2b
>
> Branch: refs/heads/master
> Commit: 3a903f2bc69c0fb5623fa212f2f1ef64b5455c8b
> Parents: f47c6be
> Author: Chen Bin <ku...@taobao.com>
> Authored: Tue Sep 17 11:51:11 2013 +0800
> Committer: Chen Bin <ku...@taobao.com>
> Committed: Tue Sep 17 11:51:11 2013 +0800
>
> ----------------------------------------------------------------------
> CHANGES | 3 ++
> mgmt/cluster/ClusterCom.cc | 109 +++++++++++++++++++++++++++-------------
> 2 files changed, 77 insertions(+), 35 deletions(-)
> ----------------------------------------------------------------------
>
>
> http://git-wip-us.apache.org/repos/asf/trafficserver/blob/3a903f2b/CHANGES
> ----------------------------------------------------------------------
> diff --git a/CHANGES b/CHANGES
> index e9c056d..f2e9fd4 100644
> --- a/CHANGES
> +++ b/CHANGES
> @@ -1,6 +1,9 @@
> -*- coding: utf-8
> -*-
> Changes with Apache Traffic Server 4.1.0
>
> + *) [TS-2201] split drainIncomingChannel two thread, one handle Broadcast
> message and other handle Reliable(TCP)
> + request for supporing large cluster.
> +
> *) [TS-2144] Avoid race on e.g. "traffic_server -Cclear" which would crash
> the process intermittently.
>
>
> http://git-wip-us.apache.org/repos/asf/trafficserver/blob/3a903f2b/mgmt/cluster/ClusterCom.cc
> ----------------------------------------------------------------------
> diff --git a/mgmt/cluster/ClusterCom.cc b/mgmt/cluster/ClusterCom.cc
> index 430e01b..fa173cc 100644
> --- a/mgmt/cluster/ClusterCom.cc
> +++ b/mgmt/cluster/ClusterCom.cc
> @@ -48,6 +48,76 @@
> int MultiCastMessages = 0;
> long LastHighestDelta = -1L;
>
> +
> +void *
> +drainIncomingChannel_broadcast(void *arg)
> +{
> + char message[61440];
This number pops up
> + fd_set fdlist;
> + void *ret = arg;
> +
> + time_t t;
> + time_t last_multicast_receive_time = time(NULL);
> + struct timeval tv;
> +
> + /* Avert race condition, thread spun during constructor */
> + while (!lmgmt->ccom || !lmgmt->ccom->init) {
> + mgmt_sleep_sec(1);
> + }
> +
> + lmgmt->syslogThrInit();
> +
> + for (;;) { /* Loop draining mgmt network channels */
> + // linux: set tv.tv_set in select() loop, since linux's select()
> + // will update tv with the amount of time not slept (most other
> + // implementations do not do this)
> + tv.tv_sec = lmgmt->ccom->mc_poll_timeout; // interface
> not-responding timeout
> + tv.tv_usec = 0;
> +
> + memset(message, 0, 61440);
numerous times. You may want to
> + FD_ZERO(&fdlist);
> +
> + if (lmgmt->ccom->cluster_type != NO_CLUSTER) {
> + if (lmgmt->ccom->receive_fd > 0) {
> + FD_SET(lmgmt->ccom->receive_fd, &fdlist); /* Multicast fd */
> + }
> + }
> +
> + mgmt_select(FD_SETSIZE, &fdlist, NULL, NULL, &tv);
> +
> + if (lmgmt->ccom->cluster_type != NO_CLUSTER) {
> + // Multicast timeout considerations
> + if ((lmgmt->ccom->receive_fd < 0) ||
> !FD_ISSET(lmgmt->ccom->receive_fd, &fdlist)) {
> + t = time(NULL);
> + if ((t - last_multicast_receive_time) > (tv.tv_sec - 1)) {
> + // Timeout on multicast receive channel, reset channel.
> + if (lmgmt->ccom->receive_fd > 0) {
> + close(lmgmt->ccom->receive_fd);
> + }
> + lmgmt->ccom->receive_fd = -1;
> + Debug("ccom", "Timeout, resetting multicast receive channel");
> + if (lmgmt->ccom->establishReceiveChannel(0)) {
> + Debug("ccom", "establishReceiveChannel failed");
> + lmgmt->ccom->receive_fd = -1;
> + }
> + last_multicast_receive_time = t; // next action at next
> interval
> + }
> + } else {
> + last_multicast_receive_time = time(NULL); // valid multicast
> msg
> + }
> + }
> +
> + /* Broadcast message */
> + if (lmgmt->ccom->cluster_type != NO_CLUSTER &&
> + lmgmt->ccom->receive_fd > 0 &&
> + FD_ISSET(lmgmt->ccom->receive_fd, &fdlist) &&
> + (lmgmt->ccom->receiveIncomingMessage(message, 61440) > 0)) {
Consider replacing it with a constant.
> + lmgmt->ccom->handleMultiCastMessage(message);
> + }
> + }
> + return ret;
> +} /* End drainIncomingChannel */
> +
> /*
> * drainIncomingChannel
> * This function is blocking, it never returns. It is meant to allow for
> @@ -89,8 +159,6 @@ drainIncomingChannel(void *arg)
> // to reopen the channel (e.g. opening the socket would fail if the
> // interface was down). In this case, the ccom->receive_fd is set
> // to '-1' and the open is retried until it succeeds.
> - time_t t;
> - time_t last_multicast_receive_time = time(NULL);
> struct timeval tv;
>
> /* Avert race condition, thread spun during constructor */
> @@ -111,43 +179,12 @@ drainIncomingChannel(void *arg)
> FD_ZERO(&fdlist);
>
> if (lmgmt->ccom->cluster_type != NO_CLUSTER) {
> - if (lmgmt->ccom->receive_fd > 0) {
> - FD_SET(lmgmt->ccom->receive_fd, &fdlist); /* Multicast fd */
> - }
> FD_SET(lmgmt->ccom->reliable_server_fd, &fdlist); /* TCP Server fd
> */
> }
>
> mgmt_select(FD_SETSIZE, &fdlist, NULL, NULL, &tv);
>
> - if (lmgmt->ccom->cluster_type != NO_CLUSTER) {
> - // Multicast timeout considerations
> - if ((lmgmt->ccom->receive_fd < 0) ||
> !FD_ISSET(lmgmt->ccom->receive_fd, &fdlist)) {
> - t = time(NULL);
> - if ((t - last_multicast_receive_time) > (tv.tv_sec - 1)) {
> - // Timeout on multicast receive channel, reset channel.
> - if (lmgmt->ccom->receive_fd > 0) {
> - close(lmgmt->ccom->receive_fd);
> - }
> - lmgmt->ccom->receive_fd = -1;
> - Debug("ccom", "Timeout, resetting multicast receive channel");
> - if (lmgmt->ccom->establishReceiveChannel(0)) {
> - Debug("ccom", "establishReceiveChannel failed");
> - lmgmt->ccom->receive_fd = -1;
> - }
> - last_multicast_receive_time = t; // next action at next
> interval
> - }
> - } else {
> - last_multicast_receive_time = time(NULL); // valid multicast
> msg
> - }
> - }
> -
> - /* Broadcast message */
> - if (lmgmt->ccom->cluster_type != NO_CLUSTER &&
> - lmgmt->ccom->receive_fd > 0 &&
> - FD_ISSET(lmgmt->ccom->receive_fd, &fdlist) &&
> - (lmgmt->ccom->receiveIncomingMessage(message, 61440) > 0)) {
> - lmgmt->ccom->handleMultiCastMessage(message);
> - } else if (FD_ISSET(lmgmt->ccom->reliable_server_fd, &fdlist)) {
> + if (FD_ISSET(lmgmt->ccom->reliable_server_fd, &fdlist)) {
> /* Reliable(TCP) request */
> int clilen = sizeof(cli_addr);
> int req_fd = mgmt_accept(lmgmt->ccom->reliable_server_fd, (struct
> sockaddr *) &cli_addr, &clilen);
> @@ -442,8 +479,10 @@ ClusterCom::ClusterCom(unsigned long oip, char *host,
> int mcport, char *group, i
> peers = ink_hash_table_create(InkHashTableKeyType_String);
> mismatchLog = ink_hash_table_create(InkHashTableKeyType_String);
>
> - if (cluster_type != NO_CLUSTER)
> + if (cluster_type != NO_CLUSTER) {
> + ink_thread_create(drainIncomingChannel_broadcast, 0); /* Spin drainer
> thread */
> ink_thread_create(drainIncomingChannel, 0); /* Spin drainer thread */
> + }
> return;
> } /* End ClusterCom::ClusterCom */
>
>
>
--
Igor Galić
Tel: +43 (0) 664 886 22 883
Mail: i.galic@brainsware.org
URL: http://brainsware.org/
GPG: 6880 4155 74BD FD7C B515 2EA5 4B1D 9E08 A097 C9AE