You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@trafficserver.apache.org by yu...@apache.org on 2013/08/21 11:43:28 UTC
[1/3] git commit: TS-2137: Use eventfd instead of pthread
signal/wait in ATS
Updated Branches:
refs/heads/master b30d69825 -> 5f4b470ce
TS-2137: Use eventfd instead of pthread signal/wait in ATS
pthread_cond_signal/wait is used in several places in ATS, including but
not limited to:
1) Logging system.
2) ProtectedQueue in event system.
3) RecProcess in stats system.
As we know, pthread_cond_signal() needs to take a lock, which causes
more context switches than eventfd.
In my testing:
1) Using _signal()/_wait() pair, eventfd is about 5 times faster than
pthread cond.
2) Using _signal()/_timedwait() pair, eventfd is about 2 times faster
than pthread cond.
== NOTE ==
pthread_cond_signal/wait is also used by the AIO module, but we can't
simply replace it with eventfd, as the AIO code uses the mutex to protect
other stuff. I also found that we can't replace it in ProtectedQueue directly;
some preparatory work is needed first.
Signed-off-by: Yunkai Zhang <qi...@taobao.com>
Project: http://git-wip-us.apache.org/repos/asf/trafficserver/repo
Commit: http://git-wip-us.apache.org/repos/asf/trafficserver/commit/f4f8d99f
Tree: http://git-wip-us.apache.org/repos/asf/trafficserver/tree/f4f8d99f
Diff: http://git-wip-us.apache.org/repos/asf/trafficserver/diff/f4f8d99f
Branch: refs/heads/master
Commit: f4f8d99f65b7ffbc37e7f72adc0694adb26041c4
Parents: b30d698
Author: Yunkai Zhang <qi...@taobao.com>
Authored: Sat Aug 17 16:17:41 2013 +0800
Committer: Yunkai Zhang <qi...@taobao.com>
Committed: Wed Aug 21 17:39:53 2013 +0800
----------------------------------------------------------------------
lib/records/RecProcess.cc | 17 ++-
lib/ts/EventNotify.cc | 172 +++++++++++++++++++++++++++++++
lib/ts/EventNotify.h | 54 ++++++++++
lib/ts/Makefile.am | 2 +
lib/ts/libts.h | 1 +
proxy/logging/Log.cc | 47 +++------
proxy/logging/Log.h | 9 +-
proxy/logging/LogCollationHostSM.cc | 2 +-
proxy/logging/LogConfig.cc | 2 +-
proxy/logging/LogFile.cc | 4 +-
proxy/logging/LogObject.cc | 2 +-
11 files changed, 261 insertions(+), 51 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/trafficserver/blob/f4f8d99f/lib/records/RecProcess.cc
----------------------------------------------------------------------
diff --git a/lib/records/RecProcess.cc b/lib/records/RecProcess.cc
index b9c69ba..3f1f34e 100644
--- a/lib/records/RecProcess.cc
+++ b/lib/records/RecProcess.cc
@@ -35,8 +35,7 @@
static bool g_initialized = false;
static bool g_message_initialized = false;
static bool g_started = false;
-static ink_cond g_force_req_cond;
-static ink_mutex g_force_req_mutex;
+static EventNotify g_force_req_notify;
static int g_rec_raw_stat_sync_interval_ms = REC_RAW_STAT_SYNC_INTERVAL_MS;
static int g_rec_config_update_interval_ms = REC_CONFIG_UPDATE_INTERVAL_MS;
static int g_rec_remote_sync_interval_ms = REC_REMOTE_SYNC_INTERVAL_MS;
@@ -263,9 +262,9 @@ recv_message_cb__process(RecMessage *msg, RecMessageT msg_type, void *cookie)
if ((err = recv_message_cb(msg, msg_type, cookie)) == REC_ERR_OKAY) {
if (msg_type == RECG_PULL_ACK) {
- ink_mutex_acquire(&g_force_req_mutex);
- ink_cond_signal(&g_force_req_cond);
- ink_mutex_release(&g_force_req_mutex);
+ g_force_req_notify.lock();
+ g_force_req_notify.signal();
+ g_force_req_notify.unlock();
}
}
return err;
@@ -419,13 +418,11 @@ RecProcessInitMessage(RecModeT mode_type)
return REC_ERR_FAIL;
}
- ink_cond_init(&g_force_req_cond);
- ink_mutex_init(&g_force_req_mutex, NULL);
if (mode_type == RECM_CLIENT) {
send_pull_message(RECG_PULL_REQ);
- ink_mutex_acquire(&g_force_req_mutex);
- ink_cond_wait(&g_force_req_cond, &g_force_req_mutex);
- ink_mutex_release(&g_force_req_mutex);
+ g_force_req_notify.lock();
+ g_force_req_notify.wait();
+ g_force_req_notify.unlock();
}
g_message_initialized = true;
http://git-wip-us.apache.org/repos/asf/trafficserver/blob/f4f8d99f/lib/ts/EventNotify.cc
----------------------------------------------------------------------
diff --git a/lib/ts/EventNotify.cc b/lib/ts/EventNotify.cc
new file mode 100644
index 0000000..5d0cc53
--- /dev/null
+++ b/lib/ts/EventNotify.cc
@@ -0,0 +1,172 @@
+/** @file
+
+ A brief file description
+
+ @section license License
+
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+ */
+
+/**************************************************************************
+ EventNotify.cc
+
+ Generic event notify mechanism among threads.
+**************************************************************************/
+
+#include "EventNotify.h"
+#include "ink_hrtime.h"
+
+#ifdef TS_HAS_EVENTFD
+#include <sys/eventfd.h>
+#include <sys/epoll.h>
+#endif
+
+EventNotify::EventNotify(const char *name): m_name(name)
+{
+#ifdef TS_HAS_EVENTFD
+ int ret;
+ struct epoll_event ev;
+
+ // Don't use noblock here!
+ m_event_fd = eventfd(0, EFD_CLOEXEC);
+ if (m_event_fd < 0) {
+ // EFD_CLOEXEC invalid in <= Linux 2.6.27
+ m_event_fd = eventfd(0, 0);
+ }
+ ink_release_assert(m_event_fd != -1);
+
+ ev.events = EPOLLIN;
+ ev.data.fd = m_event_fd;
+
+ m_epoll_fd = epoll_create(1);
+ ink_release_assert(m_epoll_fd != -1);
+
+ ret = epoll_ctl(m_epoll_fd, EPOLL_CTL_ADD, m_event_fd, &ev);
+ ink_release_assert(ret != -1);
+#else
+ ink_cond_init(&m_cond);
+ ink_mutex_init(&m_mutex, m_name);
+#endif
+}
+
+void
+EventNotify::signal(void)
+{
+#ifdef TS_HAS_EVENTFD
+ ssize_t nr;
+ uint64_t value = 1;
+ nr = write(m_event_fd, &value, sizeof(uint64_t));
+ ink_release_assert(nr == sizeof(uint64_t));
+#else
+ ink_cond_signal(&m_cond);
+#endif
+}
+
+void
+EventNotify::wait(void)
+{
+#ifdef TS_HAS_EVENTFD
+ ssize_t nr;
+ uint64_t value = 0;
+ nr = read(m_event_fd, &value, sizeof(uint64_t));
+ ink_release_assert(nr == sizeof(uint64_t));
+#else
+ ink_cond_wait(&m_cond, &m_mutex);
+#endif
+}
+
+int
+EventNotify::timedwait(ink_timestruc *abstime)
+{
+#ifdef TS_HAS_EVENTFD
+ int timeout;
+ ssize_t nr, nr_fd = 0;
+ uint64_t value = 0;
+ struct timeval curtime;
+ struct epoll_event ev;
+
+ // Convert absolute time to relative time
+ gettimeofday(&curtime, NULL);
+ timeout = (abstime->tv_sec - curtime.tv_sec) * 1000
+ + (abstime->tv_nsec / 1000 - curtime.tv_usec) / 1000;
+
+ //
+ // When timeout < 0, epoll_wait() will wait indefinitely, but
+ // pthread_cond_timedwait() will return ETIMEDOUT immediately.
+ // We should keep compatible with pthread_cond_timedwait() here.
+ //
+ if (timeout < 0)
+ return ETIMEDOUT;
+
+ do {
+ nr_fd = epoll_wait(m_epoll_fd, &ev, 1, timeout);
+ } while (nr_fd == -1 && errno == EINTR);
+
+ if (nr_fd == 0)
+ return ETIMEDOUT;
+ else if (nr_fd == -1)
+ return errno;
+
+ nr = read(m_event_fd, &value, sizeof(uint64_t));
+ ink_release_assert(nr == sizeof(uint64_t));
+
+ return 0;
+#else
+ return ink_cond_timedwait(&m_cond, &m_mutex, abstime);
+#endif
+}
+
+void
+EventNotify::lock(void)
+{
+#ifdef TS_HAS_EVENTFD
+ // do nothing
+#else
+ ink_mutex_acquire(&m_mutex);
+#endif
+}
+
+bool
+EventNotify::trylock(void)
+{
+#ifdef TS_HAS_EVENTFD
+ return true;
+#else
+ return ink_mutex_try_acquire(&m_mutex);
+#endif
+}
+
+void
+EventNotify::unlock(void)
+{
+#ifdef TS_HAS_EVENTFD
+ // do nothing
+#else
+ ink_mutex_release(&m_mutex);
+#endif
+}
+
+EventNotify::~EventNotify()
+{
+#ifdef TS_HAS_EVENTFD
+ close(m_event_fd);
+ close(m_epoll_fd);
+#else
+ ink_cond_destroy(&m_cond);
+ ink_mutex_destroy(&m_mutex);
+#endif
+}
http://git-wip-us.apache.org/repos/asf/trafficserver/blob/f4f8d99f/lib/ts/EventNotify.h
----------------------------------------------------------------------
diff --git a/lib/ts/EventNotify.h b/lib/ts/EventNotify.h
new file mode 100644
index 0000000..16e4809
--- /dev/null
+++ b/lib/ts/EventNotify.h
@@ -0,0 +1,54 @@
+/** @file
+
+ A brief file description
+
+ @section license License
+
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+ */
+
+/**************************************************************************
+ EventNotify.h
+
+ Generic event notify mechanism among threads.
+
+**************************************************************************/
+
+#include "ink_thread.h"
+
+class EventNotify
+{
+public:
+ EventNotify(const char *name = NULL);
+ void signal(void);
+ void wait(void);
+ int timedwait(ink_timestruc *abstime);
+ void lock(void);
+ bool trylock(void);
+ void unlock(void);
+ ~EventNotify();
+
+private:
+ const char *m_name;
+#ifdef TS_HAS_EVENTFD
+ int m_event_fd;
+ int m_epoll_fd;
+#else
+ ink_cond m_cond;
+ ink_mutex m_mutex;
+#endif
+};
http://git-wip-us.apache.org/repos/asf/trafficserver/blob/f4f8d99f/lib/ts/Makefile.am
----------------------------------------------------------------------
diff --git a/lib/ts/Makefile.am b/lib/ts/Makefile.am
index c360a4b..3fca2f8 100644
--- a/lib/ts/Makefile.am
+++ b/lib/ts/Makefile.am
@@ -117,6 +117,8 @@ libtsutil_la_SOURCES = \
ink_syslog.h \
ink_thread.cc \
ink_thread.h \
+ EventNotify.h \
+ EventNotify.cc \
ink_time.cc \
ink_time.h \
inktomi++.h \
http://git-wip-us.apache.org/repos/asf/trafficserver/blob/f4f8d99f/lib/ts/libts.h
----------------------------------------------------------------------
diff --git a/lib/ts/libts.h b/lib/ts/libts.h
index 046afaa..27c9e92 100644
--- a/lib/ts/libts.h
+++ b/lib/ts/libts.h
@@ -81,6 +81,7 @@
#include "Bitops.h"
#include "Compatability.h"
#include "DynArray.h"
+#include "EventNotify.h"
#include "I_Version.h"
#include "InkPool.h"
#include "List.h"
http://git-wip-us.apache.org/repos/asf/trafficserver/blob/f4f8d99f/proxy/logging/Log.cc
----------------------------------------------------------------------
diff --git a/proxy/logging/Log.cc b/proxy/logging/Log.cc
index 502c424..790bc04 100644
--- a/proxy/logging/Log.cc
+++ b/proxy/logging/Log.cc
@@ -77,15 +77,12 @@ size_t Log::numInactiveObjects;
size_t Log::maxInactiveObjects;
// Flush thread stuff
-ink_mutex *Log::preproc_mutex;
-ink_cond *Log::preproc_cond;
-ink_mutex *Log::flush_mutex;
-ink_cond *Log::flush_cond;
+EventNotify *Log::preproc_notify;
+EventNotify *Log::flush_notify;
InkAtomicList *Log::flush_data_list;
// Collate thread stuff
-ink_mutex Log::collate_mutex;
-ink_cond Log::collate_cond;
+EventNotify Log::collate_notify;
ink_thread Log::collate_thread;
int Log::collation_accept_file_descriptor;
int Log::collation_preproc_threads;
@@ -189,10 +186,10 @@ struct PeriodicWakeup : Continuation
int wakeup (int /* event ATS_UNUSED */, Event * /* e ATS_UNUSED */)
{
for (int i = 0; i < m_preproc_threads; i++) {
- ink_cond_signal (&Log::preproc_cond[i]);
+ Log::preproc_notify[i].signal();
}
for (int i = 0; i < m_flush_threads; i++) {
- ink_cond_signal (&Log::flush_cond[i]);
+ Log::flush_notify[i].signal();
}
return EVENT_CONT;
}
@@ -1070,8 +1067,7 @@ Log::create_threads()
if (!(init_status & THREADS_CREATED)) {
char desc[64];
- preproc_mutex = new ink_mutex[collation_preproc_threads];
- preproc_cond = new ink_cond[collation_preproc_threads];
+ preproc_notify = new EventNotify[collation_preproc_threads];
size_t stacksize;
REC_ReadConfigInteger(stacksize, "proxy.config.thread.default.stacksize");
@@ -1081,9 +1077,6 @@ Log::create_threads()
// no need for the conditional var since it will be relying on
// on the event system.
for (int i = 0; i < collation_preproc_threads; i++) {
- sprintf(desc, "Logging preproc thread mutex[%d]", i);
- ink_mutex_init(&preproc_mutex[i], desc);
- ink_cond_init(&preproc_cond[i]);
Continuation *preproc_cont = NEW(new LoggingPreprocContinuation(i));
sprintf(desc, "[LOG_PREPROC %d]", i);
eventProcessor.spawn_thread(preproc_cont, desc, stacksize);
@@ -1093,13 +1086,9 @@ Log::create_threads()
// TODO: Enable multiple flush threads, such as
// one flush thread per file.
//
- flush_mutex = new ink_mutex;
- flush_cond = new ink_cond;
+ flush_notify = new EventNotify;
flush_data_list = new InkAtomicList;
- sprintf(desc, "Logging flush thread mutex");
- ink_mutex_init(flush_mutex, desc);
- ink_cond_init(flush_cond);
sprintf(desc, "Logging flush buffer list");
ink_atomiclist_init(flush_data_list, desc, 0);
Continuation *flush_cont = NEW(new LoggingFlushContinuation(0));
@@ -1118,8 +1107,6 @@ Log::create_threads()
// much overhead associated with keeping an ink_thread blocked on a
// condition variable.
//
- ink_mutex_init(&collate_mutex, "Collate thread mutex");
- ink_cond_init(&collate_cond);
Continuation *collate_continuation = NEW(new LoggingCollateContinuation);
Event *collate_event = eventProcessor.spawn_thread(collate_continuation);
collate_thread = collate_event->ethread->tid;
@@ -1246,7 +1233,7 @@ Log::preproc_thread_main(void *args)
Debug("log-preproc", "log preproc thread is alive ...");
- ink_mutex_acquire(&preproc_mutex[idx]);
+ Log::preproc_notify[idx].lock();
while (true) {
buffers_preproced = config->log_object_manager.preproc_buffers(idx);
@@ -1264,11 +1251,11 @@ Log::preproc_thread_main(void *args)
// check the queue and find there is nothing to do, then wait
// again.
//
- ink_cond_wait (&preproc_cond[idx], &preproc_mutex[idx]);
+ Log::preproc_notify[idx].wait();
}
/* NOTREACHED */
- ink_mutex_release(&preproc_mutex[idx]);
+ Log::preproc_notify[idx].unlock();
return NULL;
}
@@ -1283,7 +1270,7 @@ Log::flush_thread_main(void * /* args ATS_UNUSED */)
int len, bytes_written, total_bytes;
SLL<LogFlushData, LogFlushData::Link_link> link, invert_link;
- ink_mutex_acquire(flush_mutex);
+ Log::flush_notify->lock();
while (true) {
fdata = (LogFlushData *) ink_atomiclist_popall(flush_data_list);
@@ -1369,11 +1356,11 @@ Log::flush_thread_main(void * /* args ATS_UNUSED */)
// check the queue and find there is nothing to do, then wait
// again.
//
- ink_cond_wait(flush_cond, flush_mutex);
+ Log::flush_notify->wait();
}
/* NOTREACHED */
- ink_mutex_release(flush_mutex);
+ Log::flush_notify->unlock();
return NULL;
}
@@ -1398,7 +1385,7 @@ Log::collate_thread_main(void * /* args ATS_UNUSED */)
Debug("log-thread", "Log collation thread is alive ...");
- ink_mutex_acquire(&collate_mutex);
+ Log::collate_notify.lock();
while (true) {
ink_assert(Log::config != NULL);
@@ -1408,7 +1395,7 @@ Log::collate_thread_main(void * /* args ATS_UNUSED */)
// wake-ups.
//
while (!Log::config->am_collation_host()) {
- ink_cond_wait(&collate_cond, &collate_mutex);
+ Log::collate_notify.wait();
}
// Ok, at this point we know we're a log collation host, so get to
@@ -1427,7 +1414,7 @@ Log::collate_thread_main(void * /* args ATS_UNUSED */)
//
// go to sleep ...
//
- ink_cond_wait(&collate_cond, &collate_mutex);
+ Log::collate_notify.wait();
continue;
}
@@ -1489,7 +1476,7 @@ Log::collate_thread_main(void * /* args ATS_UNUSED */)
}
/* NOTREACHED */
- ink_mutex_release(&collate_mutex);
+ Log::collate_notify.unlock();
return NULL;
}
http://git-wip-us.apache.org/repos/asf/trafficserver/blob/f4f8d99f/proxy/logging/Log.h
----------------------------------------------------------------------
diff --git a/proxy/logging/Log.h b/proxy/logging/Log.h
index 2f395ca..2233220 100644
--- a/proxy/logging/Log.h
+++ b/proxy/logging/Log.h
@@ -420,17 +420,14 @@ public:
static void add_to_inactive(LogObject * obj);
// logging thread stuff
- static ink_mutex *preproc_mutex;
- static ink_cond *preproc_cond;
+ static EventNotify *preproc_notify;
static void *preproc_thread_main(void *args);
- static ink_mutex *flush_mutex;
- static ink_cond *flush_cond;
+ static EventNotify *flush_notify;
static InkAtomicList *flush_data_list;
static void *flush_thread_main(void *args);
// collation thread stuff
- static ink_mutex collate_mutex;
- static ink_cond collate_cond;
+ static EventNotify collate_notify;
static ink_thread collate_thread;
static int collation_preproc_threads;
static int collation_accept_file_descriptor;
http://git-wip-us.apache.org/repos/asf/trafficserver/blob/f4f8d99f/proxy/logging/LogCollationHostSM.cc
----------------------------------------------------------------------
diff --git a/proxy/logging/LogCollationHostSM.cc b/proxy/logging/LogCollationHostSM.cc
index 9add290..36452f2 100644
--- a/proxy/logging/LogCollationHostSM.cc
+++ b/proxy/logging/LogCollationHostSM.cc
@@ -321,7 +321,7 @@ LogCollationHostSM::host_recv(int event, void * /* data ATS_UNUSED */)
//
log_buffer = NEW(new LogBuffer(log_object, log_buffer_header));
int idx = log_object->add_to_flush_queue(log_buffer);
- ink_cond_signal(&Log::preproc_cond[idx]);
+ Log::preproc_notify[idx].signal();
}
#if defined(LOG_BUFFER_TRACKING)
http://git-wip-us.apache.org/repos/asf/trafficserver/blob/f4f8d99f/proxy/logging/LogConfig.cc
----------------------------------------------------------------------
diff --git a/proxy/logging/LogConfig.cc b/proxy/logging/LogConfig.cc
index 6cf55ba..8e3b585 100644
--- a/proxy/logging/LogConfig.cc
+++ b/proxy/logging/LogConfig.cc
@@ -661,7 +661,7 @@ LogConfig::setup_collation(LogConfig * prev_config)
// since we are the collation host, we need to signal the
// collate_cond variable so that our collation thread wakes up.
//
- ink_cond_signal(&Log::collate_cond);
+ Log::collate_notify.signal();
#endif
Debug("log", "I am a collation host listening on port %d.", collation_port);
} else {
http://git-wip-us.apache.org/repos/asf/trafficserver/blob/f4f8d99f/proxy/logging/LogFile.cc
----------------------------------------------------------------------
diff --git a/proxy/logging/LogFile.cc b/proxy/logging/LogFile.cc
index 5bf6ba4..6b93a26 100644
--- a/proxy/logging/LogFile.cc
+++ b/proxy/logging/LogFile.cc
@@ -523,7 +523,7 @@ LogFile::preproc_and_try_delete(LogBuffer * lb)
ink_atomiclist_push(Log::flush_data_list, flush_data);
- ink_cond_signal(Log::flush_cond);
+ Log::flush_notify->signal();
//
// LogBuffer will be deleted in flush thread
@@ -693,7 +693,7 @@ LogFile::write_ascii_logbuffer3(LogBufferHeader * buffer_header, char *alt_forma
LogFlushData *flush_data = new LogFlushData(this, ascii_buffer, fmt_buf_bytes);
ink_atomiclist_push(Log::flush_data_list, flush_data);
- ink_cond_signal(Log::flush_cond);
+ Log::flush_notify->signal();
total_bytes += fmt_buf_bytes;
}
http://git-wip-us.apache.org/repos/asf/trafficserver/blob/f4f8d99f/proxy/logging/LogObject.cc
----------------------------------------------------------------------
diff --git a/proxy/logging/LogObject.cc b/proxy/logging/LogObject.cc
index ea059e2..29f1a39 100644
--- a/proxy/logging/LogObject.cc
+++ b/proxy/logging/LogObject.cc
@@ -461,7 +461,7 @@ LogObject::_checkout_write(size_t * write_offset, size_t bytes_needed) {
int idx = m_buffer_manager_idx++ % m_flush_threads;
Debug("log-logbuffer", "adding buffer %d to flush list after checkout", buffer->get_id());
m_buffer_manager[idx].add_to_flush_queue(buffer);
- ink_cond_signal(&Log::preproc_cond[idx]);
+ Log::preproc_notify[idx].signal();
}
decremented = true;
Re: [1/3] git commit: TS-2137: Use eventfd instead of pthread
signal/wait in ATS
Posted by Yunkai Zhang <yu...@gmail.com>.
On Thu, Aug 22, 2013 at 2:22 AM, James Peach <jp...@apache.org> wrote:
> On Aug 21, 2013, at 2:43 AM, yunkai@apache.org wrote:
>
> > Updated Branches:
> > refs/heads/master b30d69825 -> 5f4b470ce
> >
> >
> > TS-2137: Use eventfd instread of pthread signal/wait in ATS
> >
> > pthread_cond_signal/wait is used in several places in ATS, including but
> > not limited:
> > 1) Logging system.
> > 2) ProtectedQueue in event system.
> > 3) RecProcess in stats system.
> >
> > As we known, pthread_cond_signal() need to take lock, it'll cause
> > more context switch than eventfd.
> >
> > In my testing:
> > 1) Using _signal()/_wait() pair, eventfd is about 5 times faster than
> > pthread cond.
> >
> > 2) Using _signal()/_timedwait() pair, eventfd is about 2 times faster
> > than pthread cond.
>
> This looks pretty good. I had a couple of comments ...
>
> - EventNotify::timedwait() should take a relative timeout, since
> this is almost always what you want to use. Since timedwait() is not
> currently used, consider removing it.
>
1) timedwait may be used in other areas in the future, such as ProtectedQueue.
> - You should also remove EventNotify::m_name, since that's never
> used or even set.
> - EventNotify.h needs header #include guards.
>
2) I had intended to make it a relative timeout (same reason as for m_name),
but to keep the changes minimal, I gave up :(.
I'll send a patch to: a) change abstime to a relative timeout; b) remove m_name.
> >
> > == NOTE ==
> > pthread_cond_signal/wait is also used by AIO module, but we can't
> > simply replace it with eventfd, as AIO code use the mutex to protect
> > other stuff. And I found we can't replace it in ProtectedQueue directly,
> > there are some prepare work to do.
> >
> > Signed-off-by: Yunkai Zhang <qi...@taobao.com>
> >
> >
> > Project: http://git-wip-us.apache.org/repos/asf/trafficserver/repo
> > Commit:
> http://git-wip-us.apache.org/repos/asf/trafficserver/commit/f4f8d99f
> > Tree: http://git-wip-us.apache.org/repos/asf/trafficserver/tree/f4f8d99f
> > Diff: http://git-wip-us.apache.org/repos/asf/trafficserver/diff/f4f8d99f
> >
> > Branch: refs/heads/master
> > Commit: f4f8d99f65b7ffbc37e7f72adc0694adb26041c4
> > Parents: b30d698
> > Author: Yunkai Zhang <qi...@taobao.com>
> > Authored: Sat Aug 17 16:17:41 2013 +0800
> > Committer: Yunkai Zhang <qi...@taobao.com>
> > Committed: Wed Aug 21 17:39:53 2013 +0800
> >
> > ----------------------------------------------------------------------
> > lib/records/RecProcess.cc | 17 ++-
> > lib/ts/EventNotify.cc | 172 +++++++++++++++++++++++++++++++
> > lib/ts/EventNotify.h | 54 ++++++++++
> > lib/ts/Makefile.am | 2 +
> > lib/ts/libts.h | 1 +
> > proxy/logging/Log.cc | 47 +++------
> > proxy/logging/Log.h | 9 +-
> > proxy/logging/LogCollationHostSM.cc | 2 +-
> > proxy/logging/LogConfig.cc | 2 +-
> > proxy/logging/LogFile.cc | 4 +-
> > proxy/logging/LogObject.cc | 2 +-
> > 11 files changed, 261 insertions(+), 51 deletions(-)
> > ----------------------------------------------------------------------
> >
> >
> >
> http://git-wip-us.apache.org/repos/asf/trafficserver/blob/f4f8d99f/lib/records/RecProcess.cc
> > ----------------------------------------------------------------------
> > diff --git a/lib/records/RecProcess.cc b/lib/records/RecProcess.cc
> > index b9c69ba..3f1f34e 100644
> > --- a/lib/records/RecProcess.cc
> > +++ b/lib/records/RecProcess.cc
> > @@ -35,8 +35,7 @@
> > static bool g_initialized = false;
> > static bool g_message_initialized = false;
> > static bool g_started = false;
> > -static ink_cond g_force_req_cond;
> > -static ink_mutex g_force_req_mutex;
> > +static EventNotify g_force_req_notify;
> > static int g_rec_raw_stat_sync_interval_ms =
> REC_RAW_STAT_SYNC_INTERVAL_MS;
> > static int g_rec_config_update_interval_ms =
> REC_CONFIG_UPDATE_INTERVAL_MS;
> > static int g_rec_remote_sync_interval_ms = REC_REMOTE_SYNC_INTERVAL_MS;
> > @@ -263,9 +262,9 @@ recv_message_cb__process(RecMessage *msg,
> RecMessageT msg_type, void *cookie)
> >
> > if ((err = recv_message_cb(msg, msg_type, cookie)) == REC_ERR_OKAY) {
> > if (msg_type == RECG_PULL_ACK) {
> > - ink_mutex_acquire(&g_force_req_mutex);
> > - ink_cond_signal(&g_force_req_cond);
> > - ink_mutex_release(&g_force_req_mutex);
> > + g_force_req_notify.lock();
> > + g_force_req_notify.signal();
> > + g_force_req_notify.unlock();
> > }
> > }
> > return err;
> > @@ -419,13 +418,11 @@ RecProcessInitMessage(RecModeT mode_type)
> > return REC_ERR_FAIL;
> > }
> >
> > - ink_cond_init(&g_force_req_cond);
> > - ink_mutex_init(&g_force_req_mutex, NULL);
> > if (mode_type == RECM_CLIENT) {
> > send_pull_message(RECG_PULL_REQ);
> > - ink_mutex_acquire(&g_force_req_mutex);
> > - ink_cond_wait(&g_force_req_cond, &g_force_req_mutex);
> > - ink_mutex_release(&g_force_req_mutex);
> > + g_force_req_notify.lock();
> > + g_force_req_notify.wait();
> > + g_force_req_notify.unlock();
> > }
> >
> > g_message_initialized = true;
> >
> >
> http://git-wip-us.apache.org/repos/asf/trafficserver/blob/f4f8d99f/lib/ts/EventNotify.cc
> > ----------------------------------------------------------------------
> > diff --git a/lib/ts/EventNotify.cc b/lib/ts/EventNotify.cc
> > new file mode 100644
> > index 0000000..5d0cc53
> > --- /dev/null
> > +++ b/lib/ts/EventNotify.cc
> > @@ -0,0 +1,172 @@
> > +/** @file
> > +
> > + A brief file description
> > +
> > + @section license License
> > +
> > + Licensed to the Apache Software Foundation (ASF) under one
> > + or more contributor license agreements. See the NOTICE file
> > + distributed with this work for additional information
> > + regarding copyright ownership. The ASF licenses this file
> > + to you under the Apache License, Version 2.0 (the
> > + "License"); you may not use this file except in compliance
> > + with the License. You may obtain a copy of the License at
> > +
> > + http://www.apache.org/licenses/LICENSE-2.0
> > +
> > + Unless required by applicable law or agreed to in writing, software
> > + distributed under the License is distributed on an "AS IS" BASIS,
> > + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
> implied.
> > + See the License for the specific language governing permissions and
> > + limitations under the License.
> > + */
> > +
> >
> +/**************************************************************************
> > + EventNotify.cc
> > +
> > + Generic event notify mechanism among threads.
> >
> +**************************************************************************/
> > +
> > +#include "EventNotify.h"
> > +#include "ink_hrtime.h"
> > +
> > +#ifdef TS_HAS_EVENTFD
> > +#include <sys/eventfd.h>
> > +#include <sys/epoll.h>
> > +#endif
> > +
> > +EventNotify::EventNotify(const char *name): m_name(name)
> > +{
> > +#ifdef TS_HAS_EVENTFD
> > + int ret;
> > + struct epoll_event ev;
> > +
> > + // Don't use noblock here!
> > + m_event_fd = eventfd(0, EFD_CLOEXEC);
> > + if (m_event_fd < 0) {
> > + // EFD_CLOEXEC invalid in <= Linux 2.6.27
> > + m_event_fd = eventfd(0, 0);
> > + }
> > + ink_release_assert(m_event_fd != -1);
> > +
> > + ev.events = EPOLLIN;
> > + ev.data.fd = m_event_fd;
> > +
> > + m_epoll_fd = epoll_create(1);
> > + ink_release_assert(m_epoll_fd != -1);
> > +
> > + ret = epoll_ctl(m_epoll_fd, EPOLL_CTL_ADD, m_event_fd, &ev);
> > + ink_release_assert(ret != -1);
> > +#else
> > + ink_cond_init(&m_cond);
> > + ink_mutex_init(&m_mutex, m_name);
> > +#endif
> > +}
> > +
> > +void
> > +EventNotify::signal(void)
> > +{
> > +#ifdef TS_HAS_EVENTFD
> > + ssize_t nr;
> > + uint64_t value = 1;
> > + nr = write(m_event_fd, &value, sizeof(uint64_t));
> > + ink_release_assert(nr == sizeof(uint64_t));
> > +#else
> > + ink_cond_signal(&m_cond);
> > +#endif
> > +}
> > +
> > +void
> > +EventNotify::wait(void)
> > +{
> > +#ifdef TS_HAS_EVENTFD
> > + ssize_t nr;
> > + uint64_t value = 0;
> > + nr = read(m_event_fd, &value, sizeof(uint64_t));
> > + ink_release_assert(nr == sizeof(uint64_t));
> > +#else
> > + ink_cond_wait(&m_cond, &m_mutex);
> > +#endif
> > +}
> > +
> > +int
> > +EventNotify::timedwait(ink_timestruc *abstime)
> > +{
> > +#ifdef TS_HAS_EVENTFD
> > + int timeout;
> > + ssize_t nr, nr_fd = 0;
> > + uint64_t value = 0;
> > + struct timeval curtime;
> > + struct epoll_event ev;
> > +
> > + // Convert absolute time to relative time
> > + gettimeofday(&curtime, NULL);
> > + timeout = (abstime->tv_sec - curtime.tv_sec) * 1000
> > + + (abstime->tv_nsec / 1000 - curtime.tv_usec) / 1000;
> > +
> > + //
> > + // When timeout < 0, epoll_wait() will wait indefinitely, but
> > + // pthread_cond_timedwait() will return ETIMEDOUT immediately.
> > + // We should keep compatible with pthread_cond_timedwait() here.
> > + //
> > + if (timeout < 0)
> > + return ETIMEDOUT;
> > +
> > + do {
> > + nr_fd = epoll_wait(m_epoll_fd, &ev, 1, timeout);
> > + } while (nr_fd == -1 && errno == EINTR);
> > +
> > + if (nr_fd == 0)
> > + return ETIMEDOUT;
> > + else if (nr_fd == -1)
> > + return errno;
> > +
> > + nr = read(m_event_fd, &value, sizeof(uint64_t));
> > + ink_release_assert(nr == sizeof(uint64_t));
> > +
> > + return 0;
> > +#else
> > + return ink_cond_timedwait(&m_cond, &m_mutex, abstime);
> > +#endif
> > +}
> > +
> > +void
> > +EventNotify::lock(void)
> > +{
> > +#ifdef TS_HAS_EVENTFD
> > + // do nothing
> > +#else
> > + ink_mutex_acquire(&m_mutex);
> > +#endif
> > +}
> > +
> > +bool
> > +EventNotify::trylock(void)
> > +{
> > +#ifdef TS_HAS_EVENTFD
> > + return true;
> > +#else
> > + return ink_mutex_try_acquire(&m_mutex);
> > +#endif
> > +}
> > +
> > +void
> > +EventNotify::unlock(void)
> > +{
> > +#ifdef TS_HAS_EVENTFD
> > + // do nothing
> > +#else
> > + ink_mutex_release(&m_mutex);
> > +#endif
> > +}
> > +
> > +EventNotify::~EventNotify()
> > +{
> > +#ifdef TS_HAS_EVENTFD
> > + close(m_event_fd);
> > + close(m_epoll_fd);
> > +#else
> > + ink_cond_destroy(&m_cond);
> > + ink_mutex_destroy(&m_mutex);
> > +#endif
> > +}
> >
> >
> http://git-wip-us.apache.org/repos/asf/trafficserver/blob/f4f8d99f/lib/ts/EventNotify.h
> > ----------------------------------------------------------------------
> > diff --git a/lib/ts/EventNotify.h b/lib/ts/EventNotify.h
> > new file mode 100644
> > index 0000000..16e4809
> > --- /dev/null
> > +++ b/lib/ts/EventNotify.h
> > @@ -0,0 +1,54 @@
> > +/** @file
> > +
> > + A brief file description
> > +
> > + @section license License
> > +
> > + Licensed to the Apache Software Foundation (ASF) under one
> > + or more contributor license agreements. See the NOTICE file
> > + distributed with this work for additional information
> > + regarding copyright ownership. The ASF licenses this file
> > + to you under the Apache License, Version 2.0 (the
> > + "License"); you may not use this file except in compliance
> > + with the License. You may obtain a copy of the License at
> > +
> > + http://www.apache.org/licenses/LICENSE-2.0
> > +
> > + Unless required by applicable law or agreed to in writing, software
> > + distributed under the License is distributed on an "AS IS" BASIS,
> > + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
> implied.
> > + See the License for the specific language governing permissions and
> > + limitations under the License.
> > + */
> > +
> >
> +/**************************************************************************
> > + EventNotify.h
> > +
> > + Generic event notify mechanism among threads.
> > +
> >
> +**************************************************************************/
> > +
> > +#include "ink_thread.h"
> > +
> > +class EventNotify
> > +{
> > +public:
> > + EventNotify(const char *name = NULL);
> > + void signal(void);
> > + void wait(void);
> > + int timedwait(ink_timestruc *abstime);
> > + void lock(void);
> > + bool trylock(void);
> > + void unlock(void);
> > + ~EventNotify();
> > +
> > +private:
> > + const char *m_name;
> > +#ifdef TS_HAS_EVENTFD
> > + int m_event_fd;
> > + int m_epoll_fd;
> > +#else
> > + ink_cond m_cond;
> > + ink_mutex m_mutex;
> > +#endif
> > +};
> >
> >
> http://git-wip-us.apache.org/repos/asf/trafficserver/blob/f4f8d99f/lib/ts/Makefile.am
> > ----------------------------------------------------------------------
> > diff --git a/lib/ts/Makefile.am b/lib/ts/Makefile.am
> > index c360a4b..3fca2f8 100644
> > --- a/lib/ts/Makefile.am
> > +++ b/lib/ts/Makefile.am
> > @@ -117,6 +117,8 @@ libtsutil_la_SOURCES = \
> > ink_syslog.h \
> > ink_thread.cc \
> > ink_thread.h \
> > + EventNotify.h \
> > + EventNotify.cc \
> > ink_time.cc \
> > ink_time.h \
> > inktomi++.h \
> >
> >
> http://git-wip-us.apache.org/repos/asf/trafficserver/blob/f4f8d99f/lib/ts/libts.h
> > ----------------------------------------------------------------------
> > diff --git a/lib/ts/libts.h b/lib/ts/libts.h
> > index 046afaa..27c9e92 100644
> > --- a/lib/ts/libts.h
> > +++ b/lib/ts/libts.h
> > @@ -81,6 +81,7 @@
> > #include "Bitops.h"
> > #include "Compatability.h"
> > #include "DynArray.h"
> > +#include "EventNotify.h"
> > #include "I_Version.h"
> > #include "InkPool.h"
> > #include "List.h"
> >
> >
> http://git-wip-us.apache.org/repos/asf/trafficserver/blob/f4f8d99f/proxy/logging/Log.cc
> > ----------------------------------------------------------------------
> > diff --git a/proxy/logging/Log.cc b/proxy/logging/Log.cc
> > index 502c424..790bc04 100644
> > --- a/proxy/logging/Log.cc
> > +++ b/proxy/logging/Log.cc
> > @@ -77,15 +77,12 @@ size_t Log::numInactiveObjects;
> > size_t Log::maxInactiveObjects;
> >
> > // Flush thread stuff
> > -ink_mutex *Log::preproc_mutex;
> > -ink_cond *Log::preproc_cond;
> > -ink_mutex *Log::flush_mutex;
> > -ink_cond *Log::flush_cond;
> > +EventNotify *Log::preproc_notify;
> > +EventNotify *Log::flush_notify;
> > InkAtomicList *Log::flush_data_list;
> >
> > // Collate thread stuff
> > -ink_mutex Log::collate_mutex;
> > -ink_cond Log::collate_cond;
> > +EventNotify Log::collate_notify;
> > ink_thread Log::collate_thread;
> > int Log::collation_accept_file_descriptor;
> > int Log::collation_preproc_threads;
> > @@ -189,10 +186,10 @@ struct PeriodicWakeup : Continuation
> > int wakeup (int /* event ATS_UNUSED */, Event * /* e ATS_UNUSED */)
> > {
> > for (int i = 0; i < m_preproc_threads; i++) {
> > - ink_cond_signal (&Log::preproc_cond[i]);
> > + Log::preproc_notify[i].signal();
> > }
> > for (int i = 0; i < m_flush_threads; i++) {
> > - ink_cond_signal (&Log::flush_cond[i]);
> > + Log::flush_notify[i].signal();
> > }
> > return EVENT_CONT;
> > }
> > @@ -1070,8 +1067,7 @@ Log::create_threads()
> > if (!(init_status & THREADS_CREATED)) {
> >
> > char desc[64];
> > - preproc_mutex = new ink_mutex[collation_preproc_threads];
> > - preproc_cond = new ink_cond[collation_preproc_threads];
> > + preproc_notify = new EventNotify[collation_preproc_threads];
> >
> > size_t stacksize;
> > REC_ReadConfigInteger(stacksize,
> "proxy.config.thread.default.stacksize");
> > @@ -1081,9 +1077,6 @@ Log::create_threads()
> > // no need for the conditional var since it will be relying on
> > // on the event system.
> > for (int i = 0; i < collation_preproc_threads; i++) {
> > - sprintf(desc, "Logging preproc thread mutex[%d]", i);
> > - ink_mutex_init(&preproc_mutex[i], desc);
> > - ink_cond_init(&preproc_cond[i]);
> > Continuation *preproc_cont = NEW(new
> LoggingPreprocContinuation(i));
> > sprintf(desc, "[LOG_PREPROC %d]", i);
> > eventProcessor.spawn_thread(preproc_cont, desc, stacksize);
> > @@ -1093,13 +1086,9 @@ Log::create_threads()
> > // TODO: Enable multiple flush threads, such as
> > // one flush thread per file.
> > //
> > - flush_mutex = new ink_mutex;
> > - flush_cond = new ink_cond;
> > + flush_notify = new EventNotify;
> > flush_data_list = new InkAtomicList;
> >
> > - sprintf(desc, "Logging flush thread mutex");
> > - ink_mutex_init(flush_mutex, desc);
> > - ink_cond_init(flush_cond);
> > sprintf(desc, "Logging flush buffer list");
> > ink_atomiclist_init(flush_data_list, desc, 0);
> > Continuation *flush_cont = NEW(new LoggingFlushContinuation(0));
> > @@ -1118,8 +1107,6 @@ Log::create_threads()
> > // much overhead associated with keeping an ink_thread blocked on a
> > // condition variable.
> > //
> > - ink_mutex_init(&collate_mutex, "Collate thread mutex");
> > - ink_cond_init(&collate_cond);
> > Continuation *collate_continuation = NEW(new
> LoggingCollateContinuation);
> > Event *collate_event =
> eventProcessor.spawn_thread(collate_continuation);
> > collate_thread = collate_event->ethread->tid;
> > @@ -1246,7 +1233,7 @@ Log::preproc_thread_main(void *args)
> >
> > Debug("log-preproc", "log preproc thread is alive ...");
> >
> > - ink_mutex_acquire(&preproc_mutex[idx]);
> > + Log::preproc_notify[idx].lock();
> >
> > while (true) {
> > buffers_preproced = config->log_object_manager.preproc_buffers(idx);
> > @@ -1264,11 +1251,11 @@ Log::preproc_thread_main(void *args)
> > // check the queue and find there is nothing to do, then wait
> > // again.
> > //
> > - ink_cond_wait (&preproc_cond[idx], &preproc_mutex[idx]);
> > + Log::preproc_notify[idx].wait();
> > }
> >
> > /* NOTREACHED */
> > - ink_mutex_release(&preproc_mutex[idx]);
> > + Log::preproc_notify[idx].unlock();
> > return NULL;
> > }
> >
> > @@ -1283,7 +1270,7 @@ Log::flush_thread_main(void * /* args ATS_UNUSED
> */)
> > int len, bytes_written, total_bytes;
> > SLL<LogFlushData, LogFlushData::Link_link> link, invert_link;
> >
> > - ink_mutex_acquire(flush_mutex);
> > + Log::flush_notify->lock();
> >
> > while (true) {
> > fdata = (LogFlushData *) ink_atomiclist_popall(flush_data_list);
> > @@ -1369,11 +1356,11 @@ Log::flush_thread_main(void * /* args ATS_UNUSED
> */)
> > // check the queue and find there is nothing to do, then wait
> > // again.
> > //
> > - ink_cond_wait(flush_cond, flush_mutex);
> > + Log::flush_notify->wait();
> > }
> >
> > /* NOTREACHED */
> > - ink_mutex_release(flush_mutex);
> > + Log::flush_notify->unlock();
> > return NULL;
> > }
> >
> > @@ -1398,7 +1385,7 @@ Log::collate_thread_main(void * /* args ATS_UNUSED
> */)
> >
> > Debug("log-thread", "Log collation thread is alive ...");
> >
> > - ink_mutex_acquire(&collate_mutex);
> > + Log::collate_notify.lock();
> >
> > while (true) {
> > ink_assert(Log::config != NULL);
> > @@ -1408,7 +1395,7 @@ Log::collate_thread_main(void * /* args ATS_UNUSED
> */)
> > // wake-ups.
> > //
> > while (!Log::config->am_collation_host()) {
> > - ink_cond_wait(&collate_cond, &collate_mutex);
> > + Log::collate_notify.wait();
> > }
> >
> > // Ok, at this point we know we're a log collation host, so get to
> > @@ -1427,7 +1414,7 @@ Log::collate_thread_main(void * /* args ATS_UNUSED
> */)
> > //
> > // go to sleep ...
> > //
> > - ink_cond_wait(&collate_cond, &collate_mutex);
> > + Log::collate_notify.wait();
> > continue;
> > }
> >
> > @@ -1489,7 +1476,7 @@ Log::collate_thread_main(void * /* args ATS_UNUSED
> */)
> > }
> >
> > /* NOTREACHED */
> > - ink_mutex_release(&collate_mutex);
> > + Log::collate_notify.unlock();
> > return NULL;
> > }
> >
> >
> >
> http://git-wip-us.apache.org/repos/asf/trafficserver/blob/f4f8d99f/proxy/logging/Log.h
> > ----------------------------------------------------------------------
> > diff --git a/proxy/logging/Log.h b/proxy/logging/Log.h
> > index 2f395ca..2233220 100644
> > --- a/proxy/logging/Log.h
> > +++ b/proxy/logging/Log.h
> > @@ -420,17 +420,14 @@ public:
> > static void add_to_inactive(LogObject * obj);
> >
> > // logging thread stuff
> > - static ink_mutex *preproc_mutex;
> > - static ink_cond *preproc_cond;
> > + static EventNotify *preproc_notify;
> > static void *preproc_thread_main(void *args);
> > - static ink_mutex *flush_mutex;
> > - static ink_cond *flush_cond;
> > + static EventNotify *flush_notify;
> > static InkAtomicList *flush_data_list;
> > static void *flush_thread_main(void *args);
> >
> > // collation thread stuff
> > - static ink_mutex collate_mutex;
> > - static ink_cond collate_cond;
> > + static EventNotify collate_notify;
> > static ink_thread collate_thread;
> > static int collation_preproc_threads;
> > static int collation_accept_file_descriptor;
> >
> >
> http://git-wip-us.apache.org/repos/asf/trafficserver/blob/f4f8d99f/proxy/logging/LogCollationHostSM.cc
> > ----------------------------------------------------------------------
> > diff --git a/proxy/logging/LogCollationHostSM.cc
> b/proxy/logging/LogCollationHostSM.cc
> > index 9add290..36452f2 100644
> > --- a/proxy/logging/LogCollationHostSM.cc
> > +++ b/proxy/logging/LogCollationHostSM.cc
> > @@ -321,7 +321,7 @@ LogCollationHostSM::host_recv(int event, void * /*
> data ATS_UNUSED */)
> > //
> > log_buffer = NEW(new LogBuffer(log_object, log_buffer_header));
> > int idx = log_object->add_to_flush_queue(log_buffer);
> > - ink_cond_signal(&Log::preproc_cond[idx]);
> > + Log::preproc_notify[idx].signal();
> > }
> >
> > #if defined(LOG_BUFFER_TRACKING)
> >
> >
> http://git-wip-us.apache.org/repos/asf/trafficserver/blob/f4f8d99f/proxy/logging/LogConfig.cc
> > ----------------------------------------------------------------------
> > diff --git a/proxy/logging/LogConfig.cc b/proxy/logging/LogConfig.cc
> > index 6cf55ba..8e3b585 100644
> > --- a/proxy/logging/LogConfig.cc
> > +++ b/proxy/logging/LogConfig.cc
> > @@ -661,7 +661,7 @@ LogConfig::setup_collation(LogConfig * prev_config)
> > // since we are the collation host, we need to signal the
> > // collate_cond variable so that our collation thread wakes up.
> > //
> > - ink_cond_signal(&Log::collate_cond);
> > + Log::collate_notify.signal();
> > #endif
> > Debug("log", "I am a collation host listening on port %d.",
> collation_port);
> > } else {
> >
> >
> http://git-wip-us.apache.org/repos/asf/trafficserver/blob/f4f8d99f/proxy/logging/LogFile.cc
> > ----------------------------------------------------------------------
> > diff --git a/proxy/logging/LogFile.cc b/proxy/logging/LogFile.cc
> > index 5bf6ba4..6b93a26 100644
> > --- a/proxy/logging/LogFile.cc
> > +++ b/proxy/logging/LogFile.cc
> > @@ -523,7 +523,7 @@ LogFile::preproc_and_try_delete(LogBuffer * lb)
> >
> > ink_atomiclist_push(Log::flush_data_list, flush_data);
> >
> > - ink_cond_signal(Log::flush_cond);
> > + Log::flush_notify->signal();
> >
> > //
> > // LogBuffer will be deleted in flush thread
> > @@ -693,7 +693,7 @@ LogFile::write_ascii_logbuffer3(LogBufferHeader *
> buffer_header, char *alt_forma
> > LogFlushData *flush_data = new LogFlushData(this, ascii_buffer,
> fmt_buf_bytes);
> > ink_atomiclist_push(Log::flush_data_list, flush_data);
> >
> > - ink_cond_signal(Log::flush_cond);
> > + Log::flush_notify->signal();
> >
> > total_bytes += fmt_buf_bytes;
> > }
> >
> >
> http://git-wip-us.apache.org/repos/asf/trafficserver/blob/f4f8d99f/proxy/logging/LogObject.cc
> > ----------------------------------------------------------------------
> > diff --git a/proxy/logging/LogObject.cc b/proxy/logging/LogObject.cc
> > index ea059e2..29f1a39 100644
> > --- a/proxy/logging/LogObject.cc
> > +++ b/proxy/logging/LogObject.cc
> > @@ -461,7 +461,7 @@ LogObject::_checkout_write(size_t * write_offset,
> size_t bytes_needed) {
> > int idx = m_buffer_manager_idx++ % m_flush_threads;
> > Debug("log-logbuffer", "adding buffer %d to flush list after
> checkout", buffer->get_id());
> > m_buffer_manager[idx].add_to_flush_queue(buffer);
> > - ink_cond_signal(&Log::preproc_cond[idx]);
> > + Log::preproc_notify[idx].signal();
> >
> > }
> > decremented = true;
> >
>
>
--
Yunkai Zhang
Work at Taobao
Re: [1/3] git commit: TS-2137: Use eventfd instread of pthread
signal/wait in ATS
Posted by James Peach <jp...@apache.org>.
On Aug 21, 2013, at 2:43 AM, yunkai@apache.org wrote:
> Updated Branches:
> refs/heads/master b30d69825 -> 5f4b470ce
>
>
> TS-2137: Use eventfd instead of pthread signal/wait in ATS
>
> pthread_cond_signal/wait is used in several places in ATS, including but
> not limited to:
> 1) Logging system.
> 2) ProtectedQueue in event system.
> 3) RecProcess in stats system.
>
> As we know, pthread_cond_signal() needs to take a lock, so it causes
> more context switches than eventfd.
>
> In my testing:
> 1) Using _signal()/_wait() pair, eventfd is about 5 times faster than
> pthread cond.
>
> 2) Using _signal()/_timedwait() pair, eventfd is about 2 times faster
> than pthread cond.
This looks pretty good. I had a couple of comments ...
- EventNotify::timedwait() should take a relative timeout, since this is almost always what you want to use. Since timedwait() is not currently used, consider removing it.
- You should also remove EventNotify::m_name, since that's never used or even set.
- EventNotify.h needs header #include guards.
>
> == NOTE ==
> pthread_cond_signal/wait is also used by the AIO module, but we can't
> simply replace it with eventfd, as the AIO code uses the mutex to protect
> other stuff. I also found we can't replace it in ProtectedQueue directly;
> some preparatory work is needed first.
>
> Signed-off-by: Yunkai Zhang <qi...@taobao.com>
>
>
> Project: http://git-wip-us.apache.org/repos/asf/trafficserver/repo
> Commit: http://git-wip-us.apache.org/repos/asf/trafficserver/commit/f4f8d99f
> Tree: http://git-wip-us.apache.org/repos/asf/trafficserver/tree/f4f8d99f
> Diff: http://git-wip-us.apache.org/repos/asf/trafficserver/diff/f4f8d99f
>
> Branch: refs/heads/master
> Commit: f4f8d99f65b7ffbc37e7f72adc0694adb26041c4
> Parents: b30d698
> Author: Yunkai Zhang <qi...@taobao.com>
> Authored: Sat Aug 17 16:17:41 2013 +0800
> Committer: Yunkai Zhang <qi...@taobao.com>
> Committed: Wed Aug 21 17:39:53 2013 +0800
>
> ----------------------------------------------------------------------
> lib/records/RecProcess.cc | 17 ++-
> lib/ts/EventNotify.cc | 172 +++++++++++++++++++++++++++++++
> lib/ts/EventNotify.h | 54 ++++++++++
> lib/ts/Makefile.am | 2 +
> lib/ts/libts.h | 1 +
> proxy/logging/Log.cc | 47 +++------
> proxy/logging/Log.h | 9 +-
> proxy/logging/LogCollationHostSM.cc | 2 +-
> proxy/logging/LogConfig.cc | 2 +-
> proxy/logging/LogFile.cc | 4 +-
> proxy/logging/LogObject.cc | 2 +-
> 11 files changed, 261 insertions(+), 51 deletions(-)
> ----------------------------------------------------------------------
>
>
> http://git-wip-us.apache.org/repos/asf/trafficserver/blob/f4f8d99f/lib/records/RecProcess.cc
> ----------------------------------------------------------------------
> diff --git a/lib/records/RecProcess.cc b/lib/records/RecProcess.cc
> index b9c69ba..3f1f34e 100644
> --- a/lib/records/RecProcess.cc
> +++ b/lib/records/RecProcess.cc
> @@ -35,8 +35,7 @@
> static bool g_initialized = false;
> static bool g_message_initialized = false;
> static bool g_started = false;
> -static ink_cond g_force_req_cond;
> -static ink_mutex g_force_req_mutex;
> +static EventNotify g_force_req_notify;
> static int g_rec_raw_stat_sync_interval_ms = REC_RAW_STAT_SYNC_INTERVAL_MS;
> static int g_rec_config_update_interval_ms = REC_CONFIG_UPDATE_INTERVAL_MS;
> static int g_rec_remote_sync_interval_ms = REC_REMOTE_SYNC_INTERVAL_MS;
> @@ -263,9 +262,9 @@ recv_message_cb__process(RecMessage *msg, RecMessageT msg_type, void *cookie)
>
> if ((err = recv_message_cb(msg, msg_type, cookie)) == REC_ERR_OKAY) {
> if (msg_type == RECG_PULL_ACK) {
> - ink_mutex_acquire(&g_force_req_mutex);
> - ink_cond_signal(&g_force_req_cond);
> - ink_mutex_release(&g_force_req_mutex);
> + g_force_req_notify.lock();
> + g_force_req_notify.signal();
> + g_force_req_notify.unlock();
> }
> }
> return err;
> @@ -419,13 +418,11 @@ RecProcessInitMessage(RecModeT mode_type)
> return REC_ERR_FAIL;
> }
>
> - ink_cond_init(&g_force_req_cond);
> - ink_mutex_init(&g_force_req_mutex, NULL);
> if (mode_type == RECM_CLIENT) {
> send_pull_message(RECG_PULL_REQ);
> - ink_mutex_acquire(&g_force_req_mutex);
> - ink_cond_wait(&g_force_req_cond, &g_force_req_mutex);
> - ink_mutex_release(&g_force_req_mutex);
> + g_force_req_notify.lock();
> + g_force_req_notify.wait();
> + g_force_req_notify.unlock();
> }
>
> g_message_initialized = true;
>
> http://git-wip-us.apache.org/repos/asf/trafficserver/blob/f4f8d99f/lib/ts/EventNotify.cc
> ----------------------------------------------------------------------
> diff --git a/lib/ts/EventNotify.cc b/lib/ts/EventNotify.cc
> new file mode 100644
> index 0000000..5d0cc53
> --- /dev/null
> +++ b/lib/ts/EventNotify.cc
> @@ -0,0 +1,172 @@
> +/** @file
> +
> + A brief file description
> +
> + @section license License
> +
> + Licensed to the Apache Software Foundation (ASF) under one
> + or more contributor license agreements. See the NOTICE file
> + distributed with this work for additional information
> + regarding copyright ownership. The ASF licenses this file
> + to you under the Apache License, Version 2.0 (the
> + "License"); you may not use this file except in compliance
> + with the License. You may obtain a copy of the License at
> +
> + http://www.apache.org/licenses/LICENSE-2.0
> +
> + Unless required by applicable law or agreed to in writing, software
> + distributed under the License is distributed on an "AS IS" BASIS,
> + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
> + See the License for the specific language governing permissions and
> + limitations under the License.
> + */
> +
> +/**************************************************************************
> + EventNotify.cc
> +
> + Generic event notify mechanism among threads.
> +**************************************************************************/
> +
> +#include "EventNotify.h"
> +#include "ink_hrtime.h"
> +
> +#ifdef TS_HAS_EVENTFD
> +#include <sys/eventfd.h>
> +#include <sys/epoll.h>
> +#endif
> +
> +EventNotify::EventNotify(const char *name): m_name(name)
> +{
> +#ifdef TS_HAS_EVENTFD
> + int ret;
> + struct epoll_event ev;
> +
> + // Don't use noblock here!
> + m_event_fd = eventfd(0, EFD_CLOEXEC);
> + if (m_event_fd < 0) {
> + // EFD_CLOEXEC invalid in <= Linux 2.6.27
> + m_event_fd = eventfd(0, 0);
> + }
> + ink_release_assert(m_event_fd != -1);
> +
> + ev.events = EPOLLIN;
> + ev.data.fd = m_event_fd;
> +
> + m_epoll_fd = epoll_create(1);
> + ink_release_assert(m_epoll_fd != -1);
> +
> + ret = epoll_ctl(m_epoll_fd, EPOLL_CTL_ADD, m_event_fd, &ev);
> + ink_release_assert(ret != -1);
> +#else
> + ink_cond_init(&m_cond);
> + ink_mutex_init(&m_mutex, m_name);
> +#endif
> +}
> +
> +void
> +EventNotify::signal(void)
> +{
> +#ifdef TS_HAS_EVENTFD
> + ssize_t nr;
> + uint64_t value = 1;
> + nr = write(m_event_fd, &value, sizeof(uint64_t));
> + ink_release_assert(nr == sizeof(uint64_t));
> +#else
> + ink_cond_signal(&m_cond);
> +#endif
> +}
> +
> +void
> +EventNotify::wait(void)
> +{
> +#ifdef TS_HAS_EVENTFD
> + ssize_t nr;
> + uint64_t value = 0;
> + nr = read(m_event_fd, &value, sizeof(uint64_t));
> + ink_release_assert(nr == sizeof(uint64_t));
> +#else
> + ink_cond_wait(&m_cond, &m_mutex);
> +#endif
> +}
> +
> +int
> +EventNotify::timedwait(ink_timestruc *abstime)
> +{
> +#ifdef TS_HAS_EVENTFD
> + int timeout;
> + ssize_t nr, nr_fd = 0;
> + uint64_t value = 0;
> + struct timeval curtime;
> + struct epoll_event ev;
> +
> + // Convert absolute time to relative time
> + gettimeofday(&curtime, NULL);
> + timeout = (abstime->tv_sec - curtime.tv_sec) * 1000
> + + (abstime->tv_nsec / 1000 - curtime.tv_usec) / 1000;
> +
> + //
> + // When timeout < 0, epoll_wait() will wait indefinitely, but
> + // pthread_cond_timedwait() will return ETIMEDOUT immediately.
> + // We should keep compatible with pthread_cond_timedwait() here.
> + //
> + if (timeout < 0)
> + return ETIMEDOUT;
> +
> + do {
> + nr_fd = epoll_wait(m_epoll_fd, &ev, 1, timeout);
> + } while (nr_fd == -1 && errno == EINTR);
> +
> + if (nr_fd == 0)
> + return ETIMEDOUT;
> + else if (nr_fd == -1)
> + return errno;
> +
> + nr = read(m_event_fd, &value, sizeof(uint64_t));
> + ink_release_assert(nr == sizeof(uint64_t));
> +
> + return 0;
> +#else
> + return ink_cond_timedwait(&m_cond, &m_mutex, abstime);
> +#endif
> +}
> +
> +void
> +EventNotify::lock(void)
> +{
> +#ifdef TS_HAS_EVENTFD
> + // do nothing
> +#else
> + ink_mutex_acquire(&m_mutex);
> +#endif
> +}
> +
> +bool
> +EventNotify::trylock(void)
> +{
> +#ifdef TS_HAS_EVENTFD
> + return true;
> +#else
> + return ink_mutex_try_acquire(&m_mutex);
> +#endif
> +}
> +
> +void
> +EventNotify::unlock(void)
> +{
> +#ifdef TS_HAS_EVENTFD
> + // do nothing
> +#else
> + ink_mutex_release(&m_mutex);
> +#endif
> +}
> +
> +EventNotify::~EventNotify()
> +{
> +#ifdef TS_HAS_EVENTFD
> + close(m_event_fd);
> + close(m_epoll_fd);
> +#else
> + ink_cond_destroy(&m_cond);
> + ink_mutex_destroy(&m_mutex);
> +#endif
> +}
>
> http://git-wip-us.apache.org/repos/asf/trafficserver/blob/f4f8d99f/lib/ts/EventNotify.h
> ----------------------------------------------------------------------
> diff --git a/lib/ts/EventNotify.h b/lib/ts/EventNotify.h
> new file mode 100644
> index 0000000..16e4809
> --- /dev/null
> +++ b/lib/ts/EventNotify.h
> @@ -0,0 +1,54 @@
> +/** @file
> +
> + A brief file description
> +
> + @section license License
> +
> + Licensed to the Apache Software Foundation (ASF) under one
> + or more contributor license agreements. See the NOTICE file
> + distributed with this work for additional information
> + regarding copyright ownership. The ASF licenses this file
> + to you under the Apache License, Version 2.0 (the
> + "License"); you may not use this file except in compliance
> + with the License. You may obtain a copy of the License at
> +
> + http://www.apache.org/licenses/LICENSE-2.0
> +
> + Unless required by applicable law or agreed to in writing, software
> + distributed under the License is distributed on an "AS IS" BASIS,
> + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
> + See the License for the specific language governing permissions and
> + limitations under the License.
> + */
> +
> +/**************************************************************************
> + EventNotify.h
> +
> + Generic event notify mechanism among threads.
> +
> +**************************************************************************/
> +
> +#include "ink_thread.h"
> +
> +class EventNotify
> +{
> +public:
> + EventNotify(const char *name = NULL);
> + void signal(void);
> + void wait(void);
> + int timedwait(ink_timestruc *abstime);
> + void lock(void);
> + bool trylock(void);
> + void unlock(void);
> + ~EventNotify();
> +
> +private:
> + const char *m_name;
> +#ifdef TS_HAS_EVENTFD
> + int m_event_fd;
> + int m_epoll_fd;
> +#else
> + ink_cond m_cond;
> + ink_mutex m_mutex;
> +#endif
> +};
>
> http://git-wip-us.apache.org/repos/asf/trafficserver/blob/f4f8d99f/lib/ts/Makefile.am
> ----------------------------------------------------------------------
> diff --git a/lib/ts/Makefile.am b/lib/ts/Makefile.am
> index c360a4b..3fca2f8 100644
> --- a/lib/ts/Makefile.am
> +++ b/lib/ts/Makefile.am
> @@ -117,6 +117,8 @@ libtsutil_la_SOURCES = \
> ink_syslog.h \
> ink_thread.cc \
> ink_thread.h \
> + EventNotify.h \
> + EventNotify.cc \
> ink_time.cc \
> ink_time.h \
> inktomi++.h \
>
> http://git-wip-us.apache.org/repos/asf/trafficserver/blob/f4f8d99f/lib/ts/libts.h
> ----------------------------------------------------------------------
> diff --git a/lib/ts/libts.h b/lib/ts/libts.h
> index 046afaa..27c9e92 100644
> --- a/lib/ts/libts.h
> +++ b/lib/ts/libts.h
> @@ -81,6 +81,7 @@
> #include "Bitops.h"
> #include "Compatability.h"
> #include "DynArray.h"
> +#include "EventNotify.h"
> #include "I_Version.h"
> #include "InkPool.h"
> #include "List.h"
>
> http://git-wip-us.apache.org/repos/asf/trafficserver/blob/f4f8d99f/proxy/logging/Log.cc
> ----------------------------------------------------------------------
> diff --git a/proxy/logging/Log.cc b/proxy/logging/Log.cc
> index 502c424..790bc04 100644
> --- a/proxy/logging/Log.cc
> +++ b/proxy/logging/Log.cc
> @@ -77,15 +77,12 @@ size_t Log::numInactiveObjects;
> size_t Log::maxInactiveObjects;
>
> // Flush thread stuff
> -ink_mutex *Log::preproc_mutex;
> -ink_cond *Log::preproc_cond;
> -ink_mutex *Log::flush_mutex;
> -ink_cond *Log::flush_cond;
> +EventNotify *Log::preproc_notify;
> +EventNotify *Log::flush_notify;
> InkAtomicList *Log::flush_data_list;
>
> // Collate thread stuff
> -ink_mutex Log::collate_mutex;
> -ink_cond Log::collate_cond;
> +EventNotify Log::collate_notify;
> ink_thread Log::collate_thread;
> int Log::collation_accept_file_descriptor;
> int Log::collation_preproc_threads;
> @@ -189,10 +186,10 @@ struct PeriodicWakeup : Continuation
> int wakeup (int /* event ATS_UNUSED */, Event * /* e ATS_UNUSED */)
> {
> for (int i = 0; i < m_preproc_threads; i++) {
> - ink_cond_signal (&Log::preproc_cond[i]);
> + Log::preproc_notify[i].signal();
> }
> for (int i = 0; i < m_flush_threads; i++) {
> - ink_cond_signal (&Log::flush_cond[i]);
> + Log::flush_notify[i].signal();
> }
> return EVENT_CONT;
> }
> @@ -1070,8 +1067,7 @@ Log::create_threads()
> if (!(init_status & THREADS_CREATED)) {
>
> char desc[64];
> - preproc_mutex = new ink_mutex[collation_preproc_threads];
> - preproc_cond = new ink_cond[collation_preproc_threads];
> + preproc_notify = new EventNotify[collation_preproc_threads];
>
> size_t stacksize;
> REC_ReadConfigInteger(stacksize, "proxy.config.thread.default.stacksize");
> @@ -1081,9 +1077,6 @@ Log::create_threads()
> // no need for the conditional var since it will be relying on
> // on the event system.
> for (int i = 0; i < collation_preproc_threads; i++) {
> - sprintf(desc, "Logging preproc thread mutex[%d]", i);
> - ink_mutex_init(&preproc_mutex[i], desc);
> - ink_cond_init(&preproc_cond[i]);
> Continuation *preproc_cont = NEW(new LoggingPreprocContinuation(i));
> sprintf(desc, "[LOG_PREPROC %d]", i);
> eventProcessor.spawn_thread(preproc_cont, desc, stacksize);
> @@ -1093,13 +1086,9 @@ Log::create_threads()
> // TODO: Enable multiple flush threads, such as
> // one flush thread per file.
> //
> - flush_mutex = new ink_mutex;
> - flush_cond = new ink_cond;
> + flush_notify = new EventNotify;
> flush_data_list = new InkAtomicList;
>
> - sprintf(desc, "Logging flush thread mutex");
> - ink_mutex_init(flush_mutex, desc);
> - ink_cond_init(flush_cond);
> sprintf(desc, "Logging flush buffer list");
> ink_atomiclist_init(flush_data_list, desc, 0);
> Continuation *flush_cont = NEW(new LoggingFlushContinuation(0));
> @@ -1118,8 +1107,6 @@ Log::create_threads()
> // much overhead associated with keeping an ink_thread blocked on a
> // condition variable.
> //
> - ink_mutex_init(&collate_mutex, "Collate thread mutex");
> - ink_cond_init(&collate_cond);
> Continuation *collate_continuation = NEW(new LoggingCollateContinuation);
> Event *collate_event = eventProcessor.spawn_thread(collate_continuation);
> collate_thread = collate_event->ethread->tid;
> @@ -1246,7 +1233,7 @@ Log::preproc_thread_main(void *args)
>
> Debug("log-preproc", "log preproc thread is alive ...");
>
> - ink_mutex_acquire(&preproc_mutex[idx]);
> + Log::preproc_notify[idx].lock();
>
> while (true) {
> buffers_preproced = config->log_object_manager.preproc_buffers(idx);
> @@ -1264,11 +1251,11 @@ Log::preproc_thread_main(void *args)
> // check the queue and find there is nothing to do, then wait
> // again.
> //
> - ink_cond_wait (&preproc_cond[idx], &preproc_mutex[idx]);
> + Log::preproc_notify[idx].wait();
> }
>
> /* NOTREACHED */
> - ink_mutex_release(&preproc_mutex[idx]);
> + Log::preproc_notify[idx].unlock();
> return NULL;
> }
>
> @@ -1283,7 +1270,7 @@ Log::flush_thread_main(void * /* args ATS_UNUSED */)
> int len, bytes_written, total_bytes;
> SLL<LogFlushData, LogFlushData::Link_link> link, invert_link;
>
> - ink_mutex_acquire(flush_mutex);
> + Log::flush_notify->lock();
>
> while (true) {
> fdata = (LogFlushData *) ink_atomiclist_popall(flush_data_list);
> @@ -1369,11 +1356,11 @@ Log::flush_thread_main(void * /* args ATS_UNUSED */)
> // check the queue and find there is nothing to do, then wait
> // again.
> //
> - ink_cond_wait(flush_cond, flush_mutex);
> + Log::flush_notify->wait();
> }
>
> /* NOTREACHED */
> - ink_mutex_release(flush_mutex);
> + Log::flush_notify->unlock();
> return NULL;
> }
>
> @@ -1398,7 +1385,7 @@ Log::collate_thread_main(void * /* args ATS_UNUSED */)
>
> Debug("log-thread", "Log collation thread is alive ...");
>
> - ink_mutex_acquire(&collate_mutex);
> + Log::collate_notify.lock();
>
> while (true) {
> ink_assert(Log::config != NULL);
> @@ -1408,7 +1395,7 @@ Log::collate_thread_main(void * /* args ATS_UNUSED */)
> // wake-ups.
> //
> while (!Log::config->am_collation_host()) {
> - ink_cond_wait(&collate_cond, &collate_mutex);
> + Log::collate_notify.wait();
> }
>
> // Ok, at this point we know we're a log collation host, so get to
> @@ -1427,7 +1414,7 @@ Log::collate_thread_main(void * /* args ATS_UNUSED */)
> //
> // go to sleep ...
> //
> - ink_cond_wait(&collate_cond, &collate_mutex);
> + Log::collate_notify.wait();
> continue;
> }
>
> @@ -1489,7 +1476,7 @@ Log::collate_thread_main(void * /* args ATS_UNUSED */)
> }
>
> /* NOTREACHED */
> - ink_mutex_release(&collate_mutex);
> + Log::collate_notify.unlock();
> return NULL;
> }
>
>
> http://git-wip-us.apache.org/repos/asf/trafficserver/blob/f4f8d99f/proxy/logging/Log.h
> ----------------------------------------------------------------------
> diff --git a/proxy/logging/Log.h b/proxy/logging/Log.h
> index 2f395ca..2233220 100644
> --- a/proxy/logging/Log.h
> +++ b/proxy/logging/Log.h
> @@ -420,17 +420,14 @@ public:
> static void add_to_inactive(LogObject * obj);
>
> // logging thread stuff
> - static ink_mutex *preproc_mutex;
> - static ink_cond *preproc_cond;
> + static EventNotify *preproc_notify;
> static void *preproc_thread_main(void *args);
> - static ink_mutex *flush_mutex;
> - static ink_cond *flush_cond;
> + static EventNotify *flush_notify;
> static InkAtomicList *flush_data_list;
> static void *flush_thread_main(void *args);
>
> // collation thread stuff
> - static ink_mutex collate_mutex;
> - static ink_cond collate_cond;
> + static EventNotify collate_notify;
> static ink_thread collate_thread;
> static int collation_preproc_threads;
> static int collation_accept_file_descriptor;
>
> http://git-wip-us.apache.org/repos/asf/trafficserver/blob/f4f8d99f/proxy/logging/LogCollationHostSM.cc
> ----------------------------------------------------------------------
> diff --git a/proxy/logging/LogCollationHostSM.cc b/proxy/logging/LogCollationHostSM.cc
> index 9add290..36452f2 100644
> --- a/proxy/logging/LogCollationHostSM.cc
> +++ b/proxy/logging/LogCollationHostSM.cc
> @@ -321,7 +321,7 @@ LogCollationHostSM::host_recv(int event, void * /* data ATS_UNUSED */)
> //
> log_buffer = NEW(new LogBuffer(log_object, log_buffer_header));
> int idx = log_object->add_to_flush_queue(log_buffer);
> - ink_cond_signal(&Log::preproc_cond[idx]);
> + Log::preproc_notify[idx].signal();
> }
>
> #if defined(LOG_BUFFER_TRACKING)
>
> http://git-wip-us.apache.org/repos/asf/trafficserver/blob/f4f8d99f/proxy/logging/LogConfig.cc
> ----------------------------------------------------------------------
> diff --git a/proxy/logging/LogConfig.cc b/proxy/logging/LogConfig.cc
> index 6cf55ba..8e3b585 100644
> --- a/proxy/logging/LogConfig.cc
> +++ b/proxy/logging/LogConfig.cc
> @@ -661,7 +661,7 @@ LogConfig::setup_collation(LogConfig * prev_config)
> // since we are the collation host, we need to signal the
> // collate_cond variable so that our collation thread wakes up.
> //
> - ink_cond_signal(&Log::collate_cond);
> + Log::collate_notify.signal();
> #endif
> Debug("log", "I am a collation host listening on port %d.", collation_port);
> } else {
>
> http://git-wip-us.apache.org/repos/asf/trafficserver/blob/f4f8d99f/proxy/logging/LogFile.cc
> ----------------------------------------------------------------------
> diff --git a/proxy/logging/LogFile.cc b/proxy/logging/LogFile.cc
> index 5bf6ba4..6b93a26 100644
> --- a/proxy/logging/LogFile.cc
> +++ b/proxy/logging/LogFile.cc
> @@ -523,7 +523,7 @@ LogFile::preproc_and_try_delete(LogBuffer * lb)
>
> ink_atomiclist_push(Log::flush_data_list, flush_data);
>
> - ink_cond_signal(Log::flush_cond);
> + Log::flush_notify->signal();
>
> //
> // LogBuffer will be deleted in flush thread
> @@ -693,7 +693,7 @@ LogFile::write_ascii_logbuffer3(LogBufferHeader * buffer_header, char *alt_forma
> LogFlushData *flush_data = new LogFlushData(this, ascii_buffer, fmt_buf_bytes);
> ink_atomiclist_push(Log::flush_data_list, flush_data);
>
> - ink_cond_signal(Log::flush_cond);
> + Log::flush_notify->signal();
>
> total_bytes += fmt_buf_bytes;
> }
>
> http://git-wip-us.apache.org/repos/asf/trafficserver/blob/f4f8d99f/proxy/logging/LogObject.cc
> ----------------------------------------------------------------------
> diff --git a/proxy/logging/LogObject.cc b/proxy/logging/LogObject.cc
> index ea059e2..29f1a39 100644
> --- a/proxy/logging/LogObject.cc
> +++ b/proxy/logging/LogObject.cc
> @@ -461,7 +461,7 @@ LogObject::_checkout_write(size_t * write_offset, size_t bytes_needed) {
> int idx = m_buffer_manager_idx++ % m_flush_threads;
> Debug("log-logbuffer", "adding buffer %d to flush list after checkout", buffer->get_id());
> m_buffer_manager[idx].add_to_flush_queue(buffer);
> - ink_cond_signal(&Log::preproc_cond[idx]);
> + Log::preproc_notify[idx].signal();
>
> }
> decremented = true;
>
[3/3] git commit: TS-2141: Make traffic_cop reconnect to manager
correctly
Posted by yu...@apache.org.
TS-2141: Make traffic_cop reconnect to manager correctly
1) By removing the "TS_MGMT_OPT_NO_SOCK_TESTS" option when calling
TSInit() in traffic_cop, cop will reconnect to the manager correctly
again.
2) Fix some small bugs in the usage of pthread.
Signed-off-by: Yunkai Zhang <qi...@taobao.com>
Project: http://git-wip-us.apache.org/repos/asf/trafficserver/repo
Commit: http://git-wip-us.apache.org/repos/asf/trafficserver/commit/5f4b470c
Tree: http://git-wip-us.apache.org/repos/asf/trafficserver/tree/5f4b470c
Diff: http://git-wip-us.apache.org/repos/asf/trafficserver/diff/5f4b470c
Branch: refs/heads/master
Commit: 5f4b470cedcd4bd6d0b1f54c55b15662a0fe8ce8
Parents: ce9188b
Author: Yunkai Zhang <qi...@taobao.com>
Authored: Tue Aug 20 22:11:10 2013 +0800
Committer: Yunkai Zhang <qi...@taobao.com>
Committed: Wed Aug 21 17:42:48 2013 +0800
----------------------------------------------------------------------
cop/TrafficCop.cc | 2 +-
mgmt/api/remote/CoreAPIRemote.cc | 9 +++++++++
mgmt/api/remote/EventRegistration.cc | 1 +
mgmt/api/remote/NetworkUtilsRemote.cc | 2 +-
4 files changed, 12 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/trafficserver/blob/5f4b470c/cop/TrafficCop.cc
----------------------------------------------------------------------
diff --git a/cop/TrafficCop.cc b/cop/TrafficCop.cc
index 0b03c31..a681704 100644
--- a/cop/TrafficCop.cc
+++ b/cop/TrafficCop.cc
@@ -1593,7 +1593,7 @@ check(void *arg)
// We do this after the first round of checks, since the first "check" will spawn traffic_manager
if (!mgmt_init) {
- TSInit(Layout::get()->runtimedir, static_cast<TSInitOptionT>(TS_MGMT_OPT_NO_EVENTS | TS_MGMT_OPT_NO_SOCK_TESTS));
+ TSInit(Layout::get()->runtimedir, static_cast<TSInitOptionT>(TS_MGMT_OPT_NO_EVENTS));
mgmt_init = true;
}
}
http://git-wip-us.apache.org/repos/asf/trafficserver/blob/5f4b470c/mgmt/api/remote/CoreAPIRemote.cc
----------------------------------------------------------------------
diff --git a/mgmt/api/remote/CoreAPIRemote.cc b/mgmt/api/remote/CoreAPIRemote.cc
index 878ed11..cb2ae5c 100644
--- a/mgmt/api/remote/CoreAPIRemote.cc
+++ b/mgmt/api/remote/CoreAPIRemote.cc
@@ -311,6 +311,15 @@ Terminate()
if (ts_event_thread)
ink_thread_cancel(ts_event_thread);
+ // Before clear, we should confirm these
+ // two threads have finished. Or the clear
+ // operation may lead them crash.
+ if (ts_test_thread)
+ ink_thread_join(ts_test_thread);
+ if (ts_event_thread)
+ ink_thread_join(ts_event_thread);
+
+ // Clear operation
ts_test_thread = static_cast<ink_thread>(NULL);
ts_event_thread = static_cast<ink_thread>(NULL);
set_socket_paths(NULL); // clear the socket_path
http://git-wip-us.apache.org/repos/asf/trafficserver/blob/5f4b470c/mgmt/api/remote/EventRegistration.cc
----------------------------------------------------------------------
diff --git a/mgmt/api/remote/EventRegistration.cc b/mgmt/api/remote/EventRegistration.cc
index 383a3da..2a3d7fb 100644
--- a/mgmt/api/remote/EventRegistration.cc
+++ b/mgmt/api/remote/EventRegistration.cc
@@ -161,5 +161,6 @@ event_callback_thread(void *arg)
delete_queue(func_q);
// all done!
+ ink_thread_exit(NULL);
return NULL;
}
http://git-wip-us.apache.org/repos/asf/trafficserver/blob/5f4b470c/mgmt/api/remote/NetworkUtilsRemote.cc
----------------------------------------------------------------------
diff --git a/mgmt/api/remote/NetworkUtilsRemote.cc b/mgmt/api/remote/NetworkUtilsRemote.cc
index bcf794a..92a2e55 100644
--- a/mgmt/api/remote/NetworkUtilsRemote.cc
+++ b/mgmt/api/remote/NetworkUtilsRemote.cc
@@ -459,7 +459,7 @@ socket_test_thread(void *)
{
// loop until client process dies
while (1) {
- if (socket_test(main_socket_fd) <= 0) {
+ if (main_socket_fd == -1 || socket_test(main_socket_fd) <= 0) {
// ASSUMES that in between the time the socket_test is made
// and this reconnect call is made, the main_socket_fd remains
// the same (eg. no one else called reconnect to TM successfully!!
Re: [1/3] git commit: TS-2137: Use eventfd instread of pthread
signal/wait in ATS
Posted by James Peach <jp...@apache.org>.
On Aug 21, 2013, at 2:43 AM, yunkai@apache.org wrote:
> Updated Branches:
> refs/heads/master b30d69825 -> 5f4b470ce
>
>
> TS-2137: Use eventfd instread of pthread signal/wait in ATS
>
> pthread_cond_signal/wait is used in several places in ATS, including but
> not limited:
> 1) Logging system.
> 2) ProtectedQueue in event system.
> 3) RecProcess in stats system.
>
> As we known, pthread_cond_signal() need to take lock, it'll cause
> more context switch than eventfd.
>
> In my testing:
> 1) Using _signal()/_wait() pair, eventfd is about 5 times faster than
> pthread cond.
>
> 2) Using _signal()/_timedwait() pair, eventfd is about 2 times faster
> than pthread cond.
This looks pretty good. I had a couple of comments ...
- EventNotify::timedwait() should take a relative timeout, since this is almost always what you want to use. Since timedwait() is not currently used, consider removing it.
- You should also remove EventNotify::m_name, since that's never used or even set.
- EventNotify.h needs header #include guards.
>
> == NOTE ==
> pthread_cond_signal/wait is also used by AIO module, but we can't
> simply replace it with eventfd, as AIO code use the mutex to protect
> other stuff. And I found we can't replace it in ProtectedQueue directly,
> there are some prepare work to do.
>
> Signed-off-by: Yunkai Zhang <qi...@taobao.com>
>
>
> Project: http://git-wip-us.apache.org/repos/asf/trafficserver/repo
> Commit: http://git-wip-us.apache.org/repos/asf/trafficserver/commit/f4f8d99f
> Tree: http://git-wip-us.apache.org/repos/asf/trafficserver/tree/f4f8d99f
> Diff: http://git-wip-us.apache.org/repos/asf/trafficserver/diff/f4f8d99f
>
> Branch: refs/heads/master
> Commit: f4f8d99f65b7ffbc37e7f72adc0694adb26041c4
> Parents: b30d698
> Author: Yunkai Zhang <qi...@taobao.com>
> Authored: Sat Aug 17 16:17:41 2013 +0800
> Committer: Yunkai Zhang <qi...@taobao.com>
> Committed: Wed Aug 21 17:39:53 2013 +0800
>
> ----------------------------------------------------------------------
> lib/records/RecProcess.cc | 17 ++-
> lib/ts/EventNotify.cc | 172 +++++++++++++++++++++++++++++++
> lib/ts/EventNotify.h | 54 ++++++++++
> lib/ts/Makefile.am | 2 +
> lib/ts/libts.h | 1 +
> proxy/logging/Log.cc | 47 +++------
> proxy/logging/Log.h | 9 +-
> proxy/logging/LogCollationHostSM.cc | 2 +-
> proxy/logging/LogConfig.cc | 2 +-
> proxy/logging/LogFile.cc | 4 +-
> proxy/logging/LogObject.cc | 2 +-
> 11 files changed, 261 insertions(+), 51 deletions(-)
> ----------------------------------------------------------------------
>
>
> http://git-wip-us.apache.org/repos/asf/trafficserver/blob/f4f8d99f/lib/records/RecProcess.cc
> ----------------------------------------------------------------------
> diff --git a/lib/records/RecProcess.cc b/lib/records/RecProcess.cc
> index b9c69ba..3f1f34e 100644
> --- a/lib/records/RecProcess.cc
> +++ b/lib/records/RecProcess.cc
> @@ -35,8 +35,7 @@
> static bool g_initialized = false;
> static bool g_message_initialized = false;
> static bool g_started = false;
> -static ink_cond g_force_req_cond;
> -static ink_mutex g_force_req_mutex;
> +static EventNotify g_force_req_notify;
> static int g_rec_raw_stat_sync_interval_ms = REC_RAW_STAT_SYNC_INTERVAL_MS;
> static int g_rec_config_update_interval_ms = REC_CONFIG_UPDATE_INTERVAL_MS;
> static int g_rec_remote_sync_interval_ms = REC_REMOTE_SYNC_INTERVAL_MS;
> @@ -263,9 +262,9 @@ recv_message_cb__process(RecMessage *msg, RecMessageT msg_type, void *cookie)
>
> if ((err = recv_message_cb(msg, msg_type, cookie)) == REC_ERR_OKAY) {
> if (msg_type == RECG_PULL_ACK) {
> - ink_mutex_acquire(&g_force_req_mutex);
> - ink_cond_signal(&g_force_req_cond);
> - ink_mutex_release(&g_force_req_mutex);
> + g_force_req_notify.lock();
> + g_force_req_notify.signal();
> + g_force_req_notify.unlock();
> }
> }
> return err;
> @@ -419,13 +418,11 @@ RecProcessInitMessage(RecModeT mode_type)
> return REC_ERR_FAIL;
> }
>
> - ink_cond_init(&g_force_req_cond);
> - ink_mutex_init(&g_force_req_mutex, NULL);
> if (mode_type == RECM_CLIENT) {
> send_pull_message(RECG_PULL_REQ);
> - ink_mutex_acquire(&g_force_req_mutex);
> - ink_cond_wait(&g_force_req_cond, &g_force_req_mutex);
> - ink_mutex_release(&g_force_req_mutex);
> + g_force_req_notify.lock();
> + g_force_req_notify.wait();
> + g_force_req_notify.unlock();
> }
>
> g_message_initialized = true;
>
> http://git-wip-us.apache.org/repos/asf/trafficserver/blob/f4f8d99f/lib/ts/EventNotify.cc
> ----------------------------------------------------------------------
> diff --git a/lib/ts/EventNotify.cc b/lib/ts/EventNotify.cc
> new file mode 100644
> index 0000000..5d0cc53
> --- /dev/null
> +++ b/lib/ts/EventNotify.cc
> @@ -0,0 +1,172 @@
> +/** @file
> +
> + A brief file description
> +
> + @section license License
> +
> + Licensed to the Apache Software Foundation (ASF) under one
> + or more contributor license agreements. See the NOTICE file
> + distributed with this work for additional information
> + regarding copyright ownership. The ASF licenses this file
> + to you under the Apache License, Version 2.0 (the
> + "License"); you may not use this file except in compliance
> + with the License. You may obtain a copy of the License at
> +
> + http://www.apache.org/licenses/LICENSE-2.0
> +
> + Unless required by applicable law or agreed to in writing, software
> + distributed under the License is distributed on an "AS IS" BASIS,
> + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
> + See the License for the specific language governing permissions and
> + limitations under the License.
> + */
> +
> +/**************************************************************************
> + EventNotify.cc
> +
> + Generic event notify mechanism among threads.
> +**************************************************************************/
> +
> +#include "EventNotify.h"
> +#include "ink_hrtime.h"
> +
> +#ifdef TS_HAS_EVENTFD
> +#include <sys/eventfd.h>
> +#include <sys/epoll.h>
> +#endif
> +
> +EventNotify::EventNotify(const char *name): m_name(name)
> +{
> +#ifdef TS_HAS_EVENTFD
> + int ret;
> + struct epoll_event ev;
> +
> + // Don't use noblock here!
> + m_event_fd = eventfd(0, EFD_CLOEXEC);
> + if (m_event_fd < 0) {
> + // EFD_CLOEXEC invalid in <= Linux 2.6.27
> + m_event_fd = eventfd(0, 0);
> + }
> + ink_release_assert(m_event_fd != -1);
> +
> + ev.events = EPOLLIN;
> + ev.data.fd = m_event_fd;
> +
> + m_epoll_fd = epoll_create(1);
> + ink_release_assert(m_epoll_fd != -1);
> +
> + ret = epoll_ctl(m_epoll_fd, EPOLL_CTL_ADD, m_event_fd, &ev);
> + ink_release_assert(ret != -1);
> +#else
> + ink_cond_init(&m_cond);
> + ink_mutex_init(&m_mutex, m_name);
> +#endif
> +}
> +
> +void
> +EventNotify::signal(void)
> +{
> +#ifdef TS_HAS_EVENTFD
> + ssize_t nr;
> + uint64_t value = 1;
> + nr = write(m_event_fd, &value, sizeof(uint64_t));
> + ink_release_assert(nr == sizeof(uint64_t));
> +#else
> + ink_cond_signal(&m_cond);
> +#endif
> +}
> +
> +void
> +EventNotify::wait(void)
> +{
> +#ifdef TS_HAS_EVENTFD
> + ssize_t nr;
> + uint64_t value = 0;
> + nr = read(m_event_fd, &value, sizeof(uint64_t));
> + ink_release_assert(nr == sizeof(uint64_t));
> +#else
> + ink_cond_wait(&m_cond, &m_mutex);
> +#endif
> +}
> +
> +int
> +EventNotify::timedwait(ink_timestruc *abstime)
> +{
> +#ifdef TS_HAS_EVENTFD
> + int timeout;
> + ssize_t nr, nr_fd = 0;
> + uint64_t value = 0;
> + struct timeval curtime;
> + struct epoll_event ev;
> +
> + // Convert absolute time to relative time
> + gettimeofday(&curtime, NULL);
> + timeout = (abstime->tv_sec - curtime.tv_sec) * 1000
> + + (abstime->tv_nsec / 1000 - curtime.tv_usec) / 1000;
> +
> + //
> + // When timeout < 0, epoll_wait() will wait indefinitely, but
> + // pthread_cond_timedwait() will return ETIMEDOUT immediately.
> + // We should keep compatible with pthread_cond_timedwait() here.
> + //
> + if (timeout < 0)
> + return ETIMEDOUT;
> +
> + do {
> + nr_fd = epoll_wait(m_epoll_fd, &ev, 1, timeout);
> + } while (nr_fd == -1 && errno == EINTR);
> +
> + if (nr_fd == 0)
> + return ETIMEDOUT;
> + else if (nr_fd == -1)
> + return errno;
> +
> + nr = read(m_event_fd, &value, sizeof(uint64_t));
> + ink_release_assert(nr == sizeof(uint64_t));
> +
> + return 0;
> +#else
> + return ink_cond_timedwait(&m_cond, &m_mutex, abstime);
> +#endif
> +}
> +
> +void
> +EventNotify::lock(void)
> +{
> +#ifdef TS_HAS_EVENTFD
> + // do nothing
> +#else
> + ink_mutex_acquire(&m_mutex);
> +#endif
> +}
> +
> +bool
> +EventNotify::trylock(void)
> +{
> +#ifdef TS_HAS_EVENTFD
> + return true;
> +#else
> + return ink_mutex_try_acquire(&m_mutex);
> +#endif
> +}
> +
> +void
> +EventNotify::unlock(void)
> +{
> +#ifdef TS_HAS_EVENTFD
> + // do nothing
> +#else
> + ink_mutex_release(&m_mutex);
> +#endif
> +}
> +
> +EventNotify::~EventNotify()
> +{
> +#ifdef TS_HAS_EVENTFD
> + close(m_event_fd);
> + close(m_epoll_fd);
> +#else
> + ink_cond_destroy(&m_cond);
> + ink_mutex_destroy(&m_mutex);
> +#endif
> +}
>
> http://git-wip-us.apache.org/repos/asf/trafficserver/blob/f4f8d99f/lib/ts/EventNotify.h
> ----------------------------------------------------------------------
> diff --git a/lib/ts/EventNotify.h b/lib/ts/EventNotify.h
> new file mode 100644
> index 0000000..16e4809
> --- /dev/null
> +++ b/lib/ts/EventNotify.h
> @@ -0,0 +1,54 @@
> +/** @file
> +
> + A brief file description
> +
> + @section license License
> +
> + Licensed to the Apache Software Foundation (ASF) under one
> + or more contributor license agreements. See the NOTICE file
> + distributed with this work for additional information
> + regarding copyright ownership. The ASF licenses this file
> + to you under the Apache License, Version 2.0 (the
> + "License"); you may not use this file except in compliance
> + with the License. You may obtain a copy of the License at
> +
> + http://www.apache.org/licenses/LICENSE-2.0
> +
> + Unless required by applicable law or agreed to in writing, software
> + distributed under the License is distributed on an "AS IS" BASIS,
> + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
> + See the License for the specific language governing permissions and
> + limitations under the License.
> + */
> +
> +/**************************************************************************
> + EventNotify.h
> +
> + Generic event notify mechanism among threads.
> +
> +**************************************************************************/
> +
> +#include "ink_thread.h"
> +
> +class EventNotify
> +{
> +public:
> + EventNotify(const char *name = NULL);
> + void signal(void);
> + void wait(void);
> + int timedwait(ink_timestruc *abstime);
> + void lock(void);
> + bool trylock(void);
> + void unlock(void);
> + ~EventNotify();
> +
> +private:
> + const char *m_name;
> +#ifdef TS_HAS_EVENTFD
> + int m_event_fd;
> + int m_epoll_fd;
> +#else
> + ink_cond m_cond;
> + ink_mutex m_mutex;
> +#endif
> +};
>
> http://git-wip-us.apache.org/repos/asf/trafficserver/blob/f4f8d99f/lib/ts/Makefile.am
> ----------------------------------------------------------------------
> diff --git a/lib/ts/Makefile.am b/lib/ts/Makefile.am
> index c360a4b..3fca2f8 100644
> --- a/lib/ts/Makefile.am
> +++ b/lib/ts/Makefile.am
> @@ -117,6 +117,8 @@ libtsutil_la_SOURCES = \
> ink_syslog.h \
> ink_thread.cc \
> ink_thread.h \
> + EventNotify.h \
> + EventNotify.cc \
> ink_time.cc \
> ink_time.h \
> inktomi++.h \
>
> http://git-wip-us.apache.org/repos/asf/trafficserver/blob/f4f8d99f/lib/ts/libts.h
> ----------------------------------------------------------------------
> diff --git a/lib/ts/libts.h b/lib/ts/libts.h
> index 046afaa..27c9e92 100644
> --- a/lib/ts/libts.h
> +++ b/lib/ts/libts.h
> @@ -81,6 +81,7 @@
> #include "Bitops.h"
> #include "Compatability.h"
> #include "DynArray.h"
> +#include "EventNotify.h"
> #include "I_Version.h"
> #include "InkPool.h"
> #include "List.h"
>
> http://git-wip-us.apache.org/repos/asf/trafficserver/blob/f4f8d99f/proxy/logging/Log.cc
> ----------------------------------------------------------------------
> diff --git a/proxy/logging/Log.cc b/proxy/logging/Log.cc
> index 502c424..790bc04 100644
> --- a/proxy/logging/Log.cc
> +++ b/proxy/logging/Log.cc
> @@ -77,15 +77,12 @@ size_t Log::numInactiveObjects;
> size_t Log::maxInactiveObjects;
>
> // Flush thread stuff
> -ink_mutex *Log::preproc_mutex;
> -ink_cond *Log::preproc_cond;
> -ink_mutex *Log::flush_mutex;
> -ink_cond *Log::flush_cond;
> +EventNotify *Log::preproc_notify;
> +EventNotify *Log::flush_notify;
> InkAtomicList *Log::flush_data_list;
>
> // Collate thread stuff
> -ink_mutex Log::collate_mutex;
> -ink_cond Log::collate_cond;
> +EventNotify Log::collate_notify;
> ink_thread Log::collate_thread;
> int Log::collation_accept_file_descriptor;
> int Log::collation_preproc_threads;
> @@ -189,10 +186,10 @@ struct PeriodicWakeup : Continuation
> int wakeup (int /* event ATS_UNUSED */, Event * /* e ATS_UNUSED */)
> {
> for (int i = 0; i < m_preproc_threads; i++) {
> - ink_cond_signal (&Log::preproc_cond[i]);
> + Log::preproc_notify[i].signal();
> }
> for (int i = 0; i < m_flush_threads; i++) {
> - ink_cond_signal (&Log::flush_cond[i]);
> + Log::flush_notify[i].signal();
> }
> return EVENT_CONT;
> }
> @@ -1070,8 +1067,7 @@ Log::create_threads()
> if (!(init_status & THREADS_CREATED)) {
>
> char desc[64];
> - preproc_mutex = new ink_mutex[collation_preproc_threads];
> - preproc_cond = new ink_cond[collation_preproc_threads];
> + preproc_notify = new EventNotify[collation_preproc_threads];
>
> size_t stacksize;
> REC_ReadConfigInteger(stacksize, "proxy.config.thread.default.stacksize");
> @@ -1081,9 +1077,6 @@ Log::create_threads()
> // no need for the conditional var since it will be relying on
> // on the event system.
> for (int i = 0; i < collation_preproc_threads; i++) {
> - sprintf(desc, "Logging preproc thread mutex[%d]", i);
> - ink_mutex_init(&preproc_mutex[i], desc);
> - ink_cond_init(&preproc_cond[i]);
> Continuation *preproc_cont = NEW(new LoggingPreprocContinuation(i));
> sprintf(desc, "[LOG_PREPROC %d]", i);
> eventProcessor.spawn_thread(preproc_cont, desc, stacksize);
> @@ -1093,13 +1086,9 @@ Log::create_threads()
> // TODO: Enable multiple flush threads, such as
> // one flush thread per file.
> //
> - flush_mutex = new ink_mutex;
> - flush_cond = new ink_cond;
> + flush_notify = new EventNotify;
> flush_data_list = new InkAtomicList;
>
> - sprintf(desc, "Logging flush thread mutex");
> - ink_mutex_init(flush_mutex, desc);
> - ink_cond_init(flush_cond);
> sprintf(desc, "Logging flush buffer list");
> ink_atomiclist_init(flush_data_list, desc, 0);
> Continuation *flush_cont = NEW(new LoggingFlushContinuation(0));
> @@ -1118,8 +1107,6 @@ Log::create_threads()
> // much overhead associated with keeping an ink_thread blocked on a
> // condition variable.
> //
> - ink_mutex_init(&collate_mutex, "Collate thread mutex");
> - ink_cond_init(&collate_cond);
> Continuation *collate_continuation = NEW(new LoggingCollateContinuation);
> Event *collate_event = eventProcessor.spawn_thread(collate_continuation);
> collate_thread = collate_event->ethread->tid;
> @@ -1246,7 +1233,7 @@ Log::preproc_thread_main(void *args)
>
> Debug("log-preproc", "log preproc thread is alive ...");
>
> - ink_mutex_acquire(&preproc_mutex[idx]);
> + Log::preproc_notify[idx].lock();
>
> while (true) {
> buffers_preproced = config->log_object_manager.preproc_buffers(idx);
> @@ -1264,11 +1251,11 @@ Log::preproc_thread_main(void *args)
> // check the queue and find there is nothing to do, then wait
> // again.
> //
> - ink_cond_wait (&preproc_cond[idx], &preproc_mutex[idx]);
> + Log::preproc_notify[idx].wait();
> }
>
> /* NOTREACHED */
> - ink_mutex_release(&preproc_mutex[idx]);
> + Log::preproc_notify[idx].unlock();
> return NULL;
> }
>
> @@ -1283,7 +1270,7 @@ Log::flush_thread_main(void * /* args ATS_UNUSED */)
> int len, bytes_written, total_bytes;
> SLL<LogFlushData, LogFlushData::Link_link> link, invert_link;
>
> - ink_mutex_acquire(flush_mutex);
> + Log::flush_notify->lock();
>
> while (true) {
> fdata = (LogFlushData *) ink_atomiclist_popall(flush_data_list);
> @@ -1369,11 +1356,11 @@ Log::flush_thread_main(void * /* args ATS_UNUSED */)
> // check the queue and find there is nothing to do, then wait
> // again.
> //
> - ink_cond_wait(flush_cond, flush_mutex);
> + Log::flush_notify->wait();
> }
>
> /* NOTREACHED */
> - ink_mutex_release(flush_mutex);
> + Log::flush_notify->unlock();
> return NULL;
> }
>
> @@ -1398,7 +1385,7 @@ Log::collate_thread_main(void * /* args ATS_UNUSED */)
>
> Debug("log-thread", "Log collation thread is alive ...");
>
> - ink_mutex_acquire(&collate_mutex);
> + Log::collate_notify.lock();
>
> while (true) {
> ink_assert(Log::config != NULL);
> @@ -1408,7 +1395,7 @@ Log::collate_thread_main(void * /* args ATS_UNUSED */)
> // wake-ups.
> //
> while (!Log::config->am_collation_host()) {
> - ink_cond_wait(&collate_cond, &collate_mutex);
> + Log::collate_notify.wait();
> }
>
> // Ok, at this point we know we're a log collation host, so get to
> @@ -1427,7 +1414,7 @@ Log::collate_thread_main(void * /* args ATS_UNUSED */)
> //
> // go to sleep ...
> //
> - ink_cond_wait(&collate_cond, &collate_mutex);
> + Log::collate_notify.wait();
> continue;
> }
>
> @@ -1489,7 +1476,7 @@ Log::collate_thread_main(void * /* args ATS_UNUSED */)
> }
>
> /* NOTREACHED */
> - ink_mutex_release(&collate_mutex);
> + Log::collate_notify.unlock();
> return NULL;
> }
>
>
> http://git-wip-us.apache.org/repos/asf/trafficserver/blob/f4f8d99f/proxy/logging/Log.h
> ----------------------------------------------------------------------
> diff --git a/proxy/logging/Log.h b/proxy/logging/Log.h
> index 2f395ca..2233220 100644
> --- a/proxy/logging/Log.h
> +++ b/proxy/logging/Log.h
> @@ -420,17 +420,14 @@ public:
> static void add_to_inactive(LogObject * obj);
>
> // logging thread stuff
> - static ink_mutex *preproc_mutex;
> - static ink_cond *preproc_cond;
> + static EventNotify *preproc_notify;
> static void *preproc_thread_main(void *args);
> - static ink_mutex *flush_mutex;
> - static ink_cond *flush_cond;
> + static EventNotify *flush_notify;
> static InkAtomicList *flush_data_list;
> static void *flush_thread_main(void *args);
>
> // collation thread stuff
> - static ink_mutex collate_mutex;
> - static ink_cond collate_cond;
> + static EventNotify collate_notify;
> static ink_thread collate_thread;
> static int collation_preproc_threads;
> static int collation_accept_file_descriptor;
>
> http://git-wip-us.apache.org/repos/asf/trafficserver/blob/f4f8d99f/proxy/logging/LogCollationHostSM.cc
> ----------------------------------------------------------------------
> diff --git a/proxy/logging/LogCollationHostSM.cc b/proxy/logging/LogCollationHostSM.cc
> index 9add290..36452f2 100644
> --- a/proxy/logging/LogCollationHostSM.cc
> +++ b/proxy/logging/LogCollationHostSM.cc
> @@ -321,7 +321,7 @@ LogCollationHostSM::host_recv(int event, void * /* data ATS_UNUSED */)
> //
> log_buffer = NEW(new LogBuffer(log_object, log_buffer_header));
> int idx = log_object->add_to_flush_queue(log_buffer);
> - ink_cond_signal(&Log::preproc_cond[idx]);
> + Log::preproc_notify[idx].signal();
> }
>
> #if defined(LOG_BUFFER_TRACKING)
>
> http://git-wip-us.apache.org/repos/asf/trafficserver/blob/f4f8d99f/proxy/logging/LogConfig.cc
> ----------------------------------------------------------------------
> diff --git a/proxy/logging/LogConfig.cc b/proxy/logging/LogConfig.cc
> index 6cf55ba..8e3b585 100644
> --- a/proxy/logging/LogConfig.cc
> +++ b/proxy/logging/LogConfig.cc
> @@ -661,7 +661,7 @@ LogConfig::setup_collation(LogConfig * prev_config)
> // since we are the collation host, we need to signal the
> // collate_cond variable so that our collation thread wakes up.
> //
> - ink_cond_signal(&Log::collate_cond);
> + Log::collate_notify.signal();
> #endif
> Debug("log", "I am a collation host listening on port %d.", collation_port);
> } else {
>
> http://git-wip-us.apache.org/repos/asf/trafficserver/blob/f4f8d99f/proxy/logging/LogFile.cc
> ----------------------------------------------------------------------
> diff --git a/proxy/logging/LogFile.cc b/proxy/logging/LogFile.cc
> index 5bf6ba4..6b93a26 100644
> --- a/proxy/logging/LogFile.cc
> +++ b/proxy/logging/LogFile.cc
> @@ -523,7 +523,7 @@ LogFile::preproc_and_try_delete(LogBuffer * lb)
>
> ink_atomiclist_push(Log::flush_data_list, flush_data);
>
> - ink_cond_signal(Log::flush_cond);
> + Log::flush_notify->signal();
>
> //
> // LogBuffer will be deleted in flush thread
> @@ -693,7 +693,7 @@ LogFile::write_ascii_logbuffer3(LogBufferHeader * buffer_header, char *alt_forma
> LogFlushData *flush_data = new LogFlushData(this, ascii_buffer, fmt_buf_bytes);
> ink_atomiclist_push(Log::flush_data_list, flush_data);
>
> - ink_cond_signal(Log::flush_cond);
> + Log::flush_notify->signal();
>
> total_bytes += fmt_buf_bytes;
> }
>
> http://git-wip-us.apache.org/repos/asf/trafficserver/blob/f4f8d99f/proxy/logging/LogObject.cc
> ----------------------------------------------------------------------
> diff --git a/proxy/logging/LogObject.cc b/proxy/logging/LogObject.cc
> index ea059e2..29f1a39 100644
> --- a/proxy/logging/LogObject.cc
> +++ b/proxy/logging/LogObject.cc
> @@ -461,7 +461,7 @@ LogObject::_checkout_write(size_t * write_offset, size_t bytes_needed) {
> int idx = m_buffer_manager_idx++ % m_flush_threads;
> Debug("log-logbuffer", "adding buffer %d to flush list after checkout", buffer->get_id());
> m_buffer_manager[idx].add_to_flush_queue(buffer);
> - ink_cond_signal(&Log::preproc_cond[idx]);
> + Log::preproc_notify[idx].signal();
>
> }
> decremented = true;
>
[2/3] git commit: TS-2141: Inconsistent euid cause bad
mgmtapi/eventapi sockets
Posted by yu...@apache.org.
TS-2141: Inconsistent euid cause bad mgmtapi/eventapi sockets
In the main function of traffic_manager, the webIntr_main thread is created
after the euid has been changed to "proxy.config.admin.user_id".
The webIntr_main thread then creates the mgmtapisocket/eventapisocket
socket fds and listens on them.
Unfortunately, after creating the webIntr_main thread, the main thread
calls listenForProxy()->bindProxyPort() immediately, which might
change/restore the euid concurrently.
For example:
1) bindProxyPort() changes the euid to 0 (root uid).
2) webIntr_main creates mgmtapisocket with euid 0.
3) bindProxyPort() restores the euid to 501 (admin.user_id).
4) webIntr_main calls chmod(0777) on mgmtapisocket and gets an
"Operation not permitted" error, along with other unexpected errors.
As the api sockets can't be created correctly, traffic_cop will fail to
establish a heartbeat with traffic_manager, and then traffic_cop will kill
traffic_manager automatically.
Even worse, after killing and restarting the manager, traffic_cop forgets to
reconnect to it, so it uses the bad main_socket_fd to detect the manager's
heartbeat. Of course, the detection fails, and it then kills/restarts the
manager again and again.
I'll give another patch to fix the reconnect problem of traffic_cop, which seems
to be caused by an old commit: ba95f9c6e8c49f69f018885f255db02f31d7335f.
Signed-off-by: Yunkai Zhang <qi...@taobao.com>
Project: http://git-wip-us.apache.org/repos/asf/trafficserver/repo
Commit: http://git-wip-us.apache.org/repos/asf/trafficserver/commit/ce9188b5
Tree: http://git-wip-us.apache.org/repos/asf/trafficserver/tree/ce9188b5
Diff: http://git-wip-us.apache.org/repos/asf/trafficserver/diff/ce9188b5
Branch: refs/heads/master
Commit: ce9188b5a68487669470c11d734a58807312201b
Parents: f4f8d99
Author: Yunkai Zhang <qi...@taobao.com>
Authored: Tue Aug 20 16:19:22 2013 +0800
Committer: Yunkai Zhang <qi...@taobao.com>
Committed: Wed Aug 21 17:42:44 2013 +0800
----------------------------------------------------------------------
mgmt/Main.cc | 10 +++++++++-
1 file changed, 9 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/trafficserver/blob/ce9188b5/mgmt/Main.cc
----------------------------------------------------------------------
diff --git a/mgmt/Main.cc b/mgmt/Main.cc
index a5c4483..9eb6904 100644
--- a/mgmt/Main.cc
+++ b/mgmt/Main.cc
@@ -722,9 +722,17 @@ main(int argc, char **argv)
// Now that we know our cluster ip address, add the
// UI record for this machine
overviewGenerator->addSelfRecord();
+
+ lmgmt->listenForProxy();
+
+ //
+ // As listenForProxy() may change/restore euid, we should put
+ // the creation of webIntr_main thread after it. So that we
+ // can keep a consistent euid when create mgmtapi/eventapi unix
+ // sockets in webIntr_main thread.
+ //
webThrId = ink_thread_create(webIntr_main, NULL); /* Spin web agent thread */
Debug("lm", "Created Web Agent thread (%" PRId64 ")", (int64_t)webThrId);
- lmgmt->listenForProxy();
ticker = time(NULL);
mgmt_log("[TrafficManager] Setup complete\n");