You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@trafficserver.apache.org by jp...@apache.org on 2016/05/11 20:31:50 UTC
[trafficserver] 26/33: TS-4425: Switch iocore/cluster over to Ptr::get().
This is an automated email from the ASF dual-hosted git repository.
jpeach pushed a commit to branch master
in repository https://git-dual.apache.org/repos/asf/trafficserver.git
commit 32f84f1834c9b7e32c18b84b2b67c7f0181501f5
Author: James Peach <jp...@apache.org>
AuthorDate: Sat May 7 13:21:16 2016 -0700
TS-4425: Switch iocore/cluster over to Ptr::get().
---
iocore/cluster/ClusterCache.cc | 8 ++++----
iocore/cluster/ClusterHandler.cc | 36 ++++++++++++++++-----------------
iocore/cluster/ClusterHandlerBase.cc | 10 ++++-----
iocore/cluster/ClusterLib.cc | 16 ++++++++-------
iocore/cluster/ClusterLoadMonitor.cc | 2 +-
iocore/cluster/ClusterProcessor.cc | 3 +--
iocore/cluster/ClusterVConnection.cc | 2 +-
iocore/cluster/P_ClusterCache.h | 6 +++---
iocore/cluster/P_ClusterCacheInternal.h | 8 ++++----
iocore/cluster/P_ClusterHandler.h | 18 +++++++++--------
iocore/cluster/P_ClusterInternal.h | 20 ++++++++++++++++++
11 files changed, 76 insertions(+), 53 deletions(-)
diff --git a/iocore/cluster/ClusterCache.cc b/iocore/cluster/ClusterCache.cc
index bf3b659..04213c9 100644
--- a/iocore/cluster/ClusterCache.cc
+++ b/iocore/cluster/ClusterCache.cc
@@ -1616,7 +1616,7 @@ CacheContinuation::replyOpEvent(int event, VConnection *cvc)
msg->token = token; // Tell sender conn established
OneWayTunnel *pOWT = OneWayTunnel::OneWayTunnel_alloc();
- pOWT->init(read_cluster_vc, cache_vc, NULL, nbytes ? nbytes : DEFAULT_MAX_BUFFER_SIZE, this->mutex);
+ pOWT->init(read_cluster_vc, cache_vc, NULL, nbytes ? nbytes : DEFAULT_MAX_BUFFER_SIZE, this->mutex.get());
read_cluster_vc->allow_remote_close();
results_expected--;
}
@@ -1688,7 +1688,7 @@ CacheContinuation::replyOpEvent(int event, VConnection *cvc)
// Transmit reply message and object data in same cluster message
Debug("cache_proto", "Sending reply/data seqno=%d buflen=%" PRId64, seq_number,
readahead_data ? bytes_IOBufferBlockList(readahead_data, 1) : 0);
- clusterProcessor.invoke_remote_data(ch, CACHE_OP_RESULT_CLUSTER_FUNCTION, (void *)msg, (flen + len), readahead_data,
+ clusterProcessor.invoke_remote_data(ch, CACHE_OP_RESULT_CLUSTER_FUNCTION, (void *)msg, (flen + len), readahead_data.get(),
cluster_vc_channel, &token, &CacheContinuation::disposeOfDataBuffer, (void *)this,
CLUSTER_OPT_STEAL);
} else {
@@ -1975,7 +1975,7 @@ cache_op_result_ClusterFunction(ClusterHandler *ch, void *d, int l)
c->freeMsgBuffer();
if (ci.valid()) {
// Unmarshaled CacheHTTPInfo contained in reply message, copy it.
- c->setMsgBufferLen(len, iob);
+ c->setMsgBufferLen(len, iob.get());
c->ic_new_info = ci;
}
msg->seq_number = len; // HACK ALERT: reusing variable
@@ -1996,7 +1996,7 @@ cache_op_result_ClusterFunction(ClusterHandler *ch, void *d, int l)
c->token = msg->token;
if (ci.valid()) {
// Unmarshaled CacheHTTPInfo contained in reply message, copy it.
- c->setMsgBufferLen(len, iob);
+ c->setMsgBufferLen(len, iob.get());
c->ic_new_info = ci;
}
c->result_error = op_result_error;
diff --git a/iocore/cluster/ClusterHandler.cc b/iocore/cluster/ClusterHandler.cc
index da07b4d..57e9ab1 100644
--- a/iocore/cluster/ClusterHandler.cc
+++ b/iocore/cluster/ClusterHandler.cc
@@ -369,11 +369,11 @@ ClusterHandler::close_free_lock(ClusterVConnection *vc, ClusterVConnState *s)
{
Ptr<ProxyMutex> m(s->vio.mutex);
if (s == &vc->read) {
- if ((ProxyMutex *)vc->read_locked)
+ if (vc->read_locked)
MUTEX_UNTAKE_LOCK(vc->read_locked, thread);
vc->read_locked = NULL;
} else {
- if ((ProxyMutex *)vc->write_locked)
+ if (vc->write_locked)
MUTEX_UNTAKE_LOCK(vc->write_locked, thread);
vc->write_locked = NULL;
}
@@ -569,7 +569,7 @@ ClusterHandler::build_initial_vector(bool read_flag)
/////////////////////////////////////
// Try to get the read VIO mutex
/////////////////////////////////////
- ink_release_assert(!(ProxyMutex *)vc->read_locked);
+ ink_release_assert(!vc->read_locked);
#ifdef CLUSTER_TOMCAT
if (!vc->read.vio.mutex ||
!MUTEX_TAKE_TRY_LOCK_FOR_SPIN(vc->read.vio.mutex, thread, vc->read.vio._cont, READ_LOCK_SPIN_COUNT))
@@ -617,7 +617,7 @@ ClusterHandler::build_initial_vector(bool read_flag)
bool remote_write_fill = (vc->pending_remote_fill && vc->remote_write_block);
// Sanity check, assert we have the lock
if (!remote_write_fill) {
- ink_assert((ProxyMutex *)vc->write_locked);
+ ink_assert(vc->write_locked);
}
if (vc_ok_write(vc) || remote_write_fill) {
if (remote_write_fill) {
@@ -633,9 +633,10 @@ ClusterHandler::build_initial_vector(bool read_flag)
vc->write_list_bytes -= (int)s.msg.descriptor[i].length;
vc->write_bytes_in_transit += (int)s.msg.descriptor[i].length;
- vc->write_list_tail = vc->write_list;
- while (vc->write_list_tail && vc->write_list_tail->next)
- vc->write_list_tail = vc->write_list_tail->next;
+ vc->write_list_tail = vc->write_list.get();
+ while (vc->write_list_tail && vc->write_list_tail->next) {
+ vc->write_list_tail = vc->write_list_tail->next.get();
+ }
}
} else {
Debug(CL_NOTE, "faking cluster write data");
@@ -761,7 +762,7 @@ ClusterHandler::get_read_locks()
continue;
}
- ink_assert(!(ProxyMutex *)vc->read_locked);
+ ink_assert(!vc->read_locked);
vc->read_locked = vc->read.vio.mutex;
if (vc->byte_bank_q.head ||
!MUTEX_TAKE_TRY_LOCK_FOR_SPIN(vc->read.vio.mutex, thread, vc->read.vio._cont, READ_LOCK_SPIN_COUNT)) {
@@ -815,7 +816,7 @@ ClusterHandler::get_write_locks()
// already have a reference to the buffer
continue;
}
- ink_assert(!(ProxyMutex *)vc->write_locked);
+ ink_assert(!vc->write_locked);
vc->write_locked = vc->write.vio.mutex;
#ifdef CLUSTER_TOMCAT
if (vc->write_locked &&
@@ -1110,7 +1111,7 @@ ClusterHandler::update_channels_read()
continue;
}
- if (!vc->pending_remote_fill && vc_ok_read(vc) && (!((ProxyMutex *)vc->read_locked) || vc->byte_bank_q.head)) {
+ if (!vc->pending_remote_fill && vc_ok_read(vc) && (!vc->read_locked || vc->byte_bank_q.head)) {
//
// Byte bank active or unable to acquire lock on VC.
// Move data into the byte bank and attempt delivery
@@ -1120,7 +1121,7 @@ ClusterHandler::update_channels_read()
add_to_byte_bank(vc);
} else {
- if (vc->pending_remote_fill || ((ProxyMutex *)vc->read_locked && vc_ok_read(vc))) {
+ if (vc->pending_remote_fill || (vc->read_locked && vc_ok_read(vc))) {
vc->read_block->fill(len); // note bytes received
if (!vc->pending_remote_fill) {
vc->read.vio.buffer.writer()->append_block(vc->read_block->clone());
@@ -1146,9 +1147,8 @@ ClusterHandler::update_channels_read()
// for message processing which cannot be done with a ET_CLUSTER thread.
//
int
-ClusterHandler::process_incoming_callouts(ProxyMutex *m)
+ClusterHandler::process_incoming_callouts(ProxyMutex *mutex)
{
- ProxyMutex *mutex = m;
ink_hrtime now;
//
// Atomically dequeue all active requests from the external queue and
@@ -1308,7 +1308,7 @@ ClusterHandler::update_channels_partial_read()
vc->read_block->fill(len); // note bytes received
if (!vc->pending_remote_fill) {
- if ((ProxyMutex *)vc->read_locked) {
+ if (vc->read_locked) {
Debug("cluster_vc_xfer", "Partial read, credit ch %d %p %d bytes", vc->channel, vc, len);
s->vio.buffer.writer()->append_block(vc->read_block->clone());
if (complete_channel_read(len, vc)) {
@@ -1358,7 +1358,7 @@ ClusterHandler::complete_channel_read(int len, ClusterVConnection *vc)
if (vc->closed)
return false; // No action if already closed
- ink_assert((ProxyMutex *)s->vio.mutex == (ProxyMutex *)s->vio._cont->mutex);
+ ink_assert(s->vio.mutex == s->vio._cont->mutex);
Debug("cluster_vc_xfer", "Complete read, credit ch %d %p %d bytes", vc->channel, vc, len);
s->vio.ndone += len;
@@ -2316,12 +2316,12 @@ ClusterHandler::free_locks(bool read_flag, int i)
ClusterVConnection *vc = channels[s.msg.descriptor[j].channel];
if (VALID_CHANNEL(vc)) {
if (read_flag) {
- if ((ProxyMutex *)vc->read_locked) {
+ if (vc->read_locked) {
MUTEX_UNTAKE_LOCK(vc->read.vio.mutex, thread);
vc->read_locked = NULL;
}
} else {
- if ((ProxyMutex *)vc->write_locked) {
+ if (vc->write_locked) {
MUTEX_UNTAKE_LOCK(vc->write_locked, thread);
vc->write_locked = NULL;
}
@@ -2331,7 +2331,7 @@ ClusterHandler::free_locks(bool read_flag, int i)
s.msg.descriptor[j].channel != CLUSTER_CONTROL_CHANNEL) {
ClusterVConnection *vc = channels[s.msg.descriptor[j].channel];
if (VALID_CHANNEL(vc)) {
- if ((ProxyMutex *)vc->read_locked) {
+ if (vc->read_locked) {
MUTEX_UNTAKE_LOCK(vc->read_locked, thread);
vc->read_locked = NULL;
}
diff --git a/iocore/cluster/ClusterHandlerBase.cc b/iocore/cluster/ClusterHandlerBase.cc
index a396f7c..cb7064c 100644
--- a/iocore/cluster/ClusterHandlerBase.cc
+++ b/iocore/cluster/ClusterHandlerBase.cc
@@ -53,7 +53,7 @@ ClusterCalloutContinuation::~ClusterCalloutContinuation()
int
ClusterCalloutContinuation::CalloutHandler(int /* event ATS_UNUSED */, Event * /* e ATS_UNUSED */)
{
- return _ch->process_incoming_callouts(this->mutex);
+ return _ch->process_incoming_callouts(this->mutex.get());
}
/*************************************************************************/
@@ -67,8 +67,7 @@ ClusterControl::ClusterControl()
void
ClusterControl::real_alloc_data(int read_access, bool align_int32_on_non_int64_boundary)
{
- EThread *thread = this_ethread();
- ProxyMutex *mutex = thread->mutex;
+ ProxyMutex *mutex = this_ethread()->mutex;
ink_assert(!data);
if ((len + DATA_HDR + sizeof(int32_t)) <= DEFAULT_MAX_BUFFER_SIZE) {
@@ -316,11 +315,12 @@ ClusterState::build_do_io_vector()
if (last_block) {
last_block->next = block[n];
}
- last_block = block[n];
+ last_block = block[n].get();
while (last_block->next) {
- last_block = last_block->next;
+ last_block = last_block->next.get();
}
}
+
mbuf->_writer = block[0];
ink_release_assert(bytes_to_xfer == to_do);
ink_assert(bytes_to_xfer == bytes_IOBufferBlockList(mbuf->_writer, !read_channel));
diff --git a/iocore/cluster/ClusterLib.cc b/iocore/cluster/ClusterLib.cc
index cd24208..e46e56f 100644
--- a/iocore/cluster/ClusterLib.cc
+++ b/iocore/cluster/ClusterLib.cc
@@ -104,13 +104,13 @@ clone_IOBufferBlockList(IOBufferBlock *b, int start_off, int n, IOBufferBlock **
while (bsrc && nbytes) {
// Skip zero length blocks
if (!bsrc->read_avail()) {
- bsrc = bsrc->next;
+ bsrc = bsrc->next.get();
continue;
}
if (bclone_head) {
bclone->next = bsrc->clone();
- bclone = bclone->next;
+ bclone = bclone->next.get();
} else {
// Skip bytes already processed
if (bytes_to_skip) {
@@ -125,7 +125,7 @@ clone_IOBufferBlockList(IOBufferBlock *b, int start_off, int n, IOBufferBlock **
} else {
// Skip entire block
- bsrc = bsrc->next;
+ bsrc = bsrc->next.get();
continue;
}
} else {
@@ -140,7 +140,7 @@ clone_IOBufferBlockList(IOBufferBlock *b, int start_off, int n, IOBufferBlock **
bclone->fill(nbytes);
nbytes = 0;
}
- bsrc = bsrc->next;
+ bsrc = bsrc->next.get();
}
ink_release_assert(!nbytes);
*b_tail = bclone;
@@ -167,14 +167,15 @@ consume_IOBufferBlockList(IOBufferBlock *b, int64_t n)
} else {
// Consumed entire block
- b_remainder = b->next;
+ b_remainder = b->next.get();
}
break;
} else {
- b = b->next;
+ b = b->next.get();
}
}
+
ink_release_assert(nbytes == 0);
return b_remainder; // return remaining blocks
}
@@ -191,8 +192,9 @@ bytes_IOBufferBlockList(IOBufferBlock *b, int64_t read_avail_bytes)
} else {
n += b->write_avail();
}
- b = b->next;
+ b = b->next.get();
}
+
return n;
}
diff --git a/iocore/cluster/ClusterLoadMonitor.cc b/iocore/cluster/ClusterLoadMonitor.cc
index 42af4cf..de3157d 100644
--- a/iocore/cluster/ClusterLoadMonitor.cc
+++ b/iocore/cluster/ClusterLoadMonitor.cc
@@ -218,7 +218,7 @@ void
ClusterLoadMonitor::note_ping_response_time(ink_hrtime response_time, int sequence_number)
{
#ifdef CLUSTER_TOMCAT
- ProxyMutex *mutex = this->ch->mutex; // hack for stats
+ ProxyMutex *mutex = this->ch->mutex.get(); // hack for stats
#endif
CLUSTER_SUM_DYN_STAT(CLUSTER_PING_TIME_STAT, response_time);
diff --git a/iocore/cluster/ClusterProcessor.cc b/iocore/cluster/ClusterProcessor.cc
index e8655b6..699d09c 100644
--- a/iocore/cluster/ClusterProcessor.cc
+++ b/iocore/cluster/ClusterProcessor.cc
@@ -53,8 +53,7 @@ ClusterProcessor::~ClusterProcessor()
int
ClusterProcessor::internal_invoke_remote(ClusterHandler *ch, int cluster_fn, void *data, int len, int options, void *cmsg)
{
- EThread *thread = this_ethread();
- ProxyMutex *mutex = thread->mutex;
+ ProxyMutex *mutex = this_ethread()->mutex;
//
// RPC facility for intercluster communication available to other
// subsystems.
diff --git a/iocore/cluster/ClusterVConnection.cc b/iocore/cluster/ClusterVConnection.cc
index 1fa0f64..f6e97d0 100644
--- a/iocore/cluster/ClusterVConnection.cc
+++ b/iocore/cluster/ClusterVConnection.cc
@@ -31,7 +31,7 @@ ClassAllocator<ClusterVConnection> clusterVCAllocator("clusterVCAllocator");
ClassAllocator<ByteBankDescriptor> byteBankAllocator("byteBankAllocator");
ByteBankDescriptor *
-ByteBankDescriptor::ByteBankDescriptor_alloc(IOBufferBlock *iob)
+ByteBankDescriptor::ByteBankDescriptor_alloc(Ptr<IOBufferBlock> &iob)
{
ByteBankDescriptor *b = byteBankAllocator.alloc();
b->block = iob;
diff --git a/iocore/cluster/P_ClusterCache.h b/iocore/cluster/P_ClusterCache.h
index 06b6afa..f774bb5 100644
--- a/iocore/cluster/P_ClusterCache.h
+++ b/iocore/cluster/P_ClusterCache.h
@@ -454,12 +454,12 @@ class ByteBankDescriptor
public:
ByteBankDescriptor() {}
IOBufferBlock *
- get_block()
+ get_block() const
{
- return block;
+ return block.get();
}
- static ByteBankDescriptor *ByteBankDescriptor_alloc(IOBufferBlock *);
+ static ByteBankDescriptor *ByteBankDescriptor_alloc(Ptr<IOBufferBlock> &);
static void ByteBankDescriptor_free(ByteBankDescriptor *);
public:
diff --git a/iocore/cluster/P_ClusterCacheInternal.h b/iocore/cluster/P_ClusterCacheInternal.h
index b0b4d15..4437972 100644
--- a/iocore/cluster/P_ClusterCacheInternal.h
+++ b/iocore/cluster/P_ClusterCacheInternal.h
@@ -195,7 +195,7 @@ struct CacheContinuation : public Continuation {
inline void
setMsgBufferLen(int l, IOBufferData *b = 0)
{
- ink_assert(rw_buf_msg == 0);
+ ink_assert(!rw_buf_msg);
ink_assert(rw_buf_msg_len == 0);
rw_buf_msg = b;
@@ -211,7 +211,7 @@ struct CacheContinuation : public Continuation {
inline void
allocMsgBuffer()
{
- ink_assert(rw_buf_msg == 0);
+ ink_assert(!rw_buf_msg);
ink_assert(rw_buf_msg_len);
if (rw_buf_msg_len <= DEFAULT_MAX_BUFFER_SIZE) {
rw_buf_msg = new_IOBufferData(buffer_size_to_index(rw_buf_msg_len, MAX_BUFFER_SIZE_INDEX));
@@ -228,9 +228,9 @@ struct CacheContinuation : public Continuation {
}
inline IOBufferData *
- getMsgBufferIOBData()
+ getMsgBufferIOBData() const
{
- return rw_buf_msg;
+ return rw_buf_msg.get();
}
inline void
diff --git a/iocore/cluster/P_ClusterHandler.h b/iocore/cluster/P_ClusterHandler.h
index edf2d49..5dc169a 100644
--- a/iocore/cluster/P_ClusterHandler.h
+++ b/iocore/cluster/P_ClusterHandler.h
@@ -52,17 +52,19 @@ struct ClusterControl : public Continuation {
Ptr<IOBufferBlock> iob_block;
IOBufferBlock *
- get_block()
+ get_block() const
{
- return iob_block;
+ return iob_block.get();
}
+
bool
- fast_data()
+ fast_data() const
{
return (len <= MAX_FAST_CONTROL_MESSAGE);
}
+
bool
- valid_alloc_data()
+ valid_alloc_data() const
{
return iob_block && real_data && data;
}
@@ -240,9 +242,9 @@ struct ClusterMsg {
}
IOBufferBlock *
- get_block()
+ get_block() const
{
- return iob_descriptor_block;
+ return iob_descriptor_block.get();
}
IOBufferBlock *
@@ -255,7 +257,7 @@ struct ClusterMsg {
iob_descriptor_block->next = 0;
iob_descriptor_block->fill(start_offset);
iob_descriptor_block->consume(start_offset);
- return iob_descriptor_block;
+ return iob_descriptor_block.get();
}
IOBufferBlock *
@@ -268,7 +270,7 @@ struct ClusterMsg {
iob_descriptor_block->next = 0;
iob_descriptor_block->fill(start_offset);
iob_descriptor_block->consume(start_offset);
- return iob_descriptor_block;
+ return iob_descriptor_block.get();
}
void
diff --git a/iocore/cluster/P_ClusterInternal.h b/iocore/cluster/P_ClusterInternal.h
index 844934f..1011eb0 100644
--- a/iocore/cluster/P_ClusterInternal.h
+++ b/iocore/cluster/P_ClusterInternal.h
@@ -493,9 +493,29 @@ extern void cluster_update_priority(ClusterHandler *, ClusterVConnection *, Clus
extern void cluster_bump(ClusterHandler *, ClusterVConnectionBase *, ClusterVConnState *, int);
extern IOBufferBlock *clone_IOBufferBlockList(IOBufferBlock *, int, int, IOBufferBlock **);
+
+static inline IOBufferBlock *
+clone_IOBufferBlockList(Ptr<IOBufferBlock> &src, int start_off, int b, IOBufferBlock **b_tail)
+{
+ return clone_IOBufferBlockList(src.get(), start_off, b, b_tail);
+}
+
extern IOBufferBlock *consume_IOBufferBlockList(IOBufferBlock *, int64_t);
+
+static inline IOBufferBlock *
+consume_IOBufferBlockList(Ptr<IOBufferBlock> &b, int64_t n)
+{
+ return consume_IOBufferBlockList(b.get(), n);
+}
+
extern int64_t bytes_IOBufferBlockList(IOBufferBlock *, int64_t);
+static inline int64_t
+bytes_IOBufferBlockList(Ptr<IOBufferBlock> &b, int64_t read_avail_bytes)
+{
+ return bytes_IOBufferBlockList(b.get(), read_avail_bytes);
+}
+
// ClusterVConnection declarations
extern void clusterVCAllocator_free(ClusterVConnection *vc);
extern ClassAllocator<ClusterVConnection> clusterVCAllocator;
--
To stop receiving notification emails like this one, please contact
"commits@trafficserver.apache.org" <co...@trafficserver.apache.org>.