You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@trafficserver.apache.org by am...@apache.org on 2015/05/04 05:27:08 UTC
[1/2] trafficserver git commit: Fixed not waiting on fragments in missing frag table.
Repository: trafficserver
Updated Branches:
refs/heads/ts-974-5-3-x f82fd704f -> 2fb305bb4
Fixed not waiting on fragments in missing frag table.
Project: http://git-wip-us.apache.org/repos/asf/trafficserver/repo
Commit: http://git-wip-us.apache.org/repos/asf/trafficserver/commit/e92da07f
Tree: http://git-wip-us.apache.org/repos/asf/trafficserver/tree/e92da07f
Diff: http://git-wip-us.apache.org/repos/asf/trafficserver/diff/e92da07f
Branch: refs/heads/ts-974-5-3-x
Commit: e92da07f8d2ddd16ad9250ee3ef08d9c1b09f4b7
Parents: f82fd70
Author: Alan M. Carroll <so...@yahoo-inc.com>
Authored: Sun May 3 08:48:50 2015 -0500
Committer: Alan M. Carroll <so...@yahoo-inc.com>
Committed: Sun May 3 08:48:50 2015 -0500
----------------------------------------------------------------------
iocore/cache/CacheDir.cc | 9 +++---
iocore/cache/CacheHttp.cc | 14 ++++++---
iocore/cache/CacheRead.cc | 68 +++++++++--------------------------------
iocore/cache/CacheWrite.cc | 4 +--
iocore/cache/P_CacheDir.h | 4 ++-
iocore/cache/P_CacheHttp.h | 12 +++++++-
proxy/http/HttpCacheSM.cc | 5 +++
proxy/http/HttpSM.cc | 3 ++
8 files changed, 51 insertions(+), 68 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/trafficserver/blob/e92da07f/iocore/cache/CacheDir.cc
----------------------------------------------------------------------
diff --git a/iocore/cache/CacheDir.cc b/iocore/cache/CacheDir.cc
index 9bac5d8..038e6f0 100644
--- a/iocore/cache/CacheDir.cc
+++ b/iocore/cache/CacheDir.cc
@@ -143,7 +143,7 @@ OpenDir::close_write(CacheVC *cont)
int b = h % OPEN_DIR_BUCKETS;
bucket[b].remove(cont->od);
// delayed_readers.append(cont->od->readers);
- signal_readers(0, 0);
+// signal_readers(0, 0);
cont->od->vector.clear();
cont->od->mutex = 0;
THREAD_FREE(cont->od, openDirEntryAllocator, cont->mutex->thread_holding);
@@ -216,12 +216,11 @@ OpenDirEntry::key_for(CacheKey const& alt_key, int64_t offset)
return vector.key_for(alt_key, offset);
}
-OpenDirEntry&
-OpenDirEntry::waiting_for(CacheKey const& alt_key, CacheVC* vc, int64_t offset)
+bool
+OpenDirEntry::wait_for(CacheKey const& alt_key, CacheVC* vc, int64_t offset)
{
Debug("amc", "vc %p waiting for %" PRId64, vc, offset);
- vector.waiting_for(alt_key, vc, offset);
- return *this;
+ return vector.wait_for(alt_key, vc, offset);
}
OpenDirEntry&
http://git-wip-us.apache.org/repos/asf/trafficserver/blob/e92da07f/iocore/cache/CacheHttp.cc
----------------------------------------------------------------------
diff --git a/iocore/cache/CacheHttp.cc b/iocore/cache/CacheHttp.cc
index 97837ee..6078d22 100644
--- a/iocore/cache/CacheHttp.cc
+++ b/iocore/cache/CacheHttp.cc
@@ -315,15 +315,19 @@ CacheHTTPInfoVector::is_write_active(CacheKey const& alt_key, int64_t offset)
/*-------------------------------------------------------------------------
-------------------------------------------------------------------------*/
-CacheHTTPInfoVector&
-CacheHTTPInfoVector::waiting_for(CacheKey const& alt_key, CacheVC* vc, int64_t offset)
+bool
+CacheHTTPInfoVector::wait_for(CacheKey const& alt_key, CacheVC* vc, int64_t offset)
{
+ bool zret = true;
int alt_idx = this->index_of(alt_key);
Item& item = data[alt_idx];
int frag_idx = item._alternate.get_frag_index_of(offset);
- vc->fragment = frag_idx;
- item._waiting.push(vc);
- return *this;
+ vc->fragment = frag_idx; // really? Shouldn't this already be set?
+ if (item.has_writers())
+ item._waiting.push(vc);
+ else
+ zret = false;
+ return zret;
}
/*-------------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/trafficserver/blob/e92da07f/iocore/cache/CacheRead.cc
----------------------------------------------------------------------
diff --git a/iocore/cache/CacheRead.cc b/iocore/cache/CacheRead.cc
index 8c49076..726c834 100644
--- a/iocore/cache/CacheRead.cc
+++ b/iocore/cache/CacheRead.cc
@@ -354,6 +354,7 @@ CacheVC::openReadFromWriter(int event, Event *e)
if (write_vc && CACHE_ALT_INDEX_DEFAULT != (alternate_index = get_alternate_index(&(od->vector), write_vc->earliest_key))) {
alternate.copy_shallow(od->vector.get(alternate_index));
key = earliest_key = alternate.object_key_get();
+ doc_len = alternate.object_size_get();
MUTEX_RELEASE(lock_od);
SET_HANDLER(&CacheVC::openReadStartEarliest);
return openReadStartEarliest(event, e);
@@ -661,36 +662,13 @@ LreadMain:
void
CacheVC::update_key_to_frag_idx(int target)
{
- FragmentDescriptorTable& frags = (*alternate.get_frag_table());
-
if (0 == target) {
fragment = 0;
key = earliest_key;
- } else if (! frags[target].m_key.is_zero()) {
- key = frags[target].m_key;
- } else { // step through the hard way
- if (target < fragment) { // going backwards
- if (target < (fragment - target)) { // faster to go from start
- fragment = 0;
- key = earliest_key;
- } else { // quicker to go from here to there
- while (target < fragment) {
- prev_CacheKey(&key, &key);
- --fragment;
- if (frags[fragment].m_key.is_zero())
- frags[fragment].m_key = key; // might as well store it as we go.
- else ink_assert(frags[fragment].m_key == key);
- }
- }
- }
- // advance to target if we're not already there.
- while (target > fragment) {
- next_CacheKey(&key, &key);
- ++fragment;
- if (frags[fragment].m_key.is_zero())
- frags[fragment].m_key = key; // might as well store it as we go.
- else ink_assert(frags[fragment].m_key == key);
- }
+ } else {
+ FragmentDescriptor* frag = alternate.force_frag_at(target);
+ ink_assert(frag);
+ key = frag->m_key;
}
}
@@ -779,7 +757,7 @@ CacheVC::openReadMain(int /* event ATS_UNUSED */, Event * /* e ATS_UNUSED */)
int n_frags = alternate.get_frag_count();
// Quick check for offset in next fragment - very common
- if (target_offset >= frag_upper_bound && (!frags || fragment > n_frags || target_offset < (*frags)[fragment].m_offset)) {
+ if (target_offset >= frag_upper_bound && (!frags || fragment >= n_frags || target_offset <= (*frags)[fragment].m_offset)) {
Debug("amc", "Non-seeking continuation to next fragment");
} else {
int target = -1; // target fragment index.
@@ -824,32 +802,16 @@ CacheVC::openReadMain(int /* event ATS_UNUSED */, Event * /* e ATS_UNUSED */)
return handleEvent(AIO_EVENT_DONE, 0);
}
return EVENT_CONT;
- } else if (write_vc) {
- ink_release_assert(! "[amc] Handle case where fragment is not in the directory during read");
- /* Check for an OpenDirEntry - if none, no hope. If so, see if any of the writers in it are still
- planning to write this fragment (might check against all fragments - if any aren't going to
- be written, might just give up at that point rather than hope some other user agent will happen
- to ask for the missing fragments. Although, if there's a herd, that might not be so far fetched).
- */
-# if 0
- if (writer_done()) {
- last_collision = NULL;
- while (dir_probe(&earliest_key, vol, &dir, &last_collision)) {
- if (dir_offset(&dir) == dir_offset(&earliest_dir)) {
- DDebug("cache_read_agg", "%p: key: %X ReadMain complete: %d",
- this, first_key.slice32(1), (int)vio.ndone);
- doc_len = vio.ndone;
- goto Leos;
- }
- }
+ } else {
+ if (!od->wait_for(earliest_key, this, target_offset)) {
DDebug("cache_read_agg", "%p: key: %X ReadMain writer aborted: %d",
this, first_key.slice32(1), (int)vio.ndone);
- goto Lerror;
+ lock.release();
+ return calluser(VC_EVENT_ERROR);
}
- DDebug("cache_read_agg", "%p: key: %X ReadMain retrying: %d", this, first_key.slice32(1), (int)vio.ndone);
+ DDebug("cache_read_agg", "%p: key: %X ReadMain waiting: %d", this, first_key.slice32(1), (int)vio.ndone);
SET_HANDLER(&CacheVC::openReadMain);
- VC_SCHED_WRITER_RETRY();
-# endif
+ return EVENT_CONT;
}
if (is_action_tag_set("cache"))
ink_release_assert(false);
@@ -883,9 +845,7 @@ CacheVC::openReadWaitEarliest(int evid, Event*)
od = NULL;
key = first_key;
return handleEvent(EVENT_IMMEDIATE, 0);
- } else if (dir_probe(&key, vol, &earliest_dir, &last_collision) ||
- dir_lookaside_probe(&key, vol, &earliest_dir, NULL))
- {
+ } else if (dir_probe(&key, vol, &earliest_dir, &last_collision) || dir_lookaside_probe(&key, vol, &earliest_dir, NULL)) {
dir = earliest_dir;
SET_HANDLER(&self::openReadStartEarliest);
if ((zret = do_read_call(&key)) == EVENT_RETURN) {
@@ -985,7 +945,7 @@ CacheVC::openReadStartEarliest(int /* event ATS_UNUSED */, Event * /* e ATS_UNUS
// It's OK if there's a writer for this alternate, we can wait on it.
if (od && od->has_writer(earliest_key)) {
wake_up_thread = mutex->thread_holding;
- od->waiting_for(earliest_key, this, 0);
+ od->wait_for(earliest_key, this, 0);
lock.release();
// The SM must be signaled that the cache read is open even if we haven't got the earliest frag
// yet because otherwise it won't set up the read side of the tunnel before the write side finishes
http://git-wip-us.apache.org/repos/asf/trafficserver/blob/e92da07f/iocore/cache/CacheWrite.cc
----------------------------------------------------------------------
diff --git a/iocore/cache/CacheWrite.cc b/iocore/cache/CacheWrite.cc
index f6ff4d4..7d6e405 100644
--- a/iocore/cache/CacheWrite.cc
+++ b/iocore/cache/CacheWrite.cc
@@ -1516,11 +1516,11 @@ Lagain:
Debug("amc", "[CacheVC::openWriteMain] writing empty earliest");
} else {
// go on the wait list
- od->waiting_for(earliest_key, this, 0);
+ od->wait_for(earliest_key, this, 0);
not_writing = true;
}
} else if (od->is_write_active(earliest_key, write_pos)) {
- od->waiting_for(earliest_key, this, write_pos);
+ od->wait_for(earliest_key, this, write_pos);
not_writing = true;
} else if (alternate.is_frag_cached(fragment)) {
not_writing = true;
http://git-wip-us.apache.org/repos/asf/trafficserver/blob/e92da07f/iocore/cache/P_CacheDir.h
----------------------------------------------------------------------
diff --git a/iocore/cache/P_CacheDir.h b/iocore/cache/P_CacheDir.h
index 0571150..10487fa 100644
--- a/iocore/cache/P_CacheDir.h
+++ b/iocore/cache/P_CacheDir.h
@@ -300,8 +300,10 @@ struct OpenDirEntry {
/// Get the fragment key for a specific @a offset.
CacheKey const &key_for(CacheKey const &alt_key, int64_t offset);
/** Wait for a fragment to be written.
+
+ @return @c false if there is no writer that is scheduled to write that fragment.
*/
- self &waiting_for(CacheKey const &alt_key, CacheVC *vc, int64_t offset);
+ bool wait_for(CacheKey const &alt_key, CacheVC *vc, int64_t offset);
/// Close out anything related to this writer
self &close_writer(CacheKey const &alt_key, CacheVC *vc);
};
http://git-wip-us.apache.org/repos/asf/trafficserver/blob/e92da07f/iocore/cache/P_CacheHttp.h
----------------------------------------------------------------------
diff --git a/iocore/cache/P_CacheHttp.h b/iocore/cache/P_CacheHttp.h
index 54fb523..458fb9e 100644
--- a/iocore/cache/P_CacheHttp.h
+++ b/iocore/cache/P_CacheHttp.h
@@ -91,6 +91,9 @@ struct CacheHTTPInfoVector {
} f;
};
///@}
+ /// Check if there are any writers.
+ /// @internal Need to augment this at some point to check for writers to a specific offset.
+ bool has_writers() const;
};
typedef CacheArray<Item> InfoVector;
@@ -135,7 +138,8 @@ struct CacheHTTPInfoVector {
/// Indicate if a VC is currently writing to the fragment with this @a offset.
bool is_write_active(CacheKey const &alt_key, int64_t offset);
/// Mark a CacheVC as waiting for the fragment containing the byte at @a offset.
- self &waiting_for(CacheKey const &alt_key, CacheVC *vc, int64_t offset);
+ /// @return @c false if there is no writer scheduled to write that offset.
+ bool wait_for(CacheKey const &alt_key, CacheVC *vc, int64_t offset);
/// Get the fragment key for a specific @a offset.
CacheKey const &key_for(CacheKey const &alt_key, int64_t offset);
/// Close out anything related to this writer
@@ -256,6 +260,12 @@ protected:
bool _pending_range_shift_p;
};
+TS_INLINE bool
+CacheHTTPInfoVector::Item::has_writers() const
+{
+ return NULL != _writers.head;
+}
+
TS_INLINE CacheHTTPInfo *
CacheHTTPInfoVector::get(int idx)
{
http://git-wip-us.apache.org/repos/asf/trafficserver/blob/e92da07f/proxy/http/HttpCacheSM.cc
----------------------------------------------------------------------
diff --git a/proxy/http/HttpCacheSM.cc b/proxy/http/HttpCacheSM.cc
index c1dc8ba..67a3447 100644
--- a/proxy/http/HttpCacheSM.cc
+++ b/proxy/http/HttpCacheSM.cc
@@ -277,6 +277,11 @@ HttpCacheSM::open_partial_read(HTTPHdr* client_request_hdr)
// Simple because this requires an active write VC so we know the object is there (no retries).
ink_assert(NULL != cache_write_vc);
+ // If this is a partial fill there will be a cache read VC. Resetting it to be used is challenging
+ // because it requires digging in to the internals of the VC or expanding its interface. At present
+ // it's better to just close it and re-open one that we know is valid with regard to the write VC.
+ this->close_read();
+
SET_HANDLER(&HttpCacheSM::state_cache_open_partial_read);
open_read_cb = false;
http://git-wip-us.apache.org/repos/asf/trafficserver/blob/e92da07f/proxy/http/HttpSM.cc
----------------------------------------------------------------------
diff --git a/proxy/http/HttpSM.cc b/proxy/http/HttpSM.cc
index e2e0b95..9b9238c 100644
--- a/proxy/http/HttpSM.cc
+++ b/proxy/http/HttpSM.cc
@@ -1547,6 +1547,9 @@ HttpSM::handle_api_return()
t_state.source = HttpTransact::SOURCE_CACHE;
HTTP_SM_SET_DEFAULT_HANDLER(&HttpSM::state_cache_open_partial_read);
cache_sm.cache_write_vc = save_write_vc;
+ // Close the read VC if it's there because it's less work than trying to reset the existing
+ // one (which doesn't have the ODE attached).
+ cache_sm.close_read();
pending_action = cache_sm.open_partial_read(&t_state.hdr_info.client_request);
cache_sm.cache_write_vc = NULL;
} else {
[2/2] trafficserver git commit: Fixed problem with ODE not being cleared on cache read VCs.
Posted by am...@apache.org.
Fixed problem with ODE not being cleared on cache read VCs.
Project: http://git-wip-us.apache.org/repos/asf/trafficserver/repo
Commit: http://git-wip-us.apache.org/repos/asf/trafficserver/commit/2fb305bb
Tree: http://git-wip-us.apache.org/repos/asf/trafficserver/tree/2fb305bb
Diff: http://git-wip-us.apache.org/repos/asf/trafficserver/diff/2fb305bb
Branch: refs/heads/ts-974-5-3-x
Commit: 2fb305bb48355cace90c984faa0505fa2b93bc9d
Parents: e92da07
Author: Alan M. Carroll <so...@yahoo-inc.com>
Authored: Sun May 3 22:26:17 2015 -0500
Committer: Alan M. Carroll <so...@yahoo-inc.com>
Committed: Sun May 3 22:26:17 2015 -0500
----------------------------------------------------------------------
iocore/cache/Cache.cc | 3 +-
iocore/cache/CacheDir.cc | 76 +++++++++++++++----------------------
iocore/cache/CacheRead.cc | 7 ++--
iocore/cache/CacheVol.cc | 5 ++-
iocore/cache/CacheWrite.cc | 20 ++++++----
iocore/cache/P_CacheDir.h | 18 ++++-----
iocore/cache/P_CacheInternal.h | 22 +++++------
iocore/cache/P_CacheVol.h | 10 ++---
8 files changed, 74 insertions(+), 87 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/trafficserver/blob/2fb305bb/iocore/cache/Cache.cc
----------------------------------------------------------------------
diff --git a/iocore/cache/Cache.cc b/iocore/cache/Cache.cc
index c685c11..c5d8d58 100644
--- a/iocore/cache/Cache.cc
+++ b/iocore/cache/Cache.cc
@@ -582,6 +582,7 @@ Vol::close_read(CacheVC *cont)
EThread *t = cont->mutex->thread_holding;
ink_assert(t == this_ethread());
ink_assert(t == mutex->thread_holding);
+ open_dir.close_entry(cont);
if (dir_is_empty(&cont->earliest_dir))
return 1;
int i = dir_evac_bucket(&cont->earliest_dir);
@@ -2883,7 +2884,7 @@ CacheVC::removeEvent(int /* event ATS_UNUSED */, Event * /* e ATS_UNUSED */)
goto Lfree;
}
if (!f.remove_aborted_writers) {
- if (vol->open_write(this, true, 1)) {
+ if (vol->open_write(this)) {
// writer exists
ink_release_assert(od = vol->open_read(&key));
od->dont_update_directory = 1;
http://git-wip-us.apache.org/repos/asf/trafficserver/blob/2fb305bb/iocore/cache/CacheDir.cc
----------------------------------------------------------------------
diff --git a/iocore/cache/CacheDir.cc b/iocore/cache/CacheDir.cc
index 038e6f0..6eedc46 100644
--- a/iocore/cache/CacheDir.cc
+++ b/iocore/cache/CacheDir.cc
@@ -61,51 +61,39 @@ OpenDir::OpenDir()
SET_HANDLER(&OpenDir::signal_readers);
}
-/*
- If allow_if_writers is false, open_write fails if there are other writers.
- max_writers sets the maximum number of concurrent writers that are
- allowed. Only The first writer can set the max_writers. It is ignored
- for later writers.
- Returns 1 on success and 0 on failure.
- */
-int
-OpenDir::open_write(CacheVC *cont, int /* allow_if_writers */, int /* max_writers */)
+OpenDirEntry*
+OpenDir::open_entry(Vol* vol, CryptoHash const& key, bool force_p)
{
- ink_assert(cont->vol->mutex->thread_holding == this_ethread());
- unsigned int h = cont->first_key.slice32(0);
+ ink_assert(vol->mutex->thread_holding == this_ethread());
+ unsigned int h = key.slice32(0);
int b = h % OPEN_DIR_BUCKETS;
for (OpenDirEntry *d = bucket[b].head; d; d = d->link.next) {
- if (!(d->first_key == cont->first_key))
+ if (!(d->first_key == key))
continue;
-// if (allow_if_writers && d->num_writers < d->max_writers) {
-// d->writers.push(cont);
- // [amc] Need to think if we want to track writers per object and not just per alt.
- // useful to know when to close out the OpenDirEntry.
- d->num_writers++;
- cont->od = d;
- cont->write_vector = &d->vector;
- return 1;
-// }
- return 0;
+ ++(d->num_active);
+// cont->od = d;
+// cont->write_vector = &d->vector;
+ return d;
}
+
+ if (!force_p)
+ return NULL;
+
OpenDirEntry *od = THREAD_ALLOC(openDirEntryAllocator,
- cont->mutex->thread_holding);
-// od->readers.head = NULL;
-// od->writers.push(cont);
+ vol->mutex->thread_holding);
od->mutex = new_ProxyMutex();
- od->first_key = cont->first_key;
- od->num_writers = 1;
-// od->max_writers = max_writers;
+ od->first_key = key;
+ od->num_active = 1;
od->vector.data.data = &od->vector.data.fast_data[0];
od->dont_update_directory = 0;
od->move_resident_alt = 0;
od->reading_vec = 0;
od->writing_vec = 0;
dir_clear(&od->first_dir);
- cont->od = od;
- cont->write_vector = &od->vector;
+// cont->od = od;
+// cont->write_vector = &od->vector;
bucket[b].push(od);
- return 1;
+ return od;
}
int
@@ -133,25 +121,22 @@ OpenDir::signal_readers(int /* event ATS_UNUSED */, Event * /* e ATS_UNUSED */)
return 0;
}
-int
-OpenDir::close_write(CacheVC *cont)
+void
+OpenDir::close_entry(CacheVC* vc)
{
- ink_assert(cont->vol->mutex->thread_holding == this_ethread());
-// cont->od->writers.remove(cont);
- if (--(cont->od->num_writers) < 1) {
- unsigned int h = cont->first_key.slice32(0);
+ ink_assert(vc->vol->mutex->thread_holding == this_ethread());
+ if (--(vc->od->num_active) < 1) {
+ unsigned int h = vc->od->first_key.slice32(0);
int b = h % OPEN_DIR_BUCKETS;
- bucket[b].remove(cont->od);
-// delayed_readers.append(cont->od->readers);
-// signal_readers(0, 0);
- cont->od->vector.clear();
- cont->od->mutex = 0;
- THREAD_FREE(cont->od, openDirEntryAllocator, cont->mutex->thread_holding);
+ bucket[b].remove(vc->od);
+ vc->od->vector.clear();
+ vc->od->mutex = 0;
+ THREAD_FREE(vc->od, openDirEntryAllocator, vc->vol->mutex->thread_holding);
}
- cont->od = NULL;
- return 0;
+ vc->od = NULL;
}
+# if 0
OpenDirEntry *
OpenDir::open_read(INK_MD5 *key)
{
@@ -163,7 +148,6 @@ OpenDir::open_read(INK_MD5 *key)
return NULL;
}
-# if 0
int
OpenDirEntry::wait(CacheVC *cont, int msec)
{
http://git-wip-us.apache.org/repos/asf/trafficserver/blob/2fb305bb/iocore/cache/CacheRead.cc
----------------------------------------------------------------------
diff --git a/iocore/cache/CacheRead.cc b/iocore/cache/CacheRead.cc
index 726c834..55cd34c 100644
--- a/iocore/cache/CacheRead.cc
+++ b/iocore/cache/CacheRead.cc
@@ -290,9 +290,9 @@ CacheVC::openReadChooseWriter(int /* event ATS_UNUSED */, Event * /* e ATS_UNUSE
alternate_index = 0;
vector.clear(false);
DDebug("cache_read_agg",
- "%p: key: %X eKey: %d # alts: %d, ndx: %d, # writers: %d writer: %p",
+ "%p: key: %X eKey: %d # alts: %d, ndx: %d, # active: %d writer: %p",
this, first_key.slice32(1), write_vc->earliest_key.slice32(1),
- vector.count(), alternate_index, od->num_writers, write_vc);
+ vector.count(), alternate_index, od->num_active, write_vc);
}
#endif // HTTP_CACHE
return EVENT_NONE;
@@ -956,7 +956,8 @@ CacheVC::openReadStartEarliest(int /* event ATS_UNUSED */, Event * /* e ATS_UNUS
return callcont(CACHE_EVENT_OPEN_READ); // must signal read is open
} else if (frag_type == CACHE_FRAG_TYPE_HTTP) {
// don't want any writers while we are evacuating the vector
- if (!vol->open_write(this, false, 1)) {
+ ink_release_assert(!"[amc] Not handling multiple writers with vector evacuate");
+ if (!vol->open_write(this)) {
Doc *doc1 = (Doc *)first_buf->data();
uint32_t len = this->load_http_info(write_vector, doc1);
ink_assert(len == doc1->hlen && write_vector->count() > 0);
http://git-wip-us.apache.org/repos/asf/trafficserver/blob/2fb305bb/iocore/cache/CacheVol.cc
----------------------------------------------------------------------
diff --git a/iocore/cache/CacheVol.cc b/iocore/cache/CacheVol.cc
index 47f5775..857027e 100644
--- a/iocore/cache/CacheVol.cc
+++ b/iocore/cache/CacheVol.cc
@@ -413,8 +413,9 @@ CacheVC::scanOpenWrite(int /* event ATS_UNUSED */, Event * /* e ATS_UNUSED */)
}
Debug("cache_scan", "trying for writer lock");
- if (vol->open_write(this, false, 1)) {
- writer_lock_retry++;
+ if (vol->open_write(this)) {
+ // [amc] This tried to restrict to one writer, must fix at some point.
+ ++writer_lock_retry;
SET_HANDLER(&CacheVC::scanOpenWrite);
mutex->thread_holding->schedule_in_local(this, scan_msec_delay);
return EVENT_CONT;
http://git-wip-us.apache.org/repos/asf/trafficserver/blob/2fb305bb/iocore/cache/CacheWrite.cc
----------------------------------------------------------------------
diff --git a/iocore/cache/CacheWrite.cc b/iocore/cache/CacheWrite.cc
index 7d6e405..20d6bdb 100644
--- a/iocore/cache/CacheWrite.cc
+++ b/iocore/cache/CacheWrite.cc
@@ -107,7 +107,7 @@ CacheVC::updateVector(int /* event ATS_UNUSED */, Event * /* e ATS_UNUSED */)
alternate_index = write_vector->insert(&alternate, alternate_index);
}
- if (od->move_resident_alt && first_buf._ptr() && !od->has_multiple_writers()) {
+ if (od->move_resident_alt && first_buf._ptr() /* && !od->has_multiple_writers() */) {
Doc *doc = (Doc *)first_buf->data();
int small_doc = (int64_t)doc->data_len() < (int64_t)cache_config_alt_rewrite_max_size;
int have_res_alt = doc->key == od->single_doc_key;
@@ -1606,7 +1606,7 @@ CacheVC::openWriteStartDone(int event, Event *e)
if (!lock.is_locked())
VC_LOCK_RETRY_EVENT();
- if (_action.cancelled && (!od || !od->has_multiple_writers()))
+ if (_action.cancelled && (!od /* || !od->has_multiple_writers() */ ))
goto Lcancel;
if (event == AIO_EVENT_DONE) { // vector read done
@@ -1671,15 +1671,17 @@ CacheVC::openWriteStartDone(int event, Event *e)
}
Lcollision:
- int if_writers = ((uintptr_t)info == CACHE_ALLOW_MULTIPLE_WRITES);
+// int if_writers = ((uintptr_t)info == CACHE_ALLOW_MULTIPLE_WRITES);
if (!od) {
- if ((err = vol->open_write(this, if_writers, cache_config_http_max_alts > 1 ? cache_config_http_max_alts : 0)) > 0)
+ if ((err = vol->open_write(this)) > 0)
goto Lfailure;
+/*
if (od->has_multiple_writers()) {
MUTEX_RELEASE(lock);
SET_HANDLER(&CacheVC::openWriteInit);
return this->openWriteInit(EVENT_IMMEDIATE, 0);
}
+*/
}
// check for collision
if (dir_probe(&first_key, vol, &dir, &last_collision)) {
@@ -1724,7 +1726,7 @@ CacheVC::openWriteStartBegin(int /* event ATS_UNUSED */, Event * /* e ATS_UNUSED
cancel_trigger();
if (_action.cancelled)
return free_CacheVC(this);
- if (((err = vol->open_write_lock(this, false, 1)) > 0)) {
+ if (((err = vol->open_write_lock(this)) > 0)) {
CACHE_INCREMENT_DYN_STAT(base_stat + CACHE_STAT_FAILURE);
free_CacheVC(this);
_action.continuation->handleEvent(CACHE_EVENT_OPEN_WRITE_FAILED, (void *)-err);
@@ -1784,7 +1786,7 @@ Cache::open_write(Continuation *cont, CacheKey *key, CacheFragType frag_type, in
c->f.sync = (options & CACHE_WRITE_OPT_SYNC) == CACHE_WRITE_OPT_SYNC;
c->pin_in_cache = (uint32_t)apin_in_cache;
- if ((res = c->vol->open_write_lock(c, false, 1)) > 0) {
+ if ((res = c->vol->open_write_lock(c)) > 0) {
// document currently being written, abort
CACHE_INCREMENT_DYN_STAT(c->base_stat + CACHE_STAT_FAILURE);
cont->handleEvent(CACHE_EVENT_OPEN_WRITE_FAILED, (void *)-res);
@@ -1838,7 +1840,7 @@ Cache::open_write(Continuation *cont, CacheKey *key, CacheHTTPInfo *info, time_t
ink_assert(caches[type] == this);
intptr_t err = 0;
- int if_writers = (uintptr_t)info == CACHE_ALLOW_MULTIPLE_WRITES;
+// int if_writers = (uintptr_t)info == CACHE_ALLOW_MULTIPLE_WRITES;
CacheVC *c = new_CacheVC(cont);
ProxyMutex *mutex = cont->mutex;
c->vio.op = VIO::WRITE;
@@ -1902,13 +1904,15 @@ Cache::open_write(Continuation *cont, CacheKey *key, CacheHTTPInfo *info, time_t
{
CACHE_TRY_LOCK(lock, c->vol->mutex, cont->mutex->thread_holding);
if (lock.is_locked()) {
- if ((err = c->vol->open_write(c, if_writers, cache_config_http_max_alts > 1 ? cache_config_http_max_alts : 0)) > 0)
+ if ((err = c->vol->open_write(c)) > 0)
goto Lfailure;
// If there are multiple writers, then this one cannot be an update.
// Only the first writer can do an update. If that's the case, we can
// return success to the state machine now.;
+/*
if (c->od->has_multiple_writers())
goto Lmiss;
+*/
if (!dir_probe(key, c->vol, &c->dir, &c->last_collision)) {
if (c->f.update) {
// fail update because vector has been GC'd
http://git-wip-us.apache.org/repos/asf/trafficserver/blob/2fb305bb/iocore/cache/P_CacheDir.h
----------------------------------------------------------------------
diff --git a/iocore/cache/P_CacheDir.h b/iocore/cache/P_CacheDir.h
index 10487fa..1020764 100644
--- a/iocore/cache/P_CacheDir.h
+++ b/iocore/cache/P_CacheDir.h
@@ -254,7 +254,7 @@ struct OpenDirEntry {
Dir single_doc_dir; // Directory for the resident alternate
Dir first_dir; // Dir for the vector. If empty, a new dir is
// inserted, otherwise this dir is overwritten
- uint16_t num_writers; // num of current writers
+ uint16_t num_active; // num of VCs working with this entry
uint16_t max_writers; // max number of simultaneous writers allowed
bool dont_update_directory; // if set, the first_dir is not updated.
bool move_resident_alt; // if set, single_doc_dir is inserted.
@@ -280,12 +280,6 @@ struct OpenDirEntry {
// int wait(CacheVC *c, int msec);
- bool
- has_multiple_writers()
- {
- return num_writers > 1;
- }
-
/// Get the alternate index for the @a key.
int index_of(CacheKey const &key);
/// Check if there are any writers for the alternate of @a alt_key.
@@ -314,9 +308,13 @@ struct OpenDir : public Continuation {
DLL<OpenDirEntry> bucket[OPEN_DIR_BUCKETS];
- int open_write(CacheVC *c, int allow_if_writers, int max_writers);
- int close_write(CacheVC *c);
- OpenDirEntry *open_read(CryptoHash *key);
+ /** Open a live directory entry for @a vc.
+
+ @a force_p is set to @c true to force the entry if it's not already there.
+ */
+ OpenDirEntry* open_entry(Vol* vol, CryptoHash const& key, bool force_p = false);
+ void close_entry(CacheVC *c);
+ // OpenDirEntry *open_read(CryptoHash *key);
int signal_readers(int event, Event *e);
OpenDir();
http://git-wip-us.apache.org/repos/asf/trafficserver/blob/2fb305bb/iocore/cache/P_CacheInternal.h
----------------------------------------------------------------------
diff --git a/iocore/cache/P_CacheInternal.h b/iocore/cache/P_CacheInternal.h
index 5290248..d3678d2 100644
--- a/iocore/cache/P_CacheInternal.h
+++ b/iocore/cache/P_CacheInternal.h
@@ -848,7 +848,7 @@ CacheVC::writer_done()
}
# endif
-TS_INLINE int
+TS_INLINE void
Vol::close_write(CacheVC *cont)
{
#ifdef CACHE_STAT_PAGES
@@ -856,12 +856,12 @@ Vol::close_write(CacheVC *cont)
stat_cache_vcs.remove(cont, cont->stat_link);
ink_assert(!cont->stat_link.next && !cont->stat_link.prev);
#endif
- return open_dir.close_write(cont);
+ open_dir.close_entry(cont);
}
// Returns 0 on success or a positive error code on failure
TS_INLINE int
-Vol::open_write(CacheVC *cont, int allow_if_writers, int max_writers)
+Vol::open_write(CacheVC *cont)
{
Vol *vol = this;
bool agg_error = false;
@@ -881,7 +881,8 @@ Vol::open_write(CacheVC *cont, int allow_if_writers, int max_writers)
m_result->notMigrate = true;
}
#endif
- if (open_dir.open_write(cont, allow_if_writers, max_writers)) {
+ ink_assert(NULL == cont->od);
+ if (NULL != (cont->od = open_dir.open_entry(this, cont->first_key, true))) {
#ifdef CACHE_STAT_PAGES
ink_assert(cont->mutex->thread_holding == this_ethread());
ink_assert(!cont->stat_link.next && !cont->stat_link.prev);
@@ -899,26 +900,23 @@ Vol::close_write_lock(CacheVC *cont)
CACHE_TRY_LOCK(lock, mutex, t);
if (!lock.is_locked())
return -1;
- return close_write(cont);
+ this->close_write(cont);
+ return 0;
}
TS_INLINE int
-Vol::open_write_lock(CacheVC *cont, int allow_if_writers, int max_writers)
+Vol::open_write_lock(CacheVC *cont)
{
EThread *t = cont->mutex->thread_holding;
CACHE_TRY_LOCK(lock, mutex, t);
- if (!lock.is_locked())
- return -1;
- return open_write(cont, allow_if_writers, max_writers);
+ return lock.is_locked() ? this->open_write(cont) : -1;
}
TS_INLINE OpenDirEntry *
Vol::open_read_lock(INK_MD5 *key, EThread *t)
{
CACHE_TRY_LOCK(lock, mutex, t);
- if (!lock.is_locked())
- return NULL;
- return open_dir.open_read(key);
+ return lock.is_locked() ? open_dir.open_entry(this, *key, false) : NULL;
}
TS_INLINE int
http://git-wip-us.apache.org/repos/asf/trafficserver/blob/2fb305bb/iocore/cache/P_CacheVol.h
----------------------------------------------------------------------
diff --git a/iocore/cache/P_CacheVol.h b/iocore/cache/P_CacheVol.h
index 5545992..f1e014f 100644
--- a/iocore/cache/P_CacheVol.h
+++ b/iocore/cache/P_CacheVol.h
@@ -536,10 +536,10 @@ struct Vol : public Continuation {
int recover_data();
- int open_write(CacheVC *cont, int allow_if_writers, int max_writers);
- int open_write_lock(CacheVC *cont, int allow_if_writers, int max_writers);
- int close_write(CacheVC *cont);
- int close_write_lock(CacheVC *cont);
+ int open_write(CacheVC *cont);
+ int open_write_lock(CacheVC *cont);
+ void close_write(CacheVC *cont);
+ int close_write_lock(CacheVC *cont); // can fail lock
int begin_read(CacheVC *cont);
int begin_read_lock(CacheVC *cont);
// unused read-write interlock code
@@ -867,7 +867,7 @@ free_EvacuationBlock(EvacuationBlock *b, EThread *t)
TS_INLINE OpenDirEntry *
Vol::open_read(CryptoHash *key)
{
- return open_dir.open_read(key);
+ return open_dir.open_entry(this, *key, false);
}
TS_INLINE int