You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@trafficserver.apache.org by ja...@apache.org on 2016/06/28 00:18:18 UTC

[trafficserver] branch master updated: TS-4331: Major re-write of hostdb (#653)

This is an automated email from the ASF dual-hosted git repository.

jacksontj pushed a commit to branch master
in repository https://git-dual.apache.org/repos/asf/trafficserver.git

The following commit(s) were added to refs/heads/master by this push:
       new  53f7579   TS-4331: Major re-write of hostdb (#653)
53f7579 is described below

commit 53f75794c6911aba93fe39903bb39a346e3ef3d8
Author: Thomas Jackson <ja...@gmail.com>
AuthorDate: Mon Jun 27 17:18:12 2016 -0700

    TS-4331: Major re-write of hostdb (#653)
    
    * Consolidate SRV `alive` checks to use the `alive` method on `HostDBInfo`
    
    * TS-4331 Replace MultiCache with RefCountCache
    
    * Misc cleanup to hostdb before cleanup
    
    Before doing the rewrite of hostdb (TS-4331) this commit cleans up some of the method names etc. to make this subsystem less confusing
---
 doc/admin-guide/files/records.config.en.rst |   30 +-
 iocore/eventsystem/I_Event.h                |    2 +-
 iocore/hostdb/HostDB.cc                     | 1102 +++++++++------------
 iocore/hostdb/I_HostDBProcessor.h           |  199 ++--
 iocore/hostdb/Makefile.am                   |   46 +-
 iocore/hostdb/MultiCache.cc                 | 1413 ---------------------------
 iocore/hostdb/P_HostDB.h                    |    4 +-
 iocore/hostdb/P_HostDBProcessor.h           |   83 +-
 iocore/hostdb/P_MultiCache.h                |  705 -------------
 iocore/hostdb/P_RefCountCache.h             |  863 ++++++++++++++++
 iocore/hostdb/RefCountCache.cc              |   47 +
 iocore/hostdb/test_RefCountCache.cc         |  270 +++++
 lib/ts/I_Version.h                          |    8 +-
 lib/ts/Map.h                                |    2 +
 lib/ts/Ptr.h                                |    4 +-
 mgmt/RecordsConfig.cc                       |   12 +-
 proxy/EventName.cc                          |    6 +-
 proxy/Main.cc                               |    3 +-
 proxy/http/HttpSM.cc                        |   11 +-
 proxy/http/HttpTransact.h                   |    7 +-
 20 files changed, 1857 insertions(+), 2960 deletions(-)

diff --git a/doc/admin-guide/files/records.config.en.rst b/doc/admin-guide/files/records.config.en.rst
index 0b0bc5b..f1e9298 100644
--- a/doc/admin-guide/files/records.config.en.rst
+++ b/doc/admin-guide/files/records.config.en.rst
@@ -2179,16 +2179,16 @@ HostDB
 
    If not set then stale records are not served.
 
-.. ts:cv:: CONFIG proxy.config.hostdb.storage_size INT 33554432
+.. ts:cv:: CONFIG proxy.config.hostdb.storage_size INT 1073741824
    :metric: bytes
 
-   The amount of space (in bytes) used to store ``hostdb``.
-   The value of this variable must be increased if you increase the size of the
-   `proxy.config.hostdb.size`_ variable.
+   The maximum amount of space (in bytes) allocated to ``hostdb``.
+   Setting this value to ``-1`` will disable size limit enforcement.
 
-.. ts:cv:: CONFIG proxy.config.hostdb.size INT 120000
+.. ts:cv:: CONFIG proxy.config.hostdb.max_count INT -1
 
-   The maximum number of entries that can be stored in the database.
+   The maximum number of entries that can be stored in hostdb. A value of ``-1``
+   disables item count limit enforcement.
 
 .. note::
 
@@ -2257,6 +2257,12 @@ value will be used on the next check and the file will be treated as modified.
 
 The file is checked every this many seconds to see if it has changed. If so the HostDB is updated with the new values in the file.
 
+.. ts:cv:: CONFIG proxy.config.hostdb.partitions INT 64
+
+   The number of partitions for hostdb. If you are seeing lock contention within
+   hostdb's cache (due to a large number of records) you can increase the number
+   of partitions
+
 .. ts:cv:: CONFIG proxy.config.hostdb.ip_resolve STRING NULL
 
    Set the host resolution style.
@@ -2314,6 +2320,14 @@ This value is a global default that can be overridden by :ts:cv:`proxy.config.ht
    origin server is determined by the client, which forces the IP address family of the address used for the origin
    server. In effect, outbound transparent connections always use a resolution style of "``client``".
 
+.. ts:cv:: CONFIG proxy.config.hostdb.verify_after INT 720
+
+    Set the interval (in seconds) in which to re-query DNS regardless of TTL status.
+
+.. ts:cv:: CONFIG proxy.config.hostdb.filename STRING "host.db"
+
+   The filename to persist hostdb to on disk.
+
 .. ts:cv:: CONFIG proxy.config.cache.hostdb.sync_frequency INT 120
 
    Set the frequency (in seconds) to sync hostdb to disk.
@@ -2321,10 +2335,6 @@ This value is a global default that can be overridden by :ts:cv:`proxy.config.ht
    Note: hostdb is syncd to disk on a per-partition basis (of which there are 64).
    This means that the minumum time to sync all data to disk is :ts:cv:`proxy.config.cache.hostdb.sync_frequency` * 64
 
-.. ts:cv:: CONFIG proxy.config.hostdb.verify_after INT 720
-
-    Set the interval (in seconds) in which to re-query DNS regardless of TTL status.
-
 Logging Configuration
 =====================
 
diff --git a/iocore/eventsystem/I_Event.h b/iocore/eventsystem/I_Event.h
index f546f42..a73eed7 100644
--- a/iocore/eventsystem/I_Event.h
+++ b/iocore/eventsystem/I_Event.h
@@ -62,7 +62,7 @@
 #define DNS_EVENT_EVENTS_START 600
 #define CONFIG_EVENT_EVENTS_START 800
 #define LOG_EVENT_EVENTS_START 900
-#define MULTI_CACHE_EVENT_EVENTS_START 1000
+#define REFCOUNT_CACHE_EVENT_EVENTS_START 1000
 #define CACHE_EVENT_EVENTS_START 1100
 #define CACHE_DIRECTORY_EVENT_EVENTS_START 1200
 #define CACHE_DB_EVENT_EVENTS_START 1300
diff --git a/iocore/hostdb/HostDB.cc b/iocore/hostdb/HostDB.cc
index 20421b7..237a0b2 100644
--- a/iocore/hostdb/HostDB.cc
+++ b/iocore/hostdb/HostDB.cc
@@ -63,7 +63,7 @@ static ink_time_t hostdb_last_interval = 0;
 // Epoch timestamp when we updated the hosts file last.
 static ink_time_t hostdb_hostfile_update_timestamp = 0;
 static char hostdb_filename[PATH_NAME_MAX]         = DEFAULT_HOST_DB_FILENAME;
-int hostdb_size                                    = DEFAULT_HOST_DB_SIZE;
+int hostdb_max_count                               = DEFAULT_HOST_DB_SIZE;
 char hostdb_hostfile_path[PATH_NAME_MAX]           = "";
 int hostdb_sync_frequency                          = 120;
 int hostdb_srv_enabled                             = 0;
@@ -77,24 +77,14 @@ HostDBCache hostDB;
 
 void ParseHostFile(char const *path, unsigned int interval);
 
-static Queue<HostDBContinuation> remoteHostDBQueue[MULTI_CACHE_PARTITIONS];
-
 char *
-HostDBInfo::srvname(HostDBRoundRobin *rr)
+HostDBInfo::srvname(HostDBRoundRobin *rr) const
 {
   if (!is_srv || !data.srv.srv_offset)
     return NULL;
-  ink_assert(this - rr->info >= 0 && this - rr->info < rr->rrcount && data.srv.srv_offset < rr->length);
   return (char *)rr + data.srv.srv_offset;
 }
 
-static inline int
-corrupt_debugging_callout(HostDBInfo *e, RebuildMC &r)
-{
-  Debug("hostdb", "corrupt %ld part %d", (long)((char *)&e->app.rr.offset - r.data), r.partition);
-  return -1;
-}
-
 static inline bool
 is_addr_valid(uint8_t af, ///< Address family (format of data)
               void *ptr   ///< Raw address data (not a sockaddr variant!)
@@ -236,62 +226,9 @@ HostDBMD5::~HostDBMD5()
     SplitDNSConfig::release(pSD);
 }
 
-HostDBCache::HostDBCache()
+HostDBCache::HostDBCache() : refcountcache(NULL), pending_dns(NULL), remoteHostDBQueue(NULL)
 {
-  tag_bits          = HOST_DB_TAG_BITS;
-  max_hits          = (1 << HOST_DB_HITS_BITS) - 1;
-  version.ink_major = HOST_DB_CACHE_MAJOR_VERSION;
-  version.ink_minor = HOST_DB_CACHE_MINOR_VERSION;
-  hosts_file_ptr    = new RefCountedHostsFileMap();
-}
-
-int
-HostDBCache::rebuild_callout(HostDBInfo *e, RebuildMC &r)
-{
-  if (e->round_robin && e->reverse_dns)
-    return corrupt_debugging_callout(e, r);
-  if (e->reverse_dns) {
-    if (e->data.hostname_offset < 0)
-      return 0;
-    if (e->data.hostname_offset > 0) {
-      if (!valid_offset(e->data.hostname_offset - 1))
-        return corrupt_debugging_callout(e, r);
-      char *p = (char *)ptr(&e->data.hostname_offset, r.partition);
-      if (!p)
-        return corrupt_debugging_callout(e, r);
-      char *s = p;
-      while (*p && p - s < MAXDNAME) {
-        if (!valid_heap_pointer(p))
-          return corrupt_debugging_callout(e, r);
-        p++;
-      }
-      if (p - s >= MAXDNAME)
-        return corrupt_debugging_callout(e, r);
-    }
-  }
-  if (e->round_robin) {
-    if (e->app.rr.offset < 0)
-      return 0;
-    if (!valid_offset(e->app.rr.offset - 1))
-      return corrupt_debugging_callout(e, r);
-    HostDBRoundRobin *rr = (HostDBRoundRobin *)ptr(&e->app.rr.offset, r.partition);
-    if (!rr)
-      return corrupt_debugging_callout(e, r);
-    if (rr->rrcount > HOST_DB_MAX_ROUND_ROBIN_INFO || rr->rrcount <= 0 || rr->good > HOST_DB_MAX_ROUND_ROBIN_INFO ||
-        rr->good <= 0 || rr->good > rr->rrcount)
-      return corrupt_debugging_callout(e, r);
-    for (int i = 0; i < rr->good; i++) {
-      if (!valid_heap_pointer(((char *)&rr->info[i + 1]) - 1))
-        return -1;
-      if (!ats_is_ip(rr->info[i].ip()))
-        return corrupt_debugging_callout(e, r);
-      if (rr->info[i].md5_high != e->md5_high || rr->info[i].md5_low != e->md5_low || rr->info[i].md5_low_low != e->md5_low_low)
-        return corrupt_debugging_callout(e, r);
-    }
-  }
-  if (e->is_ip_timeout())
-    return 0;
-  return 1;
+  hosts_file_ptr = new RefCountedHostsFileMap();
 }
 
 HostDBCache *
@@ -300,102 +237,27 @@ HostDBProcessor::cache()
   return &hostDB;
 }
 
-struct HostDBTestRR : public Continuation {
-  int fd;
-  char b[512];
-  int nb;
-  int outstanding, success, failure;
-  int in;
-
-  int
-  mainEvent(int event, Event *e)
-  {
-    if (event == EVENT_INTERVAL) {
-      printf("HostDBTestRR: %d outstanding %d succcess %d failure\n", outstanding, success, failure);
-    }
-    if (event == EVENT_HOST_DB_LOOKUP) {
-      --outstanding;
-      if (e)
-        ++success;
-      else
-        ++failure;
-    }
-    if (in)
-      return EVENT_CONT;
-    in = 1;
-    while (outstanding < 40) {
-      if (!nb)
-        goto Lreturn;
-      char *end = (char *)memchr(b, '\n', nb);
-      if (!end)
-        read_some();
-      end = (char *)memchr(b, '\n', nb);
-      if (!end)
-        nb = 0;
-      else {
-        *end = 0;
-        outstanding++;
-        hostDBProcessor.getbyname_re(this, b, 0);
-        nb -= ((end + 1) - b);
-        memcpy(b, end + 1, nb);
-        if (!nb)
-          read_some();
-      }
-    }
-  Lreturn:
-    in = 0;
-    return EVENT_CONT;
-  }
-
-  void
-  read_some()
-  {
-    nb = read(fd, b + nb, 512 - nb);
-    ink_release_assert(nb >= 0);
-  }
-
-  HostDBTestRR() : Continuation(new_ProxyMutex()), nb(0), outstanding(0), success(0), failure(0), in(0)
-  {
-    printf("starting HostDBTestRR....\n");
-    fd = open("hostdb_test.config", O_RDONLY, 0);
-    ink_release_assert(fd >= 0);
-    read_some();
-    SET_HANDLER(&HostDBTestRR::mainEvent);
-  }
-
-  ~HostDBTestRR() { close(fd); }
-};
-
-struct HostDBSyncer : public Continuation {
+struct HostDBBackgroundTask : public Continuation {
   int frequency;
   ink_hrtime start_time;
 
-  int sync_event(int event, void *edata);
+  virtual int sync_event(int event, void *edata) = 0;
   int wait_event(int event, void *edata);
 
-  HostDBSyncer();
+  HostDBBackgroundTask(int frequency);
 };
 
-HostDBSyncer::HostDBSyncer() : Continuation(new_ProxyMutex()), frequency(0), start_time(0)
+HostDBBackgroundTask::HostDBBackgroundTask(int frequency) : Continuation(new_ProxyMutex()), frequency(frequency), start_time(0)
 {
-  SET_HANDLER(&HostDBSyncer::sync_event);
+  SET_HANDLER(&HostDBBackgroundTask::sync_event);
 }
 
 int
-HostDBSyncer::sync_event(int, void *)
+HostDBBackgroundTask::wait_event(int, void *)
 {
-  SET_HANDLER(&HostDBSyncer::wait_event);
-  start_time = Thread::get_hrtime();
-  hostDBProcessor.cache()->sync_partitions(this);
-  return EVENT_DONE;
-}
-
-int
-HostDBSyncer::wait_event(int, void *)
-{
-  ink_hrtime next_sync = HRTIME_SECONDS(hostdb_sync_frequency) - (Thread::get_hrtime() - start_time);
+  ink_hrtime next_sync = HRTIME_SECONDS(this->frequency) - (Thread::get_hrtime() - start_time);
 
-  SET_HANDLER(&HostDBSyncer::sync_event);
+  SET_HANDLER(&HostDBBackgroundTask::sync_event);
   if (next_sync > HRTIME_MSECONDS(100))
     eventProcessor.schedule_in(this, next_sync, ET_TASK);
   else
@@ -403,16 +265,30 @@ HostDBSyncer::wait_event(int, void *)
   return EVENT_DONE;
 }
 
+struct HostDBSync : public HostDBBackgroundTask {
+  std::string storage_path;
+  std::string full_path;
+  HostDBSync(int frequency, std::string storage_path, std::string full_path)
+    : HostDBBackgroundTask(frequency), storage_path(storage_path), full_path(full_path){};
+  int
+  sync_event(int, void *)
+  {
+    SET_HANDLER(&HostDBSync::wait_event);
+    start_time = Thread::get_hrtime();
+
+    new RefCountCacheSerializer<RefCountCache<HostDBInfo>>(this, hostDBProcessor.cache()->refcountcache, this->frequency,
+                                                           this->storage_path, this->full_path);
+    return EVENT_DONE;
+  }
+};
+
 int
 HostDBCache::start(int flags)
 {
-  Store *hostDBStore;
-  Span *hostDBSpan;
+  (void)flags; // unused
   char storage_path[PATH_NAME_MAX];
-  int storage_size = 33554432; // 32MB default
-
-  bool reconfigure = ((flags & PROCESSOR_RECONFIGURE) ? true : false);
-  bool fix         = ((flags & PROCESSOR_FIX) ? true : false);
+  MgmtInt hostdb_max_size = 0;
+  int hostdb_partitions   = 64;
 
   storage_path[0] = '\0';
 
@@ -420,11 +296,22 @@ HostDBCache::start(int flags)
   // Command line overrides manager configuration.
   //
   REC_ReadConfigInt32(hostdb_enable, "proxy.config.hostdb");
-  REC_ReadConfigString(hostdb_filename, "proxy.config.hostdb.filename", sizeof(hostdb_filename));
-  REC_ReadConfigInt32(hostdb_size, "proxy.config.hostdb.size");
   REC_ReadConfigInt32(hostdb_srv_enabled, "proxy.config.srv_enabled");
   REC_ReadConfigString(storage_path, "proxy.config.hostdb.storage_path", sizeof(storage_path));
-  REC_ReadConfigInt32(storage_size, "proxy.config.hostdb.storage_size");
+  REC_ReadConfigString(hostdb_filename, "proxy.config.hostdb.filename", sizeof(hostdb_filename));
+
+  // Max number of items
+  REC_ReadConfigInt32(hostdb_max_count, "proxy.config.hostdb.max_count");
+  // max size allowed to use
+  REC_ReadConfigInteger(hostdb_max_size, "proxy.config.hostdb.max_size");
+  // number of partitions
+  REC_ReadConfigInt32(hostdb_partitions, "proxy.config.hostdb.partitions");
+  // how often to sync hostdb to disk
+  REC_EstablishStaticConfigInt32(hostdb_sync_frequency, "proxy.config.cache.hostdb.sync_frequency");
+
+  if (hostdb_max_size == 0) {
+    Fatal("proxy.config.hostdb.max_size must be a non-zero number");
+  }
 
   // If proxy.config.hostdb.storage_path is not set, use the local state dir. If it is set to
   // a relative path, make it relative to the prefix.
@@ -442,37 +329,29 @@ HostDBCache::start(int flags)
     Warning("Please set 'proxy.config.hostdb.storage_path' or 'proxy.config.local_state_dir'");
   }
 
-  hostDBStore = new Store;
-  hostDBSpan  = new Span;
-  hostDBSpan->init(storage_path, storage_size);
-  hostDBStore->add(hostDBSpan);
-
-  Debug("hostdb", "Opening %s, size=%d", hostdb_filename, hostdb_size);
-  if (open(hostDBStore, "hostdb.config", hostdb_filename, hostdb_size, reconfigure, fix, false /* slient */) < 0) {
-    ats_scoped_str rundir(RecConfigReadRuntimeDir());
-    ats_scoped_str config(Layout::relative_to(rundir, "hostdb.config"));
-
-    Note("reconfiguring host database");
-
-    if (unlink(config) < 0)
-      Debug("hostdb", "unable to unlink %s", (const char *)config);
+  // Combine the path and name
+  char full_path[2 * PATH_NAME_MAX];
+  ink_filepath_make(full_path, 2 * PATH_NAME_MAX, storage_path, hostdb_filename);
 
-    delete hostDBStore;
-    hostDBStore = new Store;
-    hostDBSpan  = new Span;
-    hostDBSpan->init(storage_path, storage_size);
-    hostDBStore->add(hostDBSpan);
+  Debug("hostdb", "Opening %s, partitions=%d storage_size=%" PRIu64 " items=%d", full_path, hostdb_partitions, hostdb_max_size,
+        hostdb_max_count);
+  this->refcountcache = new RefCountCache<HostDBInfo>(hostdb_partitions, hostdb_max_size, hostdb_max_count, HostDBInfo::version(),
+                                                      "proxy.process.hostdb.cache.");
+  int load_ret = LoadRefCountCacheFromPath<HostDBInfo>(*this->refcountcache, storage_path, full_path, HostDBInfo::unmarshall);
+  if (load_ret != 0) {
+    Warning("Error loading cache from %s: %d", full_path, load_ret);
+    printf("error loading %d\n", load_ret);
+  }
 
-    if (open(hostDBStore, "hostdb.config", hostdb_filename, hostdb_size, true, fix) < 0) {
-      Warning("could not initialize host database. Host database will be disabled");
-      hostdb_enable = 0;
-      delete hostDBStore;
-      return -1;
-    }
+  //
+  // Sync HostDB, if we've asked for it.
+  //
+  if (hostdb_sync_frequency > 0) {
+    eventProcessor.schedule_imm(new HostDBSync(hostdb_sync_frequency, storage_path, full_path), ET_TASK);
   }
-  HOSTDB_SET_DYN_COUNT(hostdb_bytes_stat, totalsize);
-  //  XXX I don't see this being reference in the previous function calls, so I am going to delete it -bcall
-  delete hostDBStore;
+
+  this->pending_dns       = new Queue<HostDBContinuation, Continuation::Link_link>[hostdb_partitions];
+  this->remoteHostDBQueue = new Queue<HostDBContinuation, Continuation::Link_link>[hostdb_partitions];
   return 0;
 }
 
@@ -484,15 +363,11 @@ HostDBCache::start(int flags)
 int
 HostDBProcessor::start(int, size_t)
 {
-  hostDB.alloc_mutexes();
-
   if (hostDB.start(0) < 0)
     return -1;
 
   if (auto_clear_hostdb_flag)
-    hostDB.clear();
-
-  HOSTDB_SET_DYN_COUNT(hostdb_total_entries_stat, hostDB.totalelements);
+    hostDB.refcountcache->clear();
 
   statPagesManager.register_http("hostdb", register_ShowHostDB);
 
@@ -512,7 +387,6 @@ HostDBProcessor::start(int, size_t)
   REC_EstablishStaticConfigInt32U(hostdb_ip_stale_interval, "proxy.config.hostdb.verify_after");
   REC_EstablishStaticConfigInt32U(hostdb_ip_fail_timeout_interval, "proxy.config.hostdb.fail.timeout");
   REC_EstablishStaticConfigInt32U(hostdb_serve_stale_but_revalidate, "proxy.config.hostdb.serve_stale_for");
-  REC_EstablishStaticConfigInt32(hostdb_sync_frequency, "proxy.config.cache.hostdb.sync_frequency");
   REC_EstablishStaticConfigInt32U(hostdb_hostfile_check_interval, "proxy.config.hostdb.host_file.interval");
 
   //
@@ -525,12 +399,6 @@ HostDBProcessor::start(int, size_t)
   b->mutex = new_ProxyMutex();
   eventProcessor.schedule_every(b, HRTIME_SECONDS(1), ET_DNS);
 
-  //
-  // Sync HostDB, if we've asked for it.
-  //
-  if (hostdb_sync_frequency > 0)
-    eventProcessor.schedule_imm(new HostDBSyncer, ET_TASK);
-
   return 0;
 }
 
@@ -551,7 +419,7 @@ HostDBContinuation::init(HostDBMD5 const &the_md5, Options const &opt)
 
   host_res_style     = opt.host_res_style;
   dns_lookup_timeout = opt.timeout;
-  mutex              = hostDB.lock_for_bucket((int)(fold_md5(md5.hash) % hostDB.buckets));
+  mutex              = hostDB.refcountcache->lock_for_key(md5.hash.fold());
   if (opt.cont) {
     action = opt.cont;
   } else {
@@ -563,21 +431,21 @@ HostDBContinuation::init(HostDBMD5 const &the_md5, Options const &opt)
 void
 HostDBContinuation::refresh_MD5()
 {
-  ProxyMutex *old_bucket_mutex = hostDB.lock_for_bucket((int)(fold_md5(md5.hash) % hostDB.buckets));
+  ProxyMutex *old_bucket_mutex = hostDB.refcountcache->lock_for_key(md5.hash.fold());
   // We're not pending DNS anymore.
   remove_trigger_pending_dns();
   md5.refresh();
   // Update the mutex if it's from the bucket.
   // Some call sites modify this after calling @c init so need to check.
   if (mutex.get() == old_bucket_mutex) {
-    mutex = hostDB.lock_for_bucket((int)(fold_md5(md5.hash) % hostDB.buckets));
+    mutex = hostDB.refcountcache->lock_for_key(md5.hash.fold());
   }
 }
 
 static bool
 reply_to_cont(Continuation *cont, HostDBInfo *r, bool is_srv = false)
 {
-  if (r == NULL || r->is_srv != is_srv || r->failed()) {
+  if (r == NULL || r->is_srv != is_srv || r->is_failed()) {
     cont->handleEvent(is_srv ? EVENT_SRV_LOOKUP : EVENT_HOST_DB_LOOKUP, NULL);
     return false;
   }
@@ -587,7 +455,7 @@ reply_to_cont(Continuation *cont, HostDBInfo *r, bool is_srv = false)
       ink_assert(!"missing hostname");
       cont->handleEvent(is_srv ? EVENT_SRV_LOOKUP : EVENT_HOST_DB_LOOKUP, NULL);
       Warning("bogus entry deleted from HostDB: missing hostname");
-      hostDB.delete_block(r);
+      hostDB.refcountcache->erase(r->key);
       return false;
     }
     Debug("hostdb", "hostname = %s", r->hostname());
@@ -598,7 +466,7 @@ reply_to_cont(Continuation *cont, HostDBInfo *r, bool is_srv = false)
       ink_assert(!"missing round-robin");
       cont->handleEvent(is_srv ? EVENT_SRV_LOOKUP : EVENT_HOST_DB_LOOKUP, NULL);
       Warning("bogus entry deleted from HostDB: missing round-robin");
-      hostDB.delete_block(r);
+      hostDB.refcountcache->erase(r->key);
       return false;
     }
     ip_text_buffer ipb;
@@ -607,12 +475,6 @@ reply_to_cont(Continuation *cont, HostDBInfo *r, bool is_srv = false)
 
   cont->handleEvent(is_srv ? EVENT_SRV_LOOKUP : EVENT_HOST_DB_LOOKUP, r);
 
-  if (!r->full) {
-    Warning("bogus entry deleted from HostDB: none");
-    hostDB.delete_block(r);
-    return false;
-  }
-
   return true;
 }
 
@@ -651,63 +513,45 @@ db_mark_for(IpAddr const &ip)
   return ip.isIp6() ? HOSTDB_MARK_IPV6 : HOSTDB_MARK_IPV4;
 }
 
-HostDBInfo *
+Ptr<HostDBInfo>
 probe(ProxyMutex *mutex, HostDBMD5 const &md5, bool ignore_timeout)
 {
-  ink_assert(this_ethread() == hostDB.lock_for_bucket((int)(fold_md5(md5.hash) % hostDB.buckets))->thread_holding);
-  if (hostdb_enable) {
-    uint64_t folded_md5 = fold_md5(md5.hash);
-    HostDBInfo *r       = hostDB.lookup_block(folded_md5, hostDB.levels);
-    Debug("hostdb", "probe %.*s %" PRIx64 " %d [ignore_timeout = %d]", md5.host_len, md5.host_name, folded_md5, !!r,
-          ignore_timeout);
-    if (r && md5.hash[1] == r->md5_high) {
-      // Check for timeout (fail probe)
-      //
-      if (r->is_deleted()) {
-        Debug("hostdb", "HostDB entry was set as deleted");
-        return NULL;
-      } else if (r->failed()) {
-        Debug("hostdb", "'%.*s' failed", md5.host_len, md5.host_name);
-        if (r->is_ip_fail_timeout()) {
-          Debug("hostdb", "fail timeout %u", r->ip_interval());
-          return NULL;
-        }
-      } else if (!ignore_timeout && r->is_ip_timeout() && !r->serve_stale_but_revalidate()) {
-        Debug("hostdb", "timeout %u %u %u", r->ip_interval(), r->ip_timestamp, r->ip_timeout_interval);
-        HOSTDB_INCREMENT_DYN_STAT(hostdb_ttl_expires_stat);
-        return NULL;
-      }
-      // error conditions
-      if (r->reverse_dns && !r->hostname()) {
-        Debug("hostdb", "missing reverse dns");
-        hostDB.delete_block(r);
-        return NULL;
-      }
-      if (r->round_robin && !r->rr()) {
-        Debug("hostdb", "missing round-robin");
-        hostDB.delete_block(r);
-        return NULL;
-      }
-      // Check for stale (revalidate offline if we are the owner)
-      // -or-
-      // we are beyond our TTL but we choose to serve for another N seconds [hostdb_serve_stale_but_revalidate seconds]
-      if ((!ignore_timeout && r->is_ip_stale() && !cluster_machine_at_depth(master_hash(md5.hash)) && !r->reverse_dns) ||
-          (r->is_ip_timeout() && r->serve_stale_but_revalidate())) {
-        Debug("hostdb", "stale %u %u %u, using it and refreshing it", r->ip_interval(), r->ip_timestamp, r->ip_timeout_interval);
-        HostDBContinuation *c = hostDBContAllocator.alloc();
-        HostDBContinuation::Options copt;
-        copt.host_res_style = host_res_style_for(r->ip());
-        c->init(md5, copt);
-        c->do_dns();
-      }
+  // If hostdb is disabled, don't return anything
+  if (!hostdb_enable) {
+    return Ptr<HostDBInfo>();
+  }
 
-      r->hits++;
-      if (!r->hits)
-        r->hits--;
-      return r;
-    }
+  // Otherwise HostDB is enabled, so we'll do our thing
+  ink_assert(this_ethread() == hostDB.refcountcache->lock_for_key(md5.hash.fold())->thread_holding);
+  uint64_t folded_md5 = md5.hash.fold();
+
+  // get the item from cache
+  Ptr<HostDBInfo> r = hostDB.refcountcache->get(folded_md5);
+  // If there was nothing in the cache-- this is a miss
+  if (r.get() == NULL) {
+    return r;
+  }
+
+  // If the dns response was failed, and we've hit the failed timeout, lets stop returning it
+  if (r->is_failed() && r->is_ip_fail_timeout()) {
+    return make_ptr((HostDBInfo *)NULL);
+    // if we aren't ignoring timeouts, and we are past it-- then remove the item
+  } else if (!ignore_timeout && r->is_ip_timeout() && !r->serve_stale_but_revalidate()) {
+    HOSTDB_INCREMENT_DYN_STAT(hostdb_ttl_expires_stat);
+    return make_ptr((HostDBInfo *)NULL);
   }
-  return NULL;
+
+  // If the record is stale, but we want to revalidate-- lets start that up
+  if ((!ignore_timeout && r->is_ip_stale() && !cluster_machine_at_depth(master_hash(md5.hash)) && !r->reverse_dns) ||
+      (r->is_ip_timeout() && r->serve_stale_but_revalidate())) {
+    Debug("hostdb", "stale %u %u %u, using it and refreshing it", r->ip_interval(), r->ip_timestamp, r->ip_timeout_interval);
+    HostDBContinuation *c = hostDBContAllocator.alloc();
+    HostDBContinuation::Options copt;
+    copt.host_res_style = host_res_style_for(r->ip());
+    c->init(md5, copt);
+    c->do_dns();
+  }
+  return r;
 }
 
 //
@@ -717,22 +561,20 @@ probe(ProxyMutex *mutex, HostDBMD5 const &md5, bool ignore_timeout)
 HostDBInfo *
 HostDBContinuation::insert(unsigned int attl)
 {
-  uint64_t folded_md5 = fold_md5(md5.hash);
-  int bucket          = folded_md5 % hostDB.buckets;
+  uint64_t folded_md5 = md5.hash.fold();
 
-  ink_assert(this_ethread() == hostDB.lock_for_bucket(bucket)->thread_holding);
-  // remove the old one to prevent buildup
-  HostDBInfo *old_r = hostDB.lookup_block(folded_md5, hostDB.levels);
-  if (old_r)
-    hostDB.delete_block(old_r);
-  HostDBInfo *r = hostDB.insert_block(folded_md5, NULL, 0);
-  r->md5_high   = md5.hash[1];
+  ink_assert(this_ethread() == hostDB.refcountcache->lock_for_key(folded_md5)->thread_holding);
+
+  HostDBInfo *r = HostDBInfo::alloc();
+  r->key        = folded_md5;
   if (attl > HOST_DB_MAX_TTL)
     attl                 = HOST_DB_MAX_TTL;
   r->ip_timeout_interval = attl;
   r->ip_timestamp        = hostdb_current_interval;
-  Debug("hostdb", "inserting for: %.*s: (md5: %" PRIx64 ") bucket: %d now: %u timeout: %u ttl: %u", md5.host_len, md5.host_name,
-        folded_md5, bucket, r->ip_timestamp, r->ip_timeout_interval, attl);
+  Debug("hostdb", "inserting for: %.*s: (md5: %" PRIx64 ") now: %u timeout: %u ttl: %u", md5.host_len, md5.host_name, folded_md5,
+        r->ip_timestamp, r->ip_timeout_interval, attl);
+
+  hostDB.refcountcache->put(folded_md5, r, 0, r->expiry_time());
   return r;
 }
 
@@ -774,22 +616,22 @@ HostDBProcessor::getby(Continuation *cont, const char *hostname, int len, sockad
       // find the partition lock
       //
       // TODO: Could we reuse the "mutex" above safely? I think so but not sure.
-      ProxyMutex *bmutex = hostDB.lock_for_bucket((int)(fold_md5(md5.hash) % hostDB.buckets));
+      ProxyMutex *bmutex = hostDB.refcountcache->lock_for_key(md5.hash.fold());
       MUTEX_TRY_LOCK(lock, bmutex, thread);
       MUTEX_TRY_LOCK(lock2, cont->mutex, thread);
 
       if (lock.is_locked() && lock2.is_locked()) {
         // If we can get the lock and a level 1 probe succeeds, return
-        HostDBInfo *r = probe(bmutex, md5, aforce_dns);
+        Ptr<HostDBInfo> r = probe(bmutex, md5, aforce_dns);
         if (r) {
-          if (r->failed() && hostname)
+          if (r->is_failed() && hostname)
             loop = check_for_retry(md5.db_mark, host_res_style);
           if (!loop) {
             // No retry -> final result. Return it.
             Debug("hostdb", "immediate answer for %s",
                   hostname ? hostname : ats_is_ip(ip) ? ats_ip_ntop(ip, ipb, sizeof ipb) : "<null>");
             HOSTDB_INCREMENT_DYN_STAT(hostdb_total_hits_stat);
-            reply_to_cont(cont, r);
+            reply_to_cont(cont, r.get());
             return ACTION_RESULT_DONE;
           }
           md5.refresh(); // only on reloop, because we've changed the family.
@@ -898,17 +740,17 @@ HostDBProcessor::getSRVbyname_imm(Continuation *cont, process_srv_info_pfn proce
   // Attempt to find the result in-line, for level 1 hits
   if (!force_dns) {
     // find the partition lock
-    ProxyMutex *bucket_mutex = hostDB.lock_for_bucket((int)(fold_md5(md5.hash) % hostDB.buckets));
+    ProxyMutex *bucket_mutex = hostDB.refcountcache->lock_for_key(md5.hash.fold());
     MUTEX_TRY_LOCK(lock, bucket_mutex, thread);
 
     // If we can get the lock and a level 1 probe succeeds, return
     if (lock.is_locked()) {
-      HostDBInfo *r = probe(bucket_mutex, md5, false);
+      Ptr<HostDBInfo> r = probe(bucket_mutex, md5, false);
       if (r) {
         Debug("hostdb", "immediate SRV answer for %s from hostdb", hostname);
         Debug("dns_srv", "immediate SRV answer for %s from hostdb", hostname);
         HOSTDB_INCREMENT_DYN_STAT(hostdb_total_hits_stat);
-        (cont->*process_srv_info)(r);
+        (cont->*process_srv_info)(r.get());
         return ACTION_RESULT_DONE;
       }
     }
@@ -972,18 +814,18 @@ HostDBProcessor::getbyname_imm(Continuation *cont, process_hostdb_info_pfn proce
     do {
       loop = false; // loop only on explicit set for retry
       // find the partition lock
-      ProxyMutex *bucket_mutex = hostDB.lock_for_bucket((int)(fold_md5(md5.hash) % hostDB.buckets));
+      ProxyMutex *bucket_mutex = hostDB.refcountcache->lock_for_key(md5.hash.fold());
       SCOPED_MUTEX_LOCK(lock, bucket_mutex, thread);
       // do a level 1 probe for immediate result.
-      HostDBInfo *r = probe(bucket_mutex, md5, false);
+      Ptr<HostDBInfo> r = probe(bucket_mutex, md5, false);
       if (r) {
-        if (r->failed()) // fail, see if we should retry with alternate
+        if (r->is_failed()) // fail, see if we should retry with alternate
           loop = check_for_retry(md5.db_mark, opt.host_res_style);
         if (!loop) {
           // No retry -> final result. Return it.
           Debug("hostdb", "immediate answer for %.*s", md5.host_len, md5.host_name);
           HOSTDB_INCREMENT_DYN_STAT(hostdb_total_hits_stat);
-          (cont->*process_hostdb_info)(r);
+          (cont->*process_hostdb_info)(r.get());
           return ACTION_RESULT_DONE;
         }
         md5.refresh(); // Update for retry.
@@ -1048,19 +890,19 @@ do_setby(HostDBInfo *r, HostDBApplicationInfo *app, const char *hostname, IpAddr
     if (is_srv) {
       uint32_t key = makeHostHash(hostname);
       for (int i = 0; i < rr->rrcount; i++) {
-        if (key == rr->info[i].data.srv.key && !strcmp(hostname, rr->info[i].srvname(rr))) {
+        if (key == rr->info(i).data.srv.key && !strcmp(hostname, rr->info(i).srvname(rr))) {
           Debug("hostdb", "immediate setby for %s", hostname);
-          rr->info[i].app.allotment.application1 = app->allotment.application1;
-          rr->info[i].app.allotment.application2 = app->allotment.application2;
+          rr->info(i).app.allotment.application1 = app->allotment.application1;
+          rr->info(i).app.allotment.application2 = app->allotment.application2;
           return;
         }
       }
     } else
       for (int i = 0; i < rr->rrcount; i++) {
-        if (rr->info[i].ip() == ip) {
+        if (rr->info(i).ip() == ip) {
           Debug("hostdb", "immediate setby for %s", hostname ? hostname : "<addr>");
-          rr->info[i].app.allotment.application1 = app->allotment.application1;
-          rr->info[i].app.allotment.application2 = app->allotment.application2;
+          rr->info(i).app.allotment.application1 = app->allotment.application1;
+          rr->info(i).app.allotment.application2 = app->allotment.application2;
           return;
         }
       }
@@ -1088,14 +930,14 @@ HostDBProcessor::setby(const char *hostname, int len, sockaddr const *ip, HostDB
 
   // Attempt to find the result in-line, for level 1 hits
 
-  ProxyMutex *mutex = hostDB.lock_for_bucket((int)(fold_md5(md5.hash) % hostDB.buckets));
+  ProxyMutex *mutex = hostDB.refcountcache->lock_for_key(md5.hash.fold());
   EThread *thread   = this_ethread();
   MUTEX_TRY_LOCK(lock, mutex, thread);
 
   if (lock.is_locked()) {
-    HostDBInfo *r = probe(mutex, md5, false);
+    Ptr<HostDBInfo> r = probe(mutex, md5, false);
     if (r)
-      do_setby(r, app, hostname, md5.ip);
+      do_setby(r.get(), app, hostname, md5.ip);
     return;
   }
   // Create a continuation to do a deaper probe in the background
@@ -1133,16 +975,16 @@ HostDBProcessor::setby_srv(const char *hostname, int len, const char *target, Ho
 int
 HostDBContinuation::setbyEvent(int /* event ATS_UNUSED */, Event * /* e ATS_UNUSED */)
 {
-  HostDBInfo *r = probe(mutex.get(), md5, false);
+  Ptr<HostDBInfo> r = probe(mutex.get(), md5, false);
 
   if (r)
-    do_setby(r, &app, md5.host_name, md5.ip, is_srv());
+    do_setby(r.get(), &app, md5.host_name, md5.ip, is_srv());
 
   hostdb_cont_free(this);
   return EVENT_DONE;
 }
 
-static int
+static bool
 remove_round_robin(HostDBInfo *r, const char *hostname, IpAddr const &ip)
 {
   if (r) {
@@ -1152,15 +994,15 @@ remove_round_robin(HostDBInfo *r, const char *hostname, IpAddr const &ip)
     if (!rr)
       return false;
     for (int i = 0; i < rr->good; i++) {
-      if (ip == rr->info[i].ip()) {
+      if (ip == rr->info(i).ip()) {
         ip_text_buffer b;
         Debug("hostdb", "Deleting %s from '%s' round robin DNS entry", ip.toString(b, sizeof b), hostname);
-        HostDBInfo tmp         = rr->info[i];
-        rr->info[i]            = rr->info[rr->good - 1];
-        rr->info[rr->good - 1] = tmp;
+        HostDBInfo tmp         = rr->info(i);
+        rr->info(i)            = rr->info(rr->good - 1);
+        rr->info(rr->good - 1) = tmp;
         rr->good--;
         if (rr->good <= 0) {
-          hostDB.delete_block(r);
+          hostDB.refcountcache->erase(r->key);
           return false;
         } else {
           if (is_debug_tag_set("hostdb")) {
@@ -1168,7 +1010,7 @@ remove_round_robin(HostDBInfo *r, const char *hostname, IpAddr const &ip)
             char *rr_ip_list = (char *)alloca(bufsize);
             char *p          = rr_ip_list;
             for (int n = 0; n < rr->good; ++n) {
-              ats_ip_ntop(rr->info[n].ip(), p, bufsize);
+              ats_ip_ntop(rr->info(n).ip(), p, bufsize);
               int nbytes = strlen(p);
               p += nbytes;
               bufsize -= nbytes;
@@ -1183,42 +1025,6 @@ remove_round_robin(HostDBInfo *r, const char *hostname, IpAddr const &ip)
   return false;
 }
 
-#if 0
-Action *
-HostDBProcessor::failed_connect_on_ip_for_name(Continuation * cont, sockaddr const* ip, const char *hostname, int len)
-{
-  HostDBMD5 md5;
-  md5.set_host(hostname, hostname ? (len ? len : strlen(hostname)) : 0);
-  md5.ip.assign(ip);
-  md5.port = ip ? ats_ip_port_host_order(ip) : 0;
-  md5.db_mark = db_mark_for(ip);
-  md5.refresh();
-
-  ProxyMutex *mutex = hostDB.lock_for_bucket((int) (fold_md5(md5.hash) % hostDB.buckets));
-  EThread *thread = this_ethread();
-  MUTEX_TRY_LOCK(lock, mutex, thread);
-  if (lock) {
-    if (!hostdb_enable || NULL == md5.dns_server) {
-      if (cont)
-        cont->handleEvent(EVENT_HOST_DB_IP_REMOVED, (void *) NULL);
-      return ACTION_RESULT_DONE;
-    }
-    HostDBInfo *r = probe(mutex, md5, false);
-    bool res = (remove_round_robin(r, hostname, ip) ? true : false);
-    if (cont)
-      cont->handleEvent(EVENT_HOST_DB_IP_REMOVED, res ? (void *) ip : (void *) NULL);
-    return ACTION_RESULT_DONE;
-  }
-  HostDBContinuation *c = hostDBContAllocator.alloc();
-  HostDBContinuation::Options copt;
-  copt.cont = cont;
-  c->init(md5, copt);
-  SET_CONTINUATION_HANDLER(c, (HostDBContHandler) & HostDBContinuation::removeEvent);
-  thread->schedule_in(c, MUTEX_RETRY_DELAY);
-  return &c->action;
-}
-#endif
-
 int
 HostDBContinuation::removeEvent(int /* event ATS_UNUSED */, Event *e)
 {
@@ -1234,8 +1040,8 @@ HostDBContinuation::removeEvent(int /* event ATS_UNUSED */, Event *e)
       if (cont)
         cont->handleEvent(EVENT_HOST_DB_IP_REMOVED, (void *)NULL);
     } else {
-      HostDBInfo *r = probe(mutex.get(), md5, false);
-      bool res      = (remove_round_robin(r, md5.host_name, md5.ip) ? true : false);
+      Ptr<HostDBInfo> r = probe(mutex.get(), md5, false);
+      bool res          = remove_round_robin(r.get(), md5.host_name, md5.ip);
       if (cont)
         cont->handleEvent(EVENT_HOST_DB_IP_REMOVED, res ? static_cast<void *>(&md5.ip) : static_cast<void *>(NULL));
     }
@@ -1246,13 +1052,13 @@ HostDBContinuation::removeEvent(int /* event ATS_UNUSED */, Event *e)
 
 // Lookup done, insert into the local table, return data to the
 // calling continuation or to the calling cluster node.
+// NOTE: if "r" is non-NULL the caller has already allocated the record; it is filled in here instead of being inserted
 //
 HostDBInfo *
-HostDBContinuation::lookup_done(IpAddr const &ip, char const *aname, bool around_robin, unsigned int ttl_seconds, SRVHosts *srv)
+HostDBContinuation::lookup_done(IpAddr const &ip, char const *aname, bool around_robin, unsigned int ttl_seconds, SRVHosts *srv,
+                                HostDBInfo *r)
 {
-  HostDBInfo *i = NULL;
-
-  ink_assert(this_ethread() == hostDB.lock_for_bucket((int)(fold_md5(md5.hash) % hostDB.buckets))->thread_holding);
+  ink_assert(this_ethread() == hostDB.refcountcache->lock_for_key(md5.hash.fold())->thread_holding);
   if (!ip.isValid() || !aname || !aname[0]) {
     if (is_byname()) {
       Debug("hostdb", "lookup_done() failed for '%.*s'", md5.host_len, md5.host_name);
@@ -1262,13 +1068,17 @@ HostDBContinuation::lookup_done(IpAddr const &ip, char const *aname, bool around
       ip_text_buffer b;
       Debug("hostdb", "failed for %s", md5.ip.toString(b, sizeof b));
     }
-    i                  = insert(hostdb_ip_fail_timeout_interval); // currently ... 0
-    i->round_robin     = false;
-    i->round_robin_elt = false;
-    i->is_srv          = is_srv();
-    i->reverse_dns     = !is_byname() && !is_srv();
+    if (r == NULL) {
+      r = insert(hostdb_ip_fail_timeout_interval); // currently ... 0
+    } else {
+      ttl_seconds = hostdb_ip_fail_timeout_interval;
+    }
+    r->round_robin     = false;
+    r->round_robin_elt = false;
+    r->is_srv          = is_srv();
+    r->reverse_dns     = !is_byname() && !is_srv();
 
-    i->set_failed();
+    r->set_failed();
   } else {
     switch (hostdb_ttl_mode) {
     default:
@@ -1295,25 +1105,27 @@ HostDBContinuation::lookup_done(IpAddr const &ip, char const *aname, bool around
     if (0 == ttl_seconds)
       ttl_seconds = 1;
 
-    i                  = insert(ttl_seconds);
-    i->round_robin_elt = false; // only true for elements explicitly added as RR elements.
+    if (r == NULL) {
+      r = insert(hostdb_ip_fail_timeout_interval); // NOTE(review): fail timeout on the success path; ip_timeout_interval is set from ttl_seconds below -- confirm
+    }
+    r->round_robin_elt = false; // only true for elements explicitly added as RR elements.
     if (is_byname()) {
       ip_text_buffer b;
       Debug("hostdb", "done %s TTL %d", ip.toString(b, sizeof b), ttl_seconds);
-      ats_ip_set(i->ip(), ip);
-      i->round_robin = around_robin;
-      i->reverse_dns = false;
+      ats_ip_set(r->ip(), ip);
+      r->round_robin = around_robin;
+      r->reverse_dns = false;
       if (md5.host_name != aname) {
         ink_strlcpy(md5_host_name_store, aname, sizeof(md5_host_name_store));
       }
-      i->is_srv = false;
+      r->is_srv = false;
     } else if (is_srv()) {
       ink_assert(srv && srv->srv_host_count > 0 && srv->srv_host_count <= 16 && around_robin);
 
-      i->data.srv.srv_offset = srv->srv_host_count;
-      i->reverse_dns         = false;
-      i->is_srv              = true;
-      i->round_robin         = around_robin;
+      r->data.srv.srv_offset = srv->srv_host_count;
+      r->reverse_dns         = false;
+      r->is_srv              = true;
+      r->round_robin         = around_robin;
 
       if (md5.host_name != aname) {
         ink_strlcpy(md5_host_name_store, aname, sizeof(md5_host_name_store));
@@ -1321,44 +1133,30 @@ HostDBContinuation::lookup_done(IpAddr const &ip, char const *aname, bool around
 
     } else {
       Debug("hostdb", "done '%s' TTL %d", aname, ttl_seconds);
-      const size_t s_size = strlen(aname) + 1;
-      void *s             = hostDB.alloc(&i->data.hostname_offset, s_size);
-      if (s) {
-        ink_strlcpy((char *)s, aname, s_size);
-        i->round_robin = false;
-        i->reverse_dns = true;
-        i->is_srv      = false;
-      } else {
-        ink_assert(!"out of room in hostdb data area");
-        Warning("out of room in hostdb for reverse DNS data");
-        hostDB.delete_block(i);
-        return NULL;
-      }
+      // TODO: check that this is right, it seems that the 2 hostnames are always the same
+      r->data.hostname_offset = r->hostname_offset;
+      // TODO: consolidate into a single "item type" field?
+      r->round_robin = false;
+      r->reverse_dns = true;
+      r->is_srv      = false;
     }
   }
 
-  if (aname) {
-    const size_t s_size = strlen(aname) + 1;
-    void *host_dest     = hostDB.alloc(&i->hostname_offset, s_size);
-    if (host_dest) {
-      ink_strlcpy((char *)host_dest, aname, s_size);
-    } else {
-      Warning("Out of room in hostdb for hostname (data area full!)");
-      hostDB.delete_block(i);
-      return NULL;
-    }
-  }
+  // Finally, set the TTL
+  r->ip_timeout_interval = ttl_seconds;
+  // set the "lookup_done" interval
+  r->ip_timestamp = hostdb_current_interval;
 
   if (from_cont)
-    do_put_response(from, i, from_cont);
-  ink_assert(!i->round_robin || !i->reverse_dns);
-  return i;
+    do_put_response(from, r, from_cont);
+  ink_assert(!r->round_robin || !r->reverse_dns);
+  return r;
 }
 
 int
 HostDBContinuation::dnsPendingEvent(int event, Event *e)
 {
-  ink_assert(this_ethread() == hostDB.lock_for_bucket(fold_md5(md5.hash) % hostDB.buckets)->thread_holding);
+  ink_assert(this_ethread() == hostDB.refcountcache->lock_for_key(md5.hash.fold())->thread_holding);
   if (timeout) {
     timeout->cancel(this);
     timeout = NULL;
@@ -1381,13 +1179,14 @@ HostDBContinuation::dnsPendingEvent(int event, Event *e)
   }
 }
 
+// for a new HostDBInfo `r`, "inherit" from the old version of yourself if it exists in `old_rr_data`
 static int
 restore_info(HostDBInfo *r, HostDBInfo *old_r, HostDBInfo &old_info, HostDBRoundRobin *old_rr_data)
 {
   if (old_rr_data) {
     for (int j = 0; j < old_rr_data->rrcount; j++)
-      if (ats_ip_addr_eq(old_rr_data->info[j].ip(), r->ip())) {
-        r->app = old_rr_data->info[j].app;
+      if (ats_ip_addr_eq(old_rr_data->info(j).ip(), r->ip())) {
+        r->app = old_rr_data->info(j).app;
         return true;
       }
   } else if (old_r)
@@ -1403,7 +1202,7 @@ restore_info(HostDBInfo *r, HostDBInfo *old_r, HostDBInfo &old_info, HostDBRound
 int
 HostDBContinuation::dnsEvent(int event, HostEnt *e)
 {
-  ink_assert(this_ethread() == hostDB.lock_for_bucket(fold_md5(md5.hash) % hostDB.buckets)->thread_holding);
+  ink_assert(this_ethread() == hostDB.refcountcache->lock_for_key(md5.hash.fold())->thread_holding);
   if (timeout) {
     timeout->cancel(this);
     timeout = NULL;
@@ -1447,61 +1246,78 @@ HostDBContinuation::dnsEvent(int event, HostEnt *e)
     ttl             = failed ? 0 : e->ttl / 60;
     int ttl_seconds = failed ? 0 : e->ttl; // ebalsa: moving to second accuracy
 
-    HostDBInfo *old_r = probe(mutex.get(), md5, true);
+    Ptr<HostDBInfo> old_r = probe(mutex.get(), md5, false);
     HostDBInfo old_info;
     if (old_r)
-      old_info                    = *old_r;
+      old_info                    = *old_r.get();
     HostDBRoundRobin *old_rr_data = old_r ? old_r->rr() : NULL;
-#ifdef DEBUG
-    if (old_rr_data) {
-      for (int i = 0; i < old_rr_data->rrcount; ++i) {
-        if (old_r->md5_high != old_rr_data->info[i].md5_high || old_r->md5_low != old_rr_data->info[i].md5_low ||
-            old_r->md5_low_low != old_rr_data->info[i].md5_low_low)
-          ink_assert(0);
-      }
-    }
-#endif
-    int n = 0, nn = 0;
-    void *first = 0;
-    uint8_t af  = e ? e->ent.h_addrtype : AF_UNSPEC; // address family
+    int valid_records             = 0;
+    void *first_record            = 0;
+    uint8_t af                    = e ? e->ent.h_addrtype : AF_UNSPEC; // address family
+    // if this is an RR response, we need to find the first record, as well as the
+    // total number of records
     if (is_rr) {
       if (is_srv() && !failed) {
-        n = e->srv_hosts.srv_host_count;
+        valid_records = e->srv_hosts.srv_host_count;
       } else {
         void *ptr; // tmp for current entry.
-        for (; nn < HOST_DB_MAX_ROUND_ROBIN_INFO && 0 != (ptr = e->ent.h_addr_list[nn]); ++nn) {
+        for (int total_records = 0; total_records < HOST_DB_MAX_ROUND_ROBIN_INFO && 0 != (ptr = e->ent.h_addr_list[total_records]);
+             ++total_records) {
           if (is_addr_valid(af, ptr)) {
-            if (!first)
-              first = ptr;
-            ++n;
+            if (!first_record) {
+              first_record = ptr;
+            }
+            // If some earlier records were invalid, compact the list by moving this valid
+            // record down. This way e->ent.h_addr_list ends up with all the valid responses
+            // in its first `valid_records` slots.
+            if (valid_records != total_records) {
+              e->ent.h_addr_list[valid_records] = e->ent.h_addr_list[total_records];
+            }
+
+            ++valid_records;
           } else {
             Warning("Zero address removed from round-robin list for '%s'", md5.host_name);
           }
-          // what's the point of @a n? Should there be something like
-          // if (n != nn) e->ent.h_addr_list[n] = e->ent->h_addr_list[nn];
-          // with a final copy of the terminating null? - AMC
         }
-        if (!first) {
+        if (!first_record) {
           failed = true;
           is_rr  = false;
         }
       }
     } else if (!failed) {
-      first = e->ent.h_addr_list[0];
+      first_record = e->ent.h_addr_list[0];
     } // else first is 0.
 
-    HostDBInfo *r = NULL;
     IpAddr tip; // temp storage if needed.
 
+    // If the lookup failed (an SOA response, for example) or md5.host_name is non-empty, use md5.host_name (it may be ""); otherwise use the name from the DNS answer
+    const char *aname = (failed || strlen(md5.host_name)) ? md5.host_name : e->ent.h_name;
+
+    const size_t s_size = strlen(aname) + 1;
+    const size_t rrsize = is_rr ? HostDBRoundRobin::size(valid_records, e->srv_hosts.srv_hosts_length) : 0;
+    // where in our block of memory we are
+    int offset = sizeof(HostDBInfo);
+
+    int allocSize = s_size + rrsize; // The extra space we need for the rest of the things
+
+    Debug("hostdb", "allocating %d bytes for %s with %d records", allocSize, aname, valid_records);
+    HostDBInfo *r = HostDBInfo::alloc(allocSize);
+    // set up the record
+    r->key = md5.hash.fold(); // always set the key
+
+    r->hostname_offset = offset;
+    ink_strlcpy(r->perm_hostname(), aname, s_size);
+    offset += s_size;
+
     // If the DNS lookup failed (errors such as NXDOMAIN, SERVFAIL, etc.) but we have an old record
     // which is okay with being served stale-- lets continue to serve the stale record as long as
     // the record is willing to be served.
     if (failed && old_r && old_r->serve_stale_but_revalidate()) {
-      r = old_r;
+      r = old_r.get();
     } else if (is_byname()) {
-      if (first)
-        ip_addr_set(tip, af, first);
-      r = lookup_done(tip, md5.host_name, is_rr, ttl_seconds, failed ? 0 : &e->srv_hosts);
+      if (first_record)
+        ip_addr_set(tip, af, first_record);
+      r = lookup_done(tip, md5.host_name, is_rr, ttl_seconds, failed ? 0 : &e->srv_hosts, r);
     } else if (is_srv()) {
       if (!failed)
         tip._family = AF_INET;         // force the tip valid, or else the srv will fail
@@ -1509,131 +1325,109 @@ HostDBContinuation::dnsEvent(int event, HostEnt *e)
                       md5.host_name,   /* hostname */
                       is_rr,           /* is round robin, doesnt matter for SRV since we recheck getCount() inside lookup_done() */
                       ttl_seconds,     /* ttl in seconds */
-                      failed ? 0 : &e->srv_hosts);
+                      failed ? 0 : &e->srv_hosts, r);
     } else if (failed) {
-      r = lookup_done(tip, md5.host_name, false, ttl_seconds, 0);
+      r = lookup_done(tip, md5.host_name, false, ttl_seconds, 0, r);
     } else {
-      r = lookup_done(md5.ip, e->ent.h_name, false, ttl_seconds, &e->srv_hosts);
+      r = lookup_done(md5.ip, e->ent.h_name, false, ttl_seconds, &e->srv_hosts, r);
     }
 
     // @c lookup_done should always return a valid value so @a r should be null @c NULL.
     ink_assert(r && r->app.allotment.application1 == 0 && r->app.allotment.application2 == 0);
 
+    // Conditionally make rr record entries
     if (is_rr) {
-      const int rrsize          = HostDBRoundRobin::size(n, e->srv_hosts.srv_hosts_length);
-      HostDBRoundRobin *rr_data = (HostDBRoundRobin *)hostDB.alloc(&r->app.rr.offset, rrsize);
-
-      Debug("hostdb", "allocating %d bytes for %d RR at %p %d", rrsize, n, rr_data, r->app.rr.offset);
-
-      if (rr_data) {
-        rr_data->length = rrsize;
-        int i = 0, ii = 0;
-        if (is_srv()) {
-          int skip  = 0;
-          char *pos = (char *)rr_data + sizeof(HostDBRoundRobin) + n * sizeof(HostDBInfo);
-          SRV *q[HOST_DB_MAX_ROUND_ROBIN_INFO];
-          ink_assert(n <= HOST_DB_MAX_ROUND_ROBIN_INFO);
-          // sort
-          for (i = 0; i < n; ++i) {
-            q[i] = &e->srv_hosts.hosts[i];
-          }
-          for (i = 0; i < n; ++i) {
-            for (ii = i + 1; ii < n; ++ii) {
-              if (*q[ii] < *q[i]) {
-                SRV *tmp = q[i];
-                q[i]     = q[ii];
-                q[ii]    = tmp;
-              }
+      r->app.rr.offset = offset;
+      // This will only be set if is_rr
+      HostDBRoundRobin *rr_data = (HostDBRoundRobin *)(r->rr());
+      ;
+      if (is_srv()) {
+        int skip  = 0;
+        char *pos = (char *)rr_data + sizeof(HostDBRoundRobin) + valid_records * sizeof(HostDBInfo);
+        SRV *q[HOST_DB_MAX_ROUND_ROBIN_INFO];
+        ink_assert(valid_records <= HOST_DB_MAX_ROUND_ROBIN_INFO);
+        // sort
+        for (int i = 0; i < valid_records; ++i) {
+          q[i] = &e->srv_hosts.hosts[i];
+        }
+        for (int i = 0; i < valid_records; ++i) {
+          for (int ii = i + 1; ii < valid_records; ++ii) {
+            if (*q[ii] < *q[i]) {
+              SRV *tmp = q[i];
+              q[i]     = q[ii];
+              q[ii]    = tmp;
             }
           }
+        }
 
-          for (i = 0; i < n; ++i) {
-            SRV *t           = q[i];
-            HostDBInfo &item = rr_data->info[i];
-
-            memset(&item, 0, sizeof(item));
-            item.round_robin           = 0;
-            item.round_robin_elt       = 1;
-            item.reverse_dns           = 0;
-            item.is_srv                = 1;
-            item.data.srv.srv_weight   = t->weight;
-            item.data.srv.srv_priority = t->priority;
-            item.data.srv.srv_port     = t->port;
-            item.data.srv.key          = t->key;
-
-            ink_assert((skip + t->host_len) <= e->srv_hosts.srv_hosts_length);
-
-            memcpy(pos + skip, t->host, t->host_len);
-            item.data.srv.srv_offset = (pos - (char *)rr_data) + skip;
-
-            skip += t->host_len;
-
-            item.md5_high        = r->md5_high;
-            item.md5_low         = r->md5_low;
-            item.md5_low_low     = r->md5_low_low;
-            item.full            = 1;
-            item.hostname_offset = 0;
+        rr_data->good = rr_data->rrcount = valid_records;
+        rr_data->current                 = 0;
+        for (int i = 0; i < valid_records; ++i) {
+          SRV *t                     = q[i];
+          HostDBInfo &item           = rr_data->info(i);
+          item.round_robin           = 0;
+          item.round_robin_elt       = 1;
+          item.reverse_dns           = 0;
+          item.is_srv                = 1;
+          item.data.srv.srv_weight   = t->weight;
+          item.data.srv.srv_priority = t->priority;
+          item.data.srv.srv_port     = t->port;
+          item.data.srv.key          = t->key;
+
+          ink_assert((skip + t->host_len) <= e->srv_hosts.srv_hosts_length);
+
+          memcpy(pos + skip, t->host, t->host_len);
+          item.data.srv.srv_offset = (pos - (char *)rr_data) + skip;
+
+          skip += t->host_len;
+
+          item.app.allotment.application1 = 0;
+          item.app.allotment.application2 = 0;
+          Debug("dns_srv", "inserted SRV RR record [%s] into HostDB with TTL: %d seconds", t->host, ttl_seconds);
+        }
 
-            item.app.allotment.application1 = 0;
-            item.app.allotment.application2 = 0;
-            Debug("dns_srv", "inserted SRV RR record [%s] into HostDB with TTL: %d seconds", t->host, ttl_seconds);
-          }
-          rr_data->good = rr_data->rrcount = n;
-          rr_data->current                 = 0;
-
-          // restore
-          if (old_rr_data) {
-            for (i = 0; i < rr_data->rrcount; ++i) {
-              for (ii = 0; ii < old_rr_data->rrcount; ++ii) {
-                if (rr_data->info[i].data.srv.key == old_rr_data->info[ii].data.srv.key) {
-                  char *new_host = rr_data->info[i].srvname(rr_data);
-                  char *old_host = old_rr_data->info[ii].srvname(old_rr_data);
-                  if (!strcmp(new_host, old_host))
-                    rr_data->info[i].app = old_rr_data->info[ii].app;
-                }
+        // restore
+        if (old_rr_data) {
+          for (int i = 0; i < rr_data->rrcount; ++i) {
+            for (int ii = 0; ii < old_rr_data->rrcount; ++ii) {
+              if (rr_data->info(i).data.srv.key == old_rr_data->info(ii).data.srv.key) {
+                char *new_host = rr_data->info(i).srvname(rr_data);
+                char *old_host = old_rr_data->info(ii).srvname(old_rr_data);
+                if (!strcmp(new_host, old_host))
+                  rr_data->info(i).app = old_rr_data->info(ii).app;
               }
             }
           }
-        } else {
-          for (ii = 0; ii < nn; ++ii) {
-            if (is_addr_valid(af, e->ent.h_addr_list[ii])) {
-              HostDBInfo &item = rr_data->info[i];
-              memset(&item, 0, sizeof(item));
-              ip_addr_set(item.ip(), af, e->ent.h_addr_list[ii]);
-              item.full            = 1;
-              item.round_robin     = 0;
-              item.round_robin_elt = 1;
-              item.reverse_dns     = 0;
-              item.is_srv          = 0;
-              item.md5_high        = r->md5_high;
-              item.md5_low         = r->md5_low;
-              item.md5_low_low     = r->md5_low_low;
-              item.hostname_offset = 0;
-              if (!restore_info(&item, old_r, old_info, old_rr_data)) {
-                item.app.allotment.application1 = 0;
-                item.app.allotment.application2 = 0;
-              }
-              ++i;
-            }
+        }
+      } else { // Otherwise this is a regular dns response
+        rr_data->good = rr_data->rrcount = valid_records;
+        rr_data->current                 = 0;
+        for (int i = 0; i < valid_records; ++i) {
+          HostDBInfo &item = rr_data->info(i);
+          ip_addr_set(item.ip(), af, e->ent.h_addr_list[i]);
+          item.round_robin     = 0;
+          item.round_robin_elt = 1;
+          item.reverse_dns     = 0;
+          item.is_srv          = 0;
+          if (!restore_info(&item, old_r.get(), old_info, old_rr_data)) {
+            item.app.allotment.application1 = 0;
+            item.app.allotment.application2 = 0;
           }
-          rr_data->good = rr_data->rrcount = n;
-          rr_data->current                 = 0;
         }
-      } else {
-        ink_assert(!"out of room in hostdb data area");
-        Warning("out of room in hostdb for round-robin DNS data");
-        r->round_robin     = 0;
-        r->round_robin_elt = 0;
       }
     }
+
     if (!failed && !is_rr && !is_srv())
-      restore_info(r, old_r, old_info, old_rr_data);
+      restore_info(r, old_r.get(), old_info, old_rr_data);
     ink_assert(!r || !r->round_robin || !r->reverse_dns);
     ink_assert(failed || !r->round_robin || r->app.rr.offset);
 
+    hostDB.refcountcache->put(md5.hash.fold(), r, allocSize, r->expiry_time());
+
     // if we are not the owner, put on the owner
     //
     ClusterMachine *m = cluster_machine_at_depth(master_hash(md5.hash));
+
     if (m)
       do_put_response(m, r, NULL);
 
@@ -1736,7 +1530,7 @@ HostDBContinuation::do_get_response(Event * /* e ATS_UNUSED */)
 
   // Setup this continuation, with a timeout
   //
-  remoteHostDBQueue[key_partition()].enqueue(this);
+  hostDB.remoteHostDBQueue[key_partition()].enqueue(this);
   SET_HANDLER((HostDBContHandler)&HostDBContinuation::clusterEvent);
   timeout = mutex->thread_holding->schedule_in(this, HOST_DB_CLUSTER_TIMEOUT);
 
@@ -1818,35 +1612,33 @@ HostDBContinuation::iterateEvent(int event, Event *e)
   }
 
   // let's iterate through another record and then reschedule ourself.
-  if (current_iterate_pos < hostDB.buckets) {
-    // do 100 at a time
-    int end = min(current_iterate_pos + 100, hostDB.buckets);
-    for (; current_iterate_pos < end; ++current_iterate_pos) {
-      ProxyMutex *bucket_mutex = hostDB.lock_for_bucket(current_iterate_pos);
-      MUTEX_TRY_LOCK_FOR(lock_bucket, bucket_mutex, t, this);
-      if (!lock_bucket.is_locked()) {
-        // we couldn't get the bucket lock, let's just reschedule and try later.
-        Debug("hostdb", "iterateEvent event=%d eventp=%p: reschedule due to not getting bucket mutex", event, e);
-        mutex->thread_holding->schedule_in(this, HOST_DB_RETRY_PERIOD);
-        return EVENT_CONT;
-      }
+  if (current_iterate_pos < hostDB.refcountcache->partition_count()) {
+    // TODO: configurable number at a time?
+    ProxyMutex *bucket_mutex = hostDB.refcountcache->get_partition(current_iterate_pos).lock.get();
+    MUTEX_TRY_LOCK_FOR(lock_bucket, bucket_mutex, t, this);
+    if (!lock_bucket.is_locked()) {
+      // we couldn't get the bucket lock, let's just reschedule and try later.
+      Debug("hostdb", "iterateEvent event=%d eventp=%p: reschedule due to not getting bucket mutex", event, e);
+      mutex->thread_holding->schedule_in(this, HOST_DB_RETRY_PERIOD);
+      return EVENT_CONT;
+    }
 
-      for (unsigned int l = 0; l < hostDB.levels; ++l) {
-        HostDBInfo *r =
-          reinterpret_cast<HostDBInfo *>(hostDB.data + hostDB.level_offset[l] + hostDB.bucketsize[l] * current_iterate_pos);
-        if (!r->deleted && !r->failed()) {
-          action.continuation->handleEvent(EVENT_INTERVAL, static_cast<void *>(r));
-        }
+    TSHashTable<RefCountCacheHashing> *partMap = hostDB.refcountcache->get_partition(current_iterate_pos).get_map();
+    for (RefCountCachePartition<HostDBInfo>::iterator_type i = partMap->begin(); i != partMap->end(); ++i) {
+      HostDBInfo *r = (HostDBInfo *)i.m_value->item.get();
+      if (r && !r->is_failed()) {
+        action.continuation->handleEvent(EVENT_INTERVAL, static_cast<void *>(r));
       }
     }
 
+    current_iterate_pos++;
     // And reschedule ourselves to pickup the next bucket after HOST_DB_RETRY_PERIOD.
-    Debug("hostdb", "iterateEvent event=%d eventp=%p: completed current iteration %d of %d", event, e, current_iterate_pos,
-          hostDB.buckets);
+    Debug("hostdb", "iterateEvent event=%d eventp=%p: completed current iteration %ld of %ld", event, e, current_iterate_pos,
+          hostDB.refcountcache->partition_count());
     mutex->thread_holding->schedule_in(this, HOST_DB_ITERATE_PERIOD);
     return EVENT_CONT;
   } else {
-    Debug("hostdb", "iterateEvent event=%d eventp=%p: completed FINAL iteration %d", event, e, current_iterate_pos);
+    Debug("hostdb", "iterateEvent event=%d eventp=%p: completed FINAL iteration %ld", event, e, current_iterate_pos);
     // if there are no more buckets, then we're done.
     action.continuation->handleEvent(EVENT_DONE, NULL);
     hostdb_cont_free(this);
@@ -1904,18 +1696,18 @@ HostDBContinuation::probeEvent(int /* event ATS_UNUSED */, Event *e)
   if (!force_dns) {
     // Do the probe
     //
-    HostDBInfo *r = probe(mutex.get(), md5, false);
+    Ptr<HostDBInfo> r = probe(mutex.get(), md5, false);
 
     if (r)
       HOSTDB_INCREMENT_DYN_STAT(hostdb_total_hits_stat);
 
     if (action.continuation && r)
-      reply_to_cont(action.continuation, r);
+      reply_to_cont(action.continuation, r.get());
 
     // Respond to any remote node
     //
     if (from)
-      do_put_response(from, r, from_cont);
+      do_put_response(from, r.get(), from_cont);
 
     // If it suceeds or it was a remote probe, we are done
     //
@@ -1984,6 +1776,7 @@ HostDBContinuation::do_dns()
       // check 127.0.0.1 format // What the heck does that mean? - AMC
       if (action.continuation) {
         HostDBInfo *r = lookup_done(tip, md5.host_name, false, HOST_DB_MAX_TTL, NULL);
+
         reply_to_cont(action.continuation, r);
       }
       hostdb_cont_free(this);
@@ -2038,7 +1831,7 @@ HostDBContinuation::clusterResponseEvent(int /*  event ATS_UNUSED */, Event *e)
 {
   if (from_cont) {
     HostDBContinuation *c;
-    for (c = (HostDBContinuation *)remoteHostDBQueue[key_partition()].head; c; c = (HostDBContinuation *)c->link.next)
+    for (c = (HostDBContinuation *)hostDB.remoteHostDBQueue[key_partition()].head; c; c = (HostDBContinuation *)c->link.next)
       if (c == from_cont)
         break;
 
@@ -2074,7 +1867,7 @@ HostDBContinuation::clusterEvent(int event, Event *e)
 {
   // remove ourselves from the queue
   //
-  remoteHostDBQueue[key_partition()].remove(this);
+  hostDB.remoteHostDBQueue[key_partition()].remove(this);
 
   switch (event) {
   default:
@@ -2185,7 +1978,7 @@ get_hostinfo_ClusterFunction(ClusterHandler *ch, void *data, int /* len ATS_UNUS
 
   copt.host_res_style = host_res_style_for(&msg->ip.sa);
   c->init(md5, copt);
-  c->mutex        = hostDB.lock_for_bucket(fold_md5(msg->md5) % hostDB.buckets);
+  c->mutex        = hostDB.refcountcache->lock_for_key(msg->md5.fold());
   c->action.mutex = c->mutex;
   dnsProcessor.thread->schedule_imm(c);
 }
@@ -2207,7 +2000,7 @@ put_hostinfo_ClusterFunction(ClusterHandler *ch, void *data, int /* len ATS_UNUS
   md5.db_mark         = db_mark_for(&msg->ip.sa);
   copt.host_res_style = host_res_style_for(&msg->ip.sa);
   c->init(md5, copt);
-  c->mutex       = hostDB.lock_for_bucket(fold_md5(msg->md5) % hostDB.buckets);
+  c->mutex       = hostDB.refcountcache->lock_for_key(msg->md5.fold());
   c->from_cont   = msg->cont; // cannot use action if cont freed due to timeout
   c->missing     = msg->missing;
   c->round_robin = msg->round_robin;
@@ -2269,48 +2062,25 @@ HostDBContinuation::backgroundEvent(int /* event ATS_UNUSED */, Event * /* e ATS
   return EVENT_CONT;
 }
 
-bool
-HostDBInfo::match(INK_MD5 &md5, int /* bucket ATS_UNUSED */, int buckets)
-{
-  if (md5[1] != md5_high)
-    return false;
-
-  uint64_t folded_md5 = fold_md5(md5);
-  uint64_t ttag       = folded_md5 / buckets;
-
-  if (!ttag)
-    ttag = 1;
-
-  struct {
-    unsigned int md5_low_low : 24;
-    unsigned int md5_low;
-  } tmp;
-
-  tmp.md5_low_low = (unsigned int)ttag;
-  tmp.md5_low     = (unsigned int)(ttag >> 24);
-
-  return tmp.md5_low_low == md5_low_low && tmp.md5_low == md5_low;
-}
-
 char *
-HostDBInfo::hostname()
+HostDBInfo::hostname() const
 {
   if (!reverse_dns)
     return NULL;
 
-  return (char *)hostDB.ptr(&data.hostname_offset, hostDB.ptr_to_partition((char *)this));
+  return (char *)this + data.hostname_offset;
 }
 
 /*
  * The perm_hostname exists for all records not just reverse dns records.
  */
 char *
-HostDBInfo::perm_hostname()
+HostDBInfo::perm_hostname() const
 {
   if (hostname_offset == 0)
     return NULL;
 
-  return (char *)hostDB.ptr(&hostname_offset, hostDB.ptr_to_partition((char *)this));
+  return (char *)this + hostname_offset;
 }
 
 HostDBRoundRobin *
@@ -2319,43 +2089,7 @@ HostDBInfo::rr()
   if (!round_robin)
     return NULL;
 
-  HostDBRoundRobin *r = (HostDBRoundRobin *)hostDB.ptr(&app.rr.offset, hostDB.ptr_to_partition((char *)this));
-
-  if (r &&
-      (r->rrcount > HOST_DB_MAX_ROUND_ROBIN_INFO || r->rrcount <= 0 || r->good > HOST_DB_MAX_ROUND_ROBIN_INFO || r->good <= 0)) {
-    ink_assert(!"bad round-robin");
-    return NULL;
-  }
-  return r;
-}
-
-int
-HostDBInfo::heap_size()
-{
-  if (reverse_dns) {
-    char *h = hostname();
-
-    if (h)
-      return strlen(h) + 1;
-  } else if (round_robin) {
-    HostDBRoundRobin *r = rr();
-
-    if (r)
-      return r->length;
-  }
-  return 0;
-}
-
-int *
-HostDBInfo::heap_offset_ptr()
-{
-  if (reverse_dns)
-    return &data.hostname_offset;
-
-  if (round_robin)
-    return &app.rr.offset;
-
-  return NULL;
+  return (HostDBRoundRobin *)((char *)this + this->app.rr.offset);
 }
 
 ClusterMachine *
@@ -2448,7 +2182,7 @@ struct ShowHostDB : public ShowCont {
           }
 
           for (int i = 0; i < rr_data->rrcount; i++) {
-            showOne(&rr_data->info[i], true, event, e, rr_data);
+            showOne(&rr_data->info(i), true, event, e, rr_data);
             if (output_json) {
               CHECK_SHOW(show("}")); // we need to seperate records
               if (i < (rr_data->rrcount - 1))
@@ -2497,8 +2231,6 @@ struct ShowHostDB : public ShowCont {
       }
 
       // Let's display the MD5.
-      CHECK_SHOW(show("<tr><td>%s</td><td>%0.16llx %0.8x %0.8x</td></tr>\n", "MD5 (high, low, low low)", r->md5_high, r->md5_low,
-                      r->md5_low_low));
       CHECK_SHOW(show("<tr><td>%s</td><td>%u</td></tr>\n", "App1", r->app.allotment.application1));
       CHECK_SHOW(show("<tr><td>%s</td><td>%u</td></tr>\n", "App2", r->app.allotment.application2));
       CHECK_SHOW(show("<tr><td>%s</td><td>%u</td></tr>\n", "LastFailure", r->app.http_data.last_failure));
@@ -2546,8 +2278,6 @@ struct ShowHostDB : public ShowCont {
       } else if (!r->is_srv) {
         CHECK_SHOW(show("\"%s\":\"%s\"", "ip", ats_ip_ntop(r->ip(), b, sizeof b)));
       }
-      // Let's display the MD5.
-      CHECK_SHOW(show("\"%s\":\"%0.16llx %0.8x %0.8x\"", "md5", r->md5_high, r->md5_low, r->md5_low_low));
     }
     return EVENT_CONT;
   }
@@ -2575,7 +2305,7 @@ struct ShowHostDB : public ShowCont {
           CHECK_SHOW(show("</table>\n"));
 
           for (int i = 0; i < rr_data->rrcount; i++)
-            showOne(&rr_data->info[i], true, event, e, rr_data);
+            showOne(&rr_data->info(i), true, event, e, rr_data);
         }
       }
     } else {
@@ -2656,6 +2386,10 @@ register_ShowHostDB(Continuation *c, HTTPHdr *h)
 struct HostDBTestReverse;
 typedef int (HostDBTestReverse::*HostDBTestReverseHandler)(int, void *);
 struct HostDBTestReverse : public Continuation {
+  RegressionTest *test;
+  int type;
+  int *status;
+
   int outstanding;
   int total;
 #if HAVE_LRAND48_R
@@ -2667,8 +2401,9 @@ struct HostDBTestReverse : public Continuation {
   {
     if (event == EVENT_HOST_DB_LOOKUP) {
       HostDBInfo *i = (HostDBInfo *)e;
-      if (i)
-        printf("HostDBTestReverse: reversed %s\n", i->hostname());
+      if (i) {
+        rprintf(test, "HostDBTestReverse: reversed %s\n", i->hostname());
+      }
       outstanding--;
     }
     while (outstanding < HOSTDB_TEST_MAX_OUTSTANDING && total < HOSTDB_TEST_LENGTH) {
@@ -2682,17 +2417,20 @@ struct HostDBTestReverse : public Continuation {
       ip.sin.sin_addr.s_addr = static_cast<in_addr_t>(l);
       outstanding++;
       total++;
-      if (!(outstanding % 1000))
-        printf("HostDBTestReverse: %d\n", total);
+      if (!(outstanding % 1000)) {
+        rprintf(test, "HostDBTestReverse: %d\n", total);
+      }
       hostDBProcessor.getbyaddr_re(this, &ip.sa);
     }
     if (!outstanding) {
-      printf("HostDBTestReverse: done\n");
+      rprintf(test, "HostDBTestReverse: done\n");
+      *status = REGRESSION_TEST_PASSED; //  TODO: actually verify it passed
       delete this;
     }
     return EVENT_CONT;
   }
-  HostDBTestReverse() : Continuation(new_ProxyMutex()), outstanding(0), total(0)
+  HostDBTestReverse(RegressionTest *t, int atype, int *astatus)
+    : Continuation(new_ProxyMutex()), test(t), type(atype), status(astatus), outstanding(0), total(0)
   {
     SET_HANDLER((HostDBTestReverseHandler)&HostDBTestReverse::mainEvent);
 #if HAVE_SRAND48_R
@@ -2704,14 +2442,9 @@ struct HostDBTestReverse : public Continuation {
 };
 
 #if TS_HAS_TESTS
-void
-run_HostDBTest()
+REGRESSION_TEST(HostDBTests)(RegressionTest *t, int atype, int *pstatus)
 {
-  if (is_action_tag_set("hostdb_test_rr"))
-    eventProcessor.schedule_every(new HostDBTestRR, HRTIME_SECONDS(1), ET_NET);
-  if (is_action_tag_set("hostdb_test_reverse")) {
-    eventProcessor.schedule_imm(new HostDBTestReverse, ET_CACHE);
-  }
+  eventProcessor.schedule_imm(new HostDBTestReverse(t, atype, pstatus), ET_CACHE);
 }
 #endif
 
@@ -2735,9 +2468,6 @@ ink_hostdb_init(ModuleVersion v)
   // Register stats
   //
 
-  RecRegisterRawStat(hostdb_rsb, RECT_PROCESS, "proxy.process.hostdb.total_entries", RECD_INT, RECP_PERSISTENT,
-                     (int)hostdb_total_entries_stat, RecRawStatSyncCount);
-
   RecRegisterRawStat(hostdb_rsb, RECT_PROCESS, "proxy.process.hostdb.total_lookups", RECD_INT, RECP_PERSISTENT,
                      (int)hostdb_total_lookups_stat, RecRawStatSyncSum);
 
@@ -2753,9 +2483,6 @@ ink_hostdb_init(ModuleVersion v)
   RecRegisterRawStat(hostdb_rsb, RECT_PROCESS, "proxy.process.hostdb.re_dns_on_reload", RECD_INT, RECP_PERSISTENT,
                      (int)hostdb_re_dns_on_reload_stat, RecRawStatSyncSum);
 
-  RecRegisterRawStat(hostdb_rsb, RECT_PROCESS, "proxy.process.hostdb.bytes", RECD_INT, RECP_PERSISTENT, (int)hostdb_bytes_stat,
-                     RecRawStatSyncCount);
-
   ts_host_res_global_init();
 }
 
@@ -2882,3 +2609,100 @@ ParseHostFile(char const *path, unsigned int hostdb_hostfile_check_interval)
   // Mark this one as completed, so we can allow another update to happen
   HostDBFileUpdateActive = 0;
 }
+
+//
+// Regression tests
+//
+// Take a started hostDB and fill it up and make sure it doesn't explode
+#ifdef TS_HAS_TESTS
+// Regression driver: looks up hostnames[0..hosts) one at a time via
+// hostDBProcessor.getbyname_re() and passes only if every lookup yields a record.
+struct HostDBRegressionContinuation : public Continuation {
+  int hosts;              // number of entries in hostnames
+  const char **hostnames; // names handed to getbyname_re(), in order
+  RegressionTest *test;
+  int type;
+  int *status; // set to REGRESSION_TEST_PASSED / REGRESSION_TEST_FAILED at the end
+
+  int success;     // callbacks that delivered a record
+  int failure;     // callbacks that delivered NULL
+  int outstanding; // starts at hosts, decremented per callback
+  int i;           // index of the next hostname to look up
+
+  int
+  mainEvent(int event, HostDBInfo *r)
+  {
+    // Timer event: log the current counters.
+    if (event == EVENT_INTERVAL) {
+      rprintf(test, "hosts=%d success=%d failure=%d outstanding=%d i=%d\n", hosts, success, failure, outstanding, i);
+    }
+    if (event == EVENT_HOST_DB_LOOKUP) {
+      --outstanding;
+      // since this is a lookup done, data is either hostdbInfo or NULL
+      if (r) {
+        HostDBRoundRobin *rr_data = r->rr();
+        const char *name          = r->perm_hostname();
+        // Pointers are printed with %p, and a NULL hostname is never handed to %s.
+        rprintf(test, "hostdbinfo r=%p\n", (void *)r);
+        rprintf(test, "hostdbinfo hostname=%s\n", name ? name : "(null)");
+        rprintf(test, "hostdbinfo rr %p\n", (void *)rr_data);
+        // If RR, print all of the enclosed records
+        if (rr_data) {
+          rprintf(test, "hostdbinfo good=%d\n", rr_data->good);
+          for (int x = 0; x < rr_data->good; x++) {
+            ip_port_text_buffer ip_buf;
+            ats_ip_ntop(rr_data->info(x).ip(), ip_buf, sizeof(ip_buf));
+            rprintf(test, "hostdbinfo RR%d ip=%s\n", x, ip_buf);
+          }
+        } else { // Otherwise, just the one will do
+          ip_port_text_buffer ip_buf;
+          ats_ip_ntop(r->ip(), ip_buf, sizeof(ip_buf));
+          rprintf(test, "hostdbinfo A ip=%s\n", ip_buf);
+        }
+        ++success;
+      } else {
+        ++failure;
+      }
+    }
+
+    // Look up the next hostname if any remain; otherwise report the tally.
+    if (i < hosts) {
+      hostDBProcessor.getbyname_re(this, hostnames[i++], 0);
+      return EVENT_CONT;
+    }
+
+    rprintf(test, "HostDBTestRR: %d outstanding %d success %d failure\n", outstanding, success, failure);
+    *status = (success == hosts) ? REGRESSION_TEST_PASSED : REGRESSION_TEST_FAILED;
+    return EVENT_DONE;
+  }
+
+  // All counters start at zero except outstanding, which starts at ahosts.
+  HostDBRegressionContinuation(int ahosts, const char **ahostnames, RegressionTest *t, int atype, int *astatus)
+    : Continuation(new_ProxyMutex()),
+      hosts(ahosts),
+      hostnames(ahostnames),
+      test(t),
+      type(atype),
+      status(astatus),
+      success(0),
+      failure(0),
+      outstanding(ahosts),
+      i(0)
+  {
+    SET_HANDLER(&HostDBRegressionContinuation::mainEvent);
+  }
+};
+
+static const char *dns_test_hosts[] = {
+  "www.apple.com", "www.ibm.com", "www.microsoft.com",
+  "www.coke.com", // RR record
+  "4.2.2.2",      // An IP-- since we don't expect resolution
+  "127.0.0.1",    // loopback since it has some special handling
+};
+
+REGRESSION_TEST(HostDBProcessor)(RegressionTest *t, int atype, int *pstatus)
+{
+  int nhosts = (int)(sizeof(dns_test_hosts) / sizeof(dns_test_hosts[0])); // derived, so it tracks the table
+  eventProcessor.schedule_in(new HostDBRegressionContinuation(nhosts, dns_test_hosts, t, atype, pstatus), HRTIME_SECONDS(1));
+}
+#endif
diff --git a/iocore/hostdb/I_HostDBProcessor.h b/iocore/hostdb/I_HostDBProcessor.h
index 91956bd..e767c09 100644
--- a/iocore/hostdb/I_HostDBProcessor.h
+++ b/iocore/hostdb/I_HostDBProcessor.h
@@ -31,6 +31,7 @@
 #include "ts/ink_resolver.h"
 #include "I_EventSystem.h"
 #include "SRV.h"
+#include "P_RefCountCache.h"
 
 // Event returned on a lookup
 #define EVENT_HOST_DB_LOOKUP (HOSTDB_EVENT_EVENTS_START + 0)
@@ -41,6 +42,7 @@
 #define EVENT_SRV_IP_REMOVED (SRV_EVENT_EVENTS_START + 1)
 #define EVENT_SRV_GET_RESPONSE (SRV_EVENT_EVENTS_START + 2)
 
+// TODO: make configurable
 #define HOST_DB_MAX_ROUND_ROBIN_INFO 16
 
 #define HOST_DB_SRV_PREFIX "_http._tcp."
@@ -125,7 +127,7 @@ union HostDBApplicationInfo {
   };
 
   struct application_data_rr {
-    int offset;
+    unsigned int offset;
   } rr;
 };
 
@@ -139,10 +141,63 @@ struct SRVInfo {
   unsigned int key;
 };
 
-struct HostDBInfo {
+struct HostDBInfo : public RefCountObj {
   /** Internal IP address data.
       This is at least large enough to hold an IPv6 address.
   */
+
+  int iobuffer_index;
+  static HostDBInfo *
+  alloc(int size = 0)
+  {
+    size += sizeof(HostDBInfo);
+    int iobuffer_index = iobuffer_size_to_index(size);
+    ink_release_assert(iobuffer_index >= 0);
+    void *ptr = ioBufAllocator[iobuffer_index].alloc_void();
+    memset(ptr, 0, size);
+    HostDBInfo *ret     = new (ptr) HostDBInfo();
+    ret->iobuffer_index = iobuffer_index;
+    return ret;
+  }
+
+  void
+  free()
+  {
+    ioBufAllocator[iobuffer_index].free_void((void *)(this));
+  }
+
+  // return a version number-- so we can manage compatibility with the marshal/unmarshal
+  static VersionNumber
+  version()
+  {
+    return VersionNumber(1, 0);
+  }
+
+  static HostDBInfo *
+  unmarshall(char *buf, unsigned int size)
+  {
+    if (size < sizeof(HostDBInfo)) {
+      return NULL;
+    }
+    HostDBInfo *ret = HostDBInfo::alloc(size - sizeof(HostDBInfo));
+    int buf_index   = ret->iobuffer_index;
+    memcpy((void *)ret, buf, size);
+    // Re-construct in place to reset the refcount to 0; buf_index was saved because the copy above overwrote it.
+    // NOTE(review): `HostDBInfo()` is value-initialization -- confirm it does not zero the members just copied in.
+    ret                 = new (ret) HostDBInfo();
+    ret->iobuffer_index = buf_index;
+    return ret;
+  }
+
+  // return expiry time (in seconds since epoch)
+  ink_time_t
+  expiry_time() const
+  {
+    return ip_timestamp + ip_timeout_interval + hostdb_serve_stale_but_revalidate;
+  }
+
+  uint64_t key;
+
   sockaddr *
   ip()
   {
@@ -154,9 +209,9 @@ struct HostDBInfo {
     return &data.ip.sa;
   }
 
-  char *hostname();
-  char *perm_hostname();
-  char *srvname(HostDBRoundRobin *rr);
+  char *hostname() const;
+  char *perm_hostname() const;
+  char *srvname(HostDBRoundRobin *rr) const;
   /// Check if this entry is an element of a round robin entry.
   /// If @c true then this entry is part of and was obtained from a round robin root. This is useful if the
   /// address doesn't work - a retry can probably get a new address by doing another lookup and resolving to
@@ -168,13 +223,6 @@ struct HostDBInfo {
   }
   HostDBRoundRobin *rr();
 
-  /** Indicate that the HostDBInfo is BAD and should be deleted. */
-  void
-  bad()
-  {
-    full = 0;
-  }
-
   /**
     Application specific data. NOTE: We need an integral number of these
     per block. This structure is 32 bytes. (at 200k hosts = 8 Meg). Which
@@ -184,31 +232,31 @@ struct HostDBInfo {
   HostDBApplicationInfo app;
 
   unsigned int
-  ip_interval()
+  ip_interval() const
   {
     return (hostdb_current_interval - ip_timestamp) & 0x7FFFFFFF;
   }
 
   int
-  ip_time_remaining()
+  ip_time_remaining() const
   {
     return static_cast<int>(ip_timeout_interval) - static_cast<int>(this->ip_interval());
   }
 
   bool
-  is_ip_stale()
+  is_ip_stale() const
   {
     return ip_timeout_interval >= 2 * hostdb_ip_stale_interval && ip_interval() >= hostdb_ip_stale_interval;
   }
 
   bool
-  is_ip_timeout()
+  is_ip_timeout() const
   {
     return ip_timeout_interval && ip_interval() >= ip_timeout_interval;
   }
 
   bool
-  is_ip_fail_timeout()
+  is_ip_fail_timeout() const
   {
     return ip_interval() >= hostdb_ip_fail_timeout_interval;
   }
@@ -220,7 +268,7 @@ struct HostDBInfo {
   }
 
   bool
-  serve_stale_but_revalidate()
+  serve_stale_but_revalidate() const
   {
     // the option is disabled
     if (hostdb_serve_stale_but_revalidate <= 0)
@@ -243,40 +291,29 @@ struct HostDBInfo {
   //
 
   union {
-    IpEndpoint ip;       ///< IP address / port data.
-    int hostname_offset; ///< Some hostname thing.
+    IpEndpoint ip;                ///< IP address / port data.
+    unsigned int hostname_offset; ///< Some hostname thing.
     SRVInfo srv;
   } data;
 
-  int hostname_offset; // always maintain a permanent copy of the hostname for non-rev dns records.
+  unsigned int hostname_offset; // always maintain a permanent copy of the hostname for non-rev dns records.
 
   unsigned int ip_timestamp;
   // limited to HOST_DB_MAX_TTL (0x1FFFFF, 24 days)
   // if this is 0 then no timeout.
   unsigned int ip_timeout_interval;
 
-  // Make sure we only have 8 bits of these flags before the @a md5_low_low
-  unsigned int full : 1;
-  unsigned int backed : 1; // duplicated in lower level
-  unsigned int deleted : 1;
-  unsigned int hits : 3;
-
   unsigned int is_srv : 1;
   unsigned int reverse_dns : 1;
 
-  unsigned int md5_low_low : 24;
-  unsigned int md5_low;
-
   unsigned int round_robin : 1;     // This is the root of a round robin block
   unsigned int round_robin_elt : 1; // This is an address in a round robin block
 
-  uint64_t md5_high;
-
   /*
    * Given the current time `now` and the fail_window, determine if this real is alive
    */
   bool
-  alive(ink_time_t now, int32_t fail_window)
+  is_alive(ink_time_t now, int32_t fail_window)
   {
     unsigned int last_failure = app.http_data.last_failure;
 
@@ -295,11 +332,13 @@ struct HostDBInfo {
       return false;
     }
   }
+
   bool
-  failed()
+  is_failed() const
   {
     return !((is_srv && data.srv.srv_offset) || (reverse_dns && data.hostname_offset) || ats_is_ip(ip()));
   }
+
   void
   set_failed()
   {
@@ -310,69 +349,6 @@ struct HostDBInfo {
     else
       ats_ip_invalidate(ip());
   }
-
-  void
-  set_deleted()
-  {
-    deleted = 1;
-  }
-  bool
-  is_deleted() const
-  {
-    return deleted;
-  }
-
-  bool
-  is_empty() const
-  {
-    return !full;
-  }
-
-  void
-  set_empty()
-  {
-    full        = 0;
-    md5_high    = 0;
-    md5_low     = 0;
-    md5_low_low = 0;
-  }
-
-  void
-  set_full(uint64_t folded_md5, int buckets)
-  {
-    uint64_t ttag = folded_md5 / buckets;
-
-    if (!ttag)
-      ttag      = 1;
-    md5_low_low = (unsigned int)ttag;
-    md5_low     = (unsigned int)(ttag >> 24);
-    full        = 1;
-  }
-
-  void
-  reset()
-  {
-    ats_ip_invalidate(ip());
-    app.allotment.application1 = 0;
-    app.allotment.application2 = 0;
-    backed                     = 0;
-    deleted                    = 0;
-    hits                       = 0;
-    round_robin                = 0;
-    reverse_dns                = 0;
-    is_srv                     = 0;
-  }
-
-  uint64_t
-  tag()
-  {
-    uint64_t f = md5_low;
-    return (f << 24) + md5_low_low;
-  }
-
-  bool match(INK_MD5 &, int, int);
-  int heap_size();
-  int *heap_offset_ptr();
 };
 
 struct HostDBRoundRobin {
@@ -383,10 +359,16 @@ struct HostDBRoundRobin {
   short good;
 
   unsigned short current;
-  unsigned short length;
   ink_time_t timed_rr_ctime;
 
-  HostDBInfo info[];
+  // Returns the n-th HostDBInfo laid out directly after this header; n must be in [0, rrcount). An accessor is
+  // used because a trailing `info[]` array member is not possible now that HostDBInfo is a non-POD type.
+  HostDBInfo &
+  info(short n)
+  {
+    ink_assert(n < rrcount && n >= 0);
+    return *((HostDBInfo *)((char *)this + sizeof(HostDBRoundRobin)) + n);
+  }
 
   // Return the allocation size of a HostDBRoundRobin struct suitable for storing
   // "count" HostDBInfo records.
@@ -411,7 +393,7 @@ struct HostDBRoundRobin {
   HostDBInfo *select_next(sockaddr const *addr);
   HostDBInfo *select_best_http(sockaddr const *client_ip, ink_time_t now, int32_t fail_window);
   HostDBInfo *select_best_srv(char *target, InkRand *rand, ink_time_t now, int32_t fail_window);
-  HostDBRoundRobin() : rrcount(0), good(0), current(0), length(0), timed_rr_ctime(0) {}
+  HostDBRoundRobin() : rrcount(0), good(0), current(0), timed_rr_ctime(0) {}
 };
 
 struct HostDBCache;
@@ -425,7 +407,7 @@ Action *iterate(Continuation *cont);
 
 /** The Host Databse access interface. */
 struct HostDBProcessor : public Processor {
-  friend struct HostDBSyncer;
+  friend struct HostDBSync;
   // Public Interface
 
   // Lookup Hostinfo by name
@@ -485,21 +467,6 @@ struct HostDBProcessor : public Processor {
     return getby(cont, NULL, 0, aip, false, HOST_RES_NONE, 0);
   }
 
-#if 0
-  /**
-    If you were unable to connect to an IP address associated with a
-    particular hostname, call this function and that IP address will
-    be marked "bad" and if the host is using round-robin DNS, next time
-    you will get a different IP address.
-
-  */
-  Action *failed_connect_on_ip_for_name(
-    Continuation * cont,
-    sockaddr const* aip,
-    const char *hostname, int len = 0
-  );
-#endif
-
   /** Set the application information (fire-and-forget). */
   void
   setbyname_appinfo(char *hostname, int len, int port, HostDBApplicationInfo *app)
diff --git a/iocore/hostdb/Makefile.am b/iocore/hostdb/Makefile.am
index cf8956d..47c6eff 100644
--- a/iocore/hostdb/Makefile.am
+++ b/iocore/hostdb/Makefile.am
@@ -31,15 +31,57 @@ EXTRA_DIST = I_HostDB.h
 noinst_LIBRARIES = libinkhostdb.a
 
 libinkhostdb_a_SOURCES = \
+  I_SplitDNS.h \
+  I_SplitDNSProcessor.h \
   HostDB.cc \
   I_HostDB.h \
   I_HostDBProcessor.h \
   Inline.cc \
-  MultiCache.cc \
+  RefCountCache.cc \
   P_HostDB.h \
   P_HostDBProcessor.h \
-  P_MultiCache.h
+  P_RefCountCache.h
+
+TESTS = $(check_PROGRAMS)
+check_PROGRAMS = test_RefCountCache
+
+test_RefCountCache_SOURCES = \
+  test_RefCountCache.cc
+
 
 #test_UNUSED_SOURCES = \
 #  test_I_HostDB.cc \
 #  test_P_HostDB.cc
+
+
+test_LD_FLAGS = \
+  @EXTRA_CXX_LDFLAGS@ \
+  @LIBTOOL_LINK_FLAGS@ \
+  @OPENSSL_LDFLAGS@
+
+test_CPP_FLAGS = \
+  $(AM_CPPFLAGS) \
+  $(iocore_include_dirs) \
+  -I$(top_srcdir)/proxy/api/ts \
+  -I$(top_srcdir)/proxy/api \
+  -I$(top_srcdir)/proxy \
+  -I$(top_srcdir)/proxy/hdrs \
+  -I$(top_srcdir)/proxy/http \
+  -I$(top_srcdir)/proxy/logging \
+  -I$(top_srcdir)/mgmt \
+  -I$(top_srcdir)/mgmt/utils \
+  @OPENSSL_INCLUDES@
+
+test_LD_ADD = \
+  $(top_builddir)/lib/records/librecords_p.a \
+  $(top_builddir)/mgmt/libmgmt_p.la \
+  $(top_builddir)/iocore/eventsystem/libinkevent.a \
+  $(top_builddir)/lib/ts/libtsutil.la \
+  $(top_builddir)/proxy/shared/libUglyLogStubs.a \
+  @LIBTCL@ @HWLOC_LIBS@
+
+test_RefCountCache_CPPFLAGS = $(test_CPP_FLAGS)
+
+test_RefCountCache_LDFLAGS = $(test_LD_FLAGS)
+
+test_RefCountCache_LDADD = $(test_LD_ADD)
diff --git a/iocore/hostdb/MultiCache.cc b/iocore/hostdb/MultiCache.cc
deleted file mode 100644
index 78e0aba..0000000
--- a/iocore/hostdb/MultiCache.cc
+++ /dev/null
@@ -1,1413 +0,0 @@
-/** @file
-
-  A brief file description
-
-  @section license License
-
-  Licensed to the Apache Software Foundation (ASF) under one
-  or more contributor license agreements.  See the NOTICE file
-  distributed with this work for additional information
-  regarding copyright ownership.  The ASF licenses this file
-  to you under the Apache License, Version 2.0 (the
-  "License"); you may not use this file except in compliance
-  with the License.  You may obtain a copy of the License at
-
-      http://www.apache.org/licenses/LICENSE-2.0
-
-  Unless required by applicable law or agreed to in writing, software
-  distributed under the License is distributed on an "AS IS" BASIS,
-  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-  See the License for the specific language governing permissions and
-  limitations under the License.
- */
-
-/****************************************************************************
-
-  MultiCache.cc
-
-
- ****************************************************************************/
-
-#include "ts/ink_platform.h"
-#include "ts/I_Layout.h"
-#include "P_HostDB.h"
-#include "P_MultiCache.h"
-#include "P_EventSystem.h" // FIXME: need to have this in I_* header files.
-#include "ts/ink_file.h"
-
-static const int MC_SYNC_MIN_PAUSE_TIME = HRTIME_MSECONDS(200); // Pause for at least 200ms
-
-MultiCacheBase::MultiCacheBase()
-  : store(0), mapped_header(NULL), data(0), lowest_level_data(0), miss_stat(0), buckets_per_partitionF8(0)
-{
-  filename[0] = 0;
-  memset(hit_stat, 0, sizeof(hit_stat));
-  memset(unsunk, 0, sizeof(unsunk));
-  for (int i     = 0; i < MULTI_CACHE_PARTITIONS; i++)
-    unsunk[i].mc = this;
-}
-
-inline int
-store_verify(Store *store)
-{
-  if (!store)
-    return 0;
-  for (unsigned i = 0; i < store->n_disks; i++) {
-    for (Span *sd = store->disk[i]; sd; sd = sd->link.next) {
-      if (!sd->file_pathname && sd->offset)
-        return 0;
-    }
-  }
-  return 1;
-}
-
-MultiCacheHeader::MultiCacheHeader()
-  : magic(MULTI_CACHE_MAGIC_NUMBER),
-    levels(0),
-    tag_bits(0),
-    max_hits(0),
-    elementsize(0),
-    buckets(0),
-    totalelements(0),
-    totalsize(0),
-    nominal_elements(0),
-    heap_size(0),
-    heap_halfspace(0)
-{
-  memset(level_offset, 0, sizeof(level_offset));
-  memset(bucketsize, 0, sizeof(bucketsize));
-  memset(elements, 0, sizeof(elements));
-  heap_used[0]      = 8;
-  heap_used[1]      = 8;
-  version.ink_major = MULTI_CACHE_MAJOR_VERSION;
-  version.ink_minor = MULTI_CACHE_MINOR_VERSION;
-}
-
-static inline int
-bytes_to_blocks(int64_t b)
-{
-  return (int)((b + (STORE_BLOCK_SIZE - 1)) / STORE_BLOCK_SIZE);
-}
-
-inline int
-MultiCacheBase::blocks_in_level(unsigned int level)
-{
-  int64_t sumbytes = 0;
-  int prevblocks   = 0;
-  int b            = 0;
-  for (unsigned int i = 0; i <= level; i++) {
-    sumbytes += buckets * ((int64_t)bucketsize[i]);
-    int sumblocks = bytes_to_blocks(sumbytes);
-    b             = sumblocks - prevblocks;
-    prevblocks    = sumblocks;
-  }
-  return b;
-}
-
-//
-// Initialize MultiCache
-// The outermost level of the cache contains ~aelements.
-// The higher levels (lower in number) contain fewer.
-//
-int
-MultiCacheBase::initialize(Store *astore, char *afilename, int aelements, int abuckets, unsigned int alevels,
-                           int level0_elements_per_bucket, int level1_elements_per_bucket, int level2_elements_per_bucket)
-{
-  int64_t size = 0;
-
-  Debug("multicache", "initializing %s with %d elements, %d buckets and %d levels", afilename, aelements, abuckets, alevels);
-  ink_assert(alevels <= MULTI_CACHE_MAX_LEVELS);
-  if (alevels > MULTI_CACHE_MAX_LEVELS) {
-    Warning("Alevels too large %d, cannot initialize MultiCache", MULTI_CACHE_MAX_LEVELS);
-    return -1;
-  }
-  levels           = alevels;
-  elementsize      = get_elementsize();
-  totalelements    = 0;
-  nominal_elements = aelements;
-  buckets          = abuckets;
-
-  ink_strlcpy(filename, afilename, sizeof(filename));
-  //
-  //  Allocate level 2 as the outermost
-  //
-  if (levels > 2) {
-    if (!buckets) {
-      buckets = aelements / level2_elements_per_bucket;
-      if (buckets < MULTI_CACHE_PARTITIONS)
-        buckets = MULTI_CACHE_PARTITIONS;
-    }
-    if (levels == 3)
-      level2_elements_per_bucket = aelements / buckets;
-    elements[2]                  = level2_elements_per_bucket;
-    totalelements += buckets * level2_elements_per_bucket;
-    bucketsize[2] = elementsize * level2_elements_per_bucket;
-    size += (int64_t)bucketsize[2] * (int64_t)buckets;
-
-    if (!(level2_elements_per_bucket / level1_elements_per_bucket)) {
-      Warning("Size change too large, unable to reconfigure");
-      return -1;
-    }
-
-    aelements /= (level2_elements_per_bucket / level1_elements_per_bucket);
-  }
-  //
-  //  Allocate level 1
-  //
-  if (levels > 1) {
-    if (!buckets) {
-      buckets = aelements / level1_elements_per_bucket;
-      if (buckets < MULTI_CACHE_PARTITIONS)
-        buckets = MULTI_CACHE_PARTITIONS;
-    }
-    if (levels == 2)
-      level1_elements_per_bucket = aelements / buckets;
-    elements[1]                  = level1_elements_per_bucket;
-    totalelements += buckets * level1_elements_per_bucket;
-    bucketsize[1] = elementsize * level1_elements_per_bucket;
-    size += (int64_t)bucketsize[1] * (int64_t)buckets;
-    if (!(level1_elements_per_bucket / level0_elements_per_bucket)) {
-      Warning("Size change too large, unable to reconfigure");
-      return -2;
-    }
-    aelements /= (level1_elements_per_bucket / level0_elements_per_bucket);
-  }
-  //
-  //  Allocate level 0
-  //
-  if (!buckets) {
-    buckets = aelements / level0_elements_per_bucket;
-    if (buckets < MULTI_CACHE_PARTITIONS)
-      buckets = MULTI_CACHE_PARTITIONS;
-  }
-  if (levels == 1)
-    level0_elements_per_bucket = aelements / buckets;
-  elements[0]                  = level0_elements_per_bucket;
-  totalelements += buckets * level0_elements_per_bucket;
-  bucketsize[0] = elementsize * level0_elements_per_bucket;
-  size += (int64_t)bucketsize[0] * (int64_t)buckets;
-
-  buckets_per_partitionF8 = (buckets << 8) / MULTI_CACHE_PARTITIONS;
-  ink_release_assert(buckets_per_partitionF8);
-
-  unsigned int blocks = (size + (STORE_BLOCK_SIZE - 1)) / STORE_BLOCK_SIZE;
-
-  heap_size = int((float)totalelements * estimated_heap_bytes_per_entry());
-  blocks += bytes_to_blocks(heap_size);
-
-  blocks += 1; // header
-  totalsize = (int64_t)blocks * (int64_t)STORE_BLOCK_SIZE;
-
-  Debug("multicache", "heap_size = %d, totalelements = %d, totalsize = %d", heap_size, totalelements, totalsize);
-
-  //
-  //  Spread alloc from the store (using storage that can be mmapped)
-  //
-  delete store;
-  store = new Store;
-  astore->spread_alloc(*store, blocks, true);
-  unsigned int got = store->total_blocks();
-
-  if (got < blocks) {
-    astore->free(*store);
-    delete store;
-    store = NULL;
-    Warning("Configured store too small (actual=%d required=%d), unable to reconfigure", got * STORE_BLOCK_SIZE,
-            blocks * STORE_BLOCK_SIZE);
-    return -3;
-  }
-  totalsize = (STORE_BLOCK_SIZE)*blocks;
-
-  level_offset[1] = buckets * bucketsize[0];
-  level_offset[2] = buckets * bucketsize[1] + level_offset[1];
-
-  if (lowest_level_data)
-    delete[] lowest_level_data;
-  lowest_level_data = new char[lowest_level_data_size()];
-  ink_assert(lowest_level_data);
-  memset(lowest_level_data, 0xFF, lowest_level_data_size());
-
-  return got;
-}
-
-char *
-MultiCacheBase::mmap_region(int blocks, int *fds, char *cur, size_t &total_length, bool private_flag, int zero_fill)
-{
-  if (!blocks)
-    return cur;
-  int p     = 0;
-  char *res = 0;
-  for (unsigned i = 0; i < store->n_disks; i++) {
-    unsigned int target    = blocks / (store->n_disks - i);
-    unsigned int following = store->total_blocks(i + 1);
-    if (blocks - target > following)
-      target = blocks - following;
-    Span *ds = store->disk[i];
-    for (unsigned j = 0; j < store->disk[i]->paths(); j++) {
-      Span *d = ds->nth(j);
-
-      ink_assert(d->is_mmapable());
-
-      if (target && d->blocks) {
-        int b = d->blocks;
-        if (d->blocks > target)
-          b = target;
-        d->blocks -= b;
-        unsigned int nbytes = b * STORE_BLOCK_SIZE;
-        int fd              = fds[p] ? fds[p] : zero_fill;
-        ink_assert(-1 != fd);
-        int flags = private_flag ? MAP_PRIVATE : MAP_SHARED_MAP_NORESERVE;
-
-        if (cur)
-          res = (char *)mmap(cur, nbytes, PROT_READ | PROT_WRITE, MAP_FIXED | flags, fd, d->offset * STORE_BLOCK_SIZE);
-        else
-          res = (char *)mmap(cur, nbytes, PROT_READ | PROT_WRITE, flags, fd, d->offset * STORE_BLOCK_SIZE);
-
-        d->offset += b;
-
-        if (res == NULL || res == (caddr_t)MAP_FAILED)
-          return NULL;
-        ink_assert(!cur || res == cur);
-        cur = res + nbytes;
-        blocks -= b;
-        total_length += nbytes; // total amount mapped.
-      }
-      p++;
-    }
-  }
-  return blocks ? 0 : cur;
-}
-
-void
-MultiCacheBase::reset()
-{
-  if (store)
-    delete store;
-  store = 0;
-  if (lowest_level_data)
-    delete[] lowest_level_data;
-  lowest_level_data = 0;
-  if (data)
-    unmap_data();
-  data = 0;
-}
-
-int
-MultiCacheBase::unmap_data()
-{
-  int res = 0;
-  if (data) {
-    res  = munmap(data, totalsize);
-    data = NULL;
-    return res;
-  }
-  return 0;
-}
-
-int
-MultiCacheBase::mmap_data(bool private_flag, bool zero_fill)
-{
-  ats_scoped_fd fd;
-  int fds[MULTI_CACHE_MAX_FILES] = {0};
-  int n_fds                      = 0;
-  size_t total_mapped            = 0; // total mapped memory from storage.
-
-  // open files
-  //
-  if (!store || !store->n_disks)
-    goto Lalloc;
-  for (unsigned i = 0; i < store->n_disks; i++) {
-    Span *ds = store->disk[i];
-    for (unsigned j = 0; j < store->disk[i]->paths(); j++) {
-      char path[PATH_NAME_MAX];
-      Span *d = ds->nth(j);
-      int r   = d->path(filename, NULL, path, PATH_NAME_MAX);
-      if (r < 0) {
-        Warning("filename too large '%s'", filename);
-        goto Labort;
-      }
-      fds[n_fds] = socketManager.open(path, O_RDWR | O_CREAT, 0644);
-      if (fds[n_fds] < 0) {
-        if (!zero_fill) {
-          Warning("unable to open file '%s': %d, %s", path, errno, strerror(errno));
-          goto Lalloc;
-        }
-        fds[n_fds] = 0;
-      }
-      if (!d->file_pathname) {
-        struct stat fd_stat;
-
-        if (fstat(fds[n_fds], &fd_stat) < 0) {
-          Warning("unable to stat file '%s'", path);
-          goto Lalloc;
-        } else {
-          int64_t size = (off_t)(d->blocks * STORE_BLOCK_SIZE);
-
-          if (fd_stat.st_size != size) {
-            int err = ink_file_fd_zerofill(fds[n_fds], size);
-
-            if (err != 0) {
-              Warning("unable to set file '%s' size to %" PRId64 ": %d, %s", path, size, err, strerror(err));
-              goto Lalloc;
-            }
-          }
-        }
-      }
-      n_fds++;
-    }
-  }
-
-  data = 0;
-
-  // mmap levels
-  //
-  {
-    // make a copy of the store
-    Store tStore;
-    store->dup(tStore);
-    Store *saved = store;
-    store        = &tStore;
-
-    char *cur = 0;
-
-// find a good address to start
-#if !defined(darwin)
-    fd = socketManager.open("/dev/zero", O_RDONLY, 0645);
-    if (fd < 0) {
-      store = saved;
-      Warning("unable to open /dev/zero: %d, %s", errno, strerror(errno));
-      goto Labort;
-    }
-#endif
-
-// lots of useless stuff
-#if defined(darwin)
-    cur = (char *)mmap(0, totalsize, PROT_READ, MAP_SHARED_MAP_NORESERVE | MAP_ANON, -1, 0);
-#else
-    cur = (char *)mmap(0, totalsize, PROT_READ, MAP_SHARED_MAP_NORESERVE, fd, 0);
-#endif
-    if (cur == NULL || cur == (caddr_t)MAP_FAILED) {
-      store = saved;
-#if defined(darwin)
-      Warning("unable to mmap anonymous region for %u bytes: %d, %s", totalsize, errno, strerror(errno));
-#else
-      Warning("unable to mmap /dev/zero for %u bytes: %d, %s", totalsize, errno, strerror(errno));
-#endif
-      goto Labort;
-    }
-    if (munmap(cur, totalsize)) {
-      store = saved;
-#if defined(darwin)
-      Warning("unable to munmap anonymous region for %u bytes: %d, %s", totalsize, errno, strerror(errno));
-#else
-      Warning("unable to munmap /dev/zero for %u bytes: %d, %s", totalsize, errno, strerror(errno));
-#endif
-      goto Labort;
-    }
-
-    /* We've done a mmap on a target region of the maximize size we need. Now we drop that mapping
-       and do the real one, keeping at the same address space (stored in @a data) which should work because
-       we just tested it.
-    */
-    // coverity[use_after_free]
-    data = cur;
-
-    cur = mmap_region(blocks_in_level(0), fds, cur, total_mapped, private_flag, fd);
-    if (!cur) {
-      store = saved;
-      goto Labort;
-    }
-    if (levels > 1)
-      cur = mmap_region(blocks_in_level(1), fds, cur, total_mapped, private_flag, fd);
-    if (!cur) {
-      store = saved;
-      goto Labort;
-    }
-    if (levels > 2)
-      cur = mmap_region(blocks_in_level(2), fds, cur, total_mapped, private_flag, fd);
-    if (!cur) {
-      store = saved;
-      goto Labort;
-    }
-
-    if (heap_size) {
-      heap = cur;
-      cur  = mmap_region(bytes_to_blocks(heap_size), fds, cur, total_mapped, private_flag, fd);
-      if (!cur) {
-        store = saved;
-        goto Labort;
-      }
-    }
-    mapped_header = (MultiCacheHeader *)cur;
-    if (!mmap_region(1, fds, cur, total_mapped, private_flag, fd)) {
-      store = saved;
-      goto Labort;
-    }
-#if !defined(darwin)
-    ink_assert(!socketManager.close(fd));
-#endif
-    store = saved;
-  }
-
-  for (int i = 0; i < n_fds; i++) {
-    if (fds[i] >= 0)
-      ink_assert(!socketManager.close(fds[i]));
-  }
-
-  return 0;
-Lalloc : {
-  free(data);
-  char *cur = 0;
-
-  data = (char *)ats_memalign(ats_pagesize(), totalsize);
-  cur  = data + STORE_BLOCK_SIZE * blocks_in_level(0);
-  if (levels > 1)
-    cur = data + STORE_BLOCK_SIZE * blocks_in_level(1);
-  if (levels > 2)
-    cur = data + STORE_BLOCK_SIZE * blocks_in_level(2);
-  if (heap_size) {
-    heap = cur;
-    cur += bytes_to_blocks(heap_size) * STORE_BLOCK_SIZE;
-  }
-  mapped_header = (MultiCacheHeader *)cur;
-  for (int i = 0; i < n_fds; i++) {
-    if (fds[i] >= 0)
-      socketManager.close(fds[i]);
-  }
-
-  return 0;
-}
-
-Labort:
-  for (int i = 0; i < n_fds; i++) {
-    if (fds[i] >= 0)
-      socketManager.close(fds[i]);
-  }
-  if (total_mapped > 0)
-    munmap(data, total_mapped);
-
-  return -1;
-}
-
-void
-MultiCacheBase::clear()
-{
-  memset(data, 0, totalsize);
-  heap_used[0]   = 8;
-  heap_used[1]   = 8;
-  heap_halfspace = 0;
-  *mapped_header = *(MultiCacheHeader *)this;
-}
-
-void
-MultiCacheBase::clear_but_heap()
-{
-  memset(data, 0, totalelements * elementsize);
-  *mapped_header = *(MultiCacheHeader *)this;
-}
-
-int
-MultiCacheBase::read_config(const char *config_filename, Store &s, char *fn, int *pi, int *pbuck)
-{
-  int scratch;
-  ats_scoped_str rundir(RecConfigReadRuntimeDir());
-  char p[PATH_NAME_MAX], buf[256];
-
-  Layout::relative_to(p, sizeof(p), rundir, config_filename);
-
-  ats_scoped_fd fd(::open(p, O_RDONLY));
-  if (fd < 0)
-    return 0;
-
-  if (ink_file_fd_readline(fd, sizeof(buf), buf) <= 0)
-    return -1;
-  // coverity[secure_coding]
-  if (sscanf(buf, "%d\n", pi ? pi : &scratch) != 1)
-    return -1;
-
-  if (ink_file_fd_readline(fd, sizeof(buf), buf) <= 0)
-    return -1;
-  // coverity[secure_coding]
-  if (sscanf(buf, "%d\n", pbuck ? pbuck : &scratch) != 1)
-    return -1;
-
-  if (ink_file_fd_readline(fd, sizeof(buf), buf) <= 0)
-    return -1;
-  // coverity[secure_coding]
-  if (sscanf(buf, "%d\n", &heap_size) != 1)
-    return -1;
-
-  if (s.read(fd, fn) < 0)
-    return -1;
-
-  return 1;
-}
-
-int
-MultiCacheBase::write_config(const char *config_filename, int nominal_size, int abuckets)
-{
-  ats_scoped_str rundir(RecConfigReadRuntimeDir());
-  char p[PATH_NAME_MAX], buf[256];
-  int fd, retcode = -1;
-
-  Layout::relative_to(p, sizeof(p), rundir, config_filename);
-
-  if ((fd = ::open(p, O_CREAT | O_WRONLY | O_TRUNC, 0644)) >= 0) {
-    snprintf(buf, sizeof(buf) - 1, "%d\n%d\n%d\n", nominal_size, abuckets, heap_size);
-    buf[sizeof(buf) - 1] = 0;
-    if (ink_file_fd_writestring(fd, buf) != -1 && store->write(fd, filename) >= 0)
-      retcode = 0;
-    ::close(fd);
-  } else
-    Warning("unable to open '%s' for write: %d, %s", p, errno, strerror(errno));
-
-  return retcode;
-}
-
-int
-MultiCacheBase::open(Store *s, const char *config_filename, char *db_filename, int db_size, bool reconfigure, bool fix, bool silent)
-{
-  int ret         = 0;
-  const char *err = NULL;
-  char *serr      = NULL;
-  char t_db_filename[PATH_NAME_MAX];
-  int t_db_size    = 0;
-  int t_db_buckets = 0;
-  int change       = 0;
-
-  t_db_filename[0] = 0;
-
-  // Set up cache
-  {
-    Store tStore;
-    int res = read_config(config_filename, tStore, t_db_filename, &t_db_size, &t_db_buckets);
-
-    ink_assert(store_verify(&tStore));
-    if (res < 0)
-      goto LfailRead;
-    if (!res) {
-      if (!reconfigure || !db_filename || !db_size)
-        goto LfailConfig;
-      if (initialize(s, db_filename, db_size) <= 0)
-        goto LfailInit;
-      write_config(config_filename, db_size, buckets);
-      if (mmap_data() < 0)
-        goto LfailMap;
-      clear();
-    } else {
-      // don't know how to rebuild from this problem
-      ink_assert(!db_filename || !strcmp(t_db_filename, db_filename));
-      if (!db_filename)
-        db_filename = t_db_filename;
-
-      // Has the size changed?
-      change = (db_size >= 0) ? (db_size - t_db_size) : 0;
-      if (db_size < 0)
-        db_size = t_db_size;
-      if (change && !reconfigure)
-        goto LfailConfig;
-
-      Store cStore;
-      tStore.dup(cStore);
-
-      // Try to get back our storage
-      Store diff;
-
-      s->try_realloc(cStore, diff);
-      if (diff.n_disks && !reconfigure)
-        goto LfailConfig;
-
-      // Do we need to do a reconfigure?
-      if (diff.n_disks || change) {
-        // find a new store to old the amount of space we need
-        int delta = change;
-
-        if (diff.n_disks)
-          delta += diff.total_blocks();
-
-        if (delta) {
-          if (delta > 0) {
-            Store freeStore;
-            stealStore(freeStore, delta);
-            Store more;
-            freeStore.spread_alloc(more, delta);
-            if (delta > (int)more.total_blocks())
-              goto LfailReconfig;
-            Store more_diff;
-            s->try_realloc(more, more_diff);
-            if (more_diff.n_disks)
-              goto LfailReconfig;
-            cStore.add(more);
-            if (more.clear(db_filename, false) < 0)
-              goto LfailReconfig;
-          }
-          if (delta < 0) {
-            Store removed;
-            cStore.spread_alloc(removed, -delta);
-          }
-        }
-        cStore.sort();
-        if (initialize(&cStore, db_filename, db_size, t_db_buckets) <= 0)
-          goto LfailInit;
-
-        ink_assert(store_verify(store));
-
-        if (write_config(config_filename, db_size, buckets) < 0)
-          goto LfailWrite;
-
-        ink_assert(store_verify(store));
-
-        //  rebuild
-        MultiCacheBase *old = dup();
-        if (old->initialize(&tStore, t_db_filename, t_db_size, t_db_buckets) <= 0) {
-          delete old;
-          goto LfailInit;
-        }
-
-        if (rebuild(*old)) {
-          delete old;
-          goto LfailRebuild;
-        }
-        ink_assert(store_verify(store));
-        delete old;
-
-      } else {
-        if (initialize(&tStore, db_filename, db_size, t_db_buckets) <= 0)
-          goto LfailFix;
-        ink_assert(store_verify(store));
-        if (mmap_data() < 0)
-          goto LfailMap;
-        if (!verify_header())
-          goto LheaderCorrupt;
-        *(MultiCacheHeader *)this = *mapped_header;
-        ink_assert(store_verify(store));
-
-        if (fix)
-          if (check(config_filename, true) < 0)
-            goto LfailFix;
-      }
-    }
-  }
-
-  if (store)
-    ink_assert(store_verify(store));
-Lcontinue:
-  return ret;
-
-LheaderCorrupt:
-  err = "header missing/corrupt";
-  goto Lfail;
-
-LfailWrite:
-  err  = "unable to write";
-  serr = strerror(errno);
-  goto Lfail;
-
-LfailRead:
-  err  = "unable to read";
-  serr = strerror(errno);
-  goto Lfail;
-
-LfailInit:
-  err = "unable to initialize database (too little storage)\n";
-  goto Lfail;
-
-LfailConfig:
-  err = "configuration changed";
-  goto Lfail;
-
-LfailReconfig:
-  err = "unable to reconfigure";
-  goto Lfail;
-
-LfailRebuild:
-  err = "unable to rebuild";
-  goto Lfail;
-
-LfailFix:
-  err = "unable to fix";
-  goto Lfail;
-
-LfailMap:
-  err  = "unable to mmap";
-  serr = strerror(errno);
-  goto Lfail;
-
-Lfail : {
-  unmap_data();
-  if (!silent) {
-    if (reconfigure) {
-      RecSignalWarning(REC_SIGNAL_CONFIG_ERROR, "%s: [%s] %s: disabling database\n"
-                                                "You may need to 'reconfigure' your cache manually.  Please refer to\n"
-                                                "the 'Configuration' chapter in the manual.",
-                       err, config_filename, serr ? serr : "");
-    } else {
-      RecSignalWarning(REC_SIGNAL_CONFIG_ERROR, "%s: [%s] %s: reinitializing database", err, config_filename, serr ? serr : "");
-    }
-  }
-}
-  ret = -1;
-  goto Lcontinue;
-}
-
-bool
-MultiCacheBase::verify_header()
-{
-  return mapped_header->magic == magic && mapped_header->version.ink_major == version.ink_major &&
-         mapped_header->version.ink_minor == version.ink_minor && mapped_header->levels == levels &&
-         mapped_header->tag_bits == tag_bits && mapped_header->max_hits == max_hits && mapped_header->elementsize == elementsize &&
-         mapped_header->buckets == buckets && mapped_header->level_offset[0] == level_offset[0] &&
-         mapped_header->level_offset[1] == level_offset[1] && mapped_header->level_offset[2] == level_offset[2] &&
-         mapped_header->elements[0] == elements[0] && mapped_header->elements[1] == elements[1] &&
-         mapped_header->elements[2] == elements[2] && mapped_header->bucketsize[0] == bucketsize[0] &&
-         mapped_header->bucketsize[1] == bucketsize[1] && mapped_header->bucketsize[2] == bucketsize[2] &&
-         mapped_header->totalelements == totalelements && mapped_header->totalsize == totalsize &&
-         mapped_header->nominal_elements == nominal_elements;
-}
-
-void
-MultiCacheBase::print_info(FILE *fp)
-{ // STDIO OK
-  fprintf(fp, "    Elements:       %-10d\n", totalelements);
-  fprintf(fp, "    Size (bytes):   %-10u\n", totalsize);
-}
-
-//
-//  We need to preserve the buckets
-// while moving the existing data into the new locations.
-//
-// if data == NULL we are rebuilding (as opposed to check or fix)
-//
-int
-MultiCacheBase::rebuild(MultiCacheBase &old, int kind)
-{
-  char *new_data = 0;
-
-  ink_assert(store_verify(store));
-  ink_assert(store_verify(old.store));
-
-  // map in a chunk of space to use as scratch (check)
-  // or to copy the database to.
-  ats_scoped_fd fd(socketManager.open("/dev/zero", O_RDONLY));
-  if (fd < 0) {
-    Warning("unable to open /dev/zero: %d, %s", errno, strerror(errno));
-    return -1;
-  }
-
-  new_data = (char *)mmap(0, old.totalsize, PROT_READ | PROT_WRITE, MAP_PRIVATE, fd, 0);
-
-  ink_assert(data != new_data);
-  if (new_data == NULL || new_data == (caddr_t)MAP_FAILED) {
-    Warning("unable to mmap /dev/zero for %u bytes: %d, %s", totalsize, errno, strerror(errno));
-    return -1;
-  }
-  // if we are rebuilding get the original data
-
-  if (!data) {
-    ink_assert(kind == MC_REBUILD);
-    if (old.mmap_data(true, true) < 0)
-      return -1;
-    memcpy(new_data, old.data, old.totalsize);
-    old.unmap_data();
-    // now map the new location
-    if (mmap_data() < 0)
-      return -1;
-    // old.data is the copy
-    old.data = new_data;
-  } else {
-    ink_assert(kind == MC_REBUILD_CHECK || kind == MC_REBUILD_FIX);
-    if (kind == MC_REBUILD_CHECK) {
-      // old.data is the original, data is the copy
-      old.data = data;
-      data     = new_data;
-    } else {
-      memcpy(new_data, data, old.totalsize);
-      // old.data is the copy, data is the original
-      old.data = new_data;
-    }
-  }
-
-  ink_assert(buckets == old.buckets);
-
-  FILE *diag_output_fp = stderr;
-
-  RebuildMC r;
-
-  r.data = old.data;
-
-  r.rebuild = kind == MC_REBUILD;
-  r.check   = kind == MC_REBUILD_CHECK;
-  r.fix     = kind == MC_REBUILD_FIX;
-
-  r.deleted    = 0;
-  r.backed     = 0;
-  r.duplicates = 0;
-  r.stale      = 0;
-  r.corrupt    = 0;
-  r.good       = 0;
-  r.total      = 0;
-
-  if (r.rebuild)
-    fprintf(diag_output_fp, "New:\n");
-  print_info(diag_output_fp);
-  if (r.rebuild || r.fix) {
-    fprintf(diag_output_fp, "Old:\n");
-    old.print_info(diag_output_fp);
-    clear_but_heap();
-  }
-
-  fprintf(diag_output_fp, "    [processing element.. ");
-
-  int scan = 0;
-  for (int l = old.levels - 1; l >= 0; l--)
-    for (int b = 0; b < old.buckets; b++) {
-      r.partition = partition_of_bucket(b);
-      for (int e = 0; e < old.elements[l]; e++) {
-        scan++;
-        if (!(scan & 0x7FFF))
-          fprintf(diag_output_fp, "%d ", scan);
-        char *x = old.data + old.level_offset[l] + b * old.bucketsize[l] + e * elementsize;
-        rebuild_element(b, x, r);
-      }
-    }
-  if (scan & 0x7FFF)
-    printf("done]\n");
-  if (r.rebuild || r.fix)
-    for (int p = 0; p < MULTI_CACHE_PARTITIONS; p++)
-      sync_partition(p);
-
-  fprintf(diag_output_fp, "    Usage Summary\n");
-  fprintf(diag_output_fp, "\tTotal:      %-10d\n", r.total);
-  if (r.good)
-    fprintf(diag_output_fp, "\tGood:       %.2f%% (%d)\n", r.total ? ((r.good * 100.0) / r.total) : 0, r.good);
-  if (r.deleted)
-    fprintf(diag_output_fp, "\tDeleted:    %5.2f%% (%d)\n", r.deleted ? ((r.deleted * 100.0) / r.total) : 0.0, r.deleted);
-  if (r.backed)
-    fprintf(diag_output_fp, "\tBacked:     %5.2f%% (%d)\n", r.backed ? ((r.backed * 100.0) / r.total) : 0.0, r.backed);
-  if (r.duplicates)
-    fprintf(diag_output_fp, "\tDuplicates: %5.2f%% (%d)\n", r.duplicates ? ((r.duplicates * 100.0) / r.total) : 0.0, r.duplicates);
-  if (r.stale)
-    fprintf(diag_output_fp, "\tStale:      %5.2f%% (%d)\n", r.stale ? ((r.stale * 100.0) / r.total) : 0.0, r.stale);
-  if (r.corrupt)
-    fprintf(diag_output_fp, "\tCorrupt:    %5.2f%% (%d)\n", r.corrupt ? ((r.corrupt * 100.0) / r.total) : 0.0, r.corrupt);
-
-  old.reset();
-
-  return 0;
-}
-
-int
-MultiCacheBase::check(const char *config_filename, bool fix)
-{
-  //  rebuild
-  Store tStore;
-  char t_db_filename[PATH_NAME_MAX];
-  t_db_filename[0] = 0;
-  int t_db_size = 0, t_db_buckets = 0;
-  if (read_config(config_filename, tStore, t_db_filename, &t_db_size, &t_db_buckets) <= 0)
-    return -1;
-
-  MultiCacheBase *old = dup();
-
-  if (old->initialize(&tStore, filename, nominal_elements, buckets) <= 0) {
-    delete old;
-    return -1;
-  }
-
-  int res = rebuild(*old, fix ? MC_REBUILD_FIX : MC_REBUILD_CHECK);
-  delete old;
-  return res;
-}
-
-int
-MultiCacheBase::sync_heap(int part)
-{
-  if (heap_size) {
-    int b_per_part = heap_size / MULTI_CACHE_PARTITIONS;
-    if (ats_msync(data + level_offset[2] + buckets * bucketsize[2] + b_per_part * part, b_per_part, data + totalsize, MS_SYNC) < 0)
-      return -1;
-  }
-  return 0;
-}
-
-//
-// Sync a single partition
-//
-// Since we delete from the higher levels
-// and insert into the lower levels,
-// start with the higher levels to reduce the risk of duplicates.
-//
-int
-MultiCacheBase::sync_partition(int partition)
-{
-  int res = 0;
-  int b   = first_bucket_of_partition(partition);
-  int n   = buckets_of_partition(partition);
-  // L3
-  if (levels > 2) {
-    if (ats_msync(data + level_offset[2] + b * bucketsize[2], n * bucketsize[2], data + totalsize, MS_SYNC) < 0)
-      res = -1;
-  }
-  // L2
-  if (levels > 1) {
-    if (ats_msync(data + level_offset[1] + b * bucketsize[1], n * bucketsize[1], data + totalsize, MS_SYNC) < 0)
-      res = -1;
-  }
-  // L1
-  if (ats_msync(data + b * bucketsize[0], n * bucketsize[0], data + totalsize, MS_SYNC) < 0)
-    res = -1;
-  return res;
-}
-
-int
-MultiCacheBase::sync_header()
-{
-  *mapped_header = *(MultiCacheHeader *)this;
-  return ats_msync((char *)mapped_header, STORE_BLOCK_SIZE, (char *)mapped_header + STORE_BLOCK_SIZE, MS_SYNC);
-}
-
-int
-MultiCacheBase::sync_all()
-{
-  int res = 0, i = 0;
-  for (i = 0; i < MULTI_CACHE_PARTITIONS; i++)
-    if (sync_heap(i) < 0)
-      res = -1;
-  for (i = 0; i < MULTI_CACHE_PARTITIONS; i++)
-    if (sync_partition(i) < 0)
-      res = -1;
-  if (sync_header())
-    res = -1;
-  return res;
-}
-
-//
-// Syncs MulitCache
-//
-struct MultiCacheSync;
-typedef int (MultiCacheSync::*MCacheSyncHandler)(int, void *);
-
-struct MultiCacheSync : public Continuation {
-  int partition;
-  MultiCacheBase *mc;
-  Continuation *cont;
-  int before_used;
-
-  int
-  heapEvent(int event, Event *e)
-  {
-    if (!partition) {
-      before_used     = mc->heap_used[mc->heap_halfspace];
-      mc->header_snap = *(MultiCacheHeader *)mc;
-    }
-    if (partition < MULTI_CACHE_PARTITIONS) {
-      mc->sync_heap(partition++);
-      e->schedule_imm();
-      return EVENT_CONT;
-    }
-    *mc->mapped_header = mc->header_snap;
-    ink_assert(!ats_msync((char *)mc->mapped_header, STORE_BLOCK_SIZE, (char *)mc->mapped_header + STORE_BLOCK_SIZE, MS_SYNC));
-    partition = 0;
-    SET_HANDLER((MCacheSyncHandler)&MultiCacheSync::mcEvent);
-    return mcEvent(event, e);
-  }
-
-  int
-  mcEvent(int event, Event *e)
-  {
-    (void)event;
-    if (partition >= MULTI_CACHE_PARTITIONS) {
-      cont->handleEvent(MULTI_CACHE_EVENT_SYNC, 0);
-      Debug("multicache", "MultiCacheSync done (%d, %d)", mc->heap_used[0], mc->heap_used[1]);
-      delete this;
-      return EVENT_DONE;
-    }
-    mc->fixup_heap_offsets(partition, before_used);
-    mc->sync_partition(partition);
-    partition++;
-    mutex = e->ethread->mutex;
-    SET_HANDLER((MCacheSyncHandler)&MultiCacheSync::pauseEvent);
-    e->schedule_in(MAX(MC_SYNC_MIN_PAUSE_TIME, HRTIME_SECONDS(hostdb_sync_frequency - 5) / MULTI_CACHE_PARTITIONS));
-    return EVENT_CONT;
-  }
-
-  int
-  pauseEvent(int event, Event *e)
-  {
-    (void)event;
-    (void)e;
-    if (partition < MULTI_CACHE_PARTITIONS)
-      mutex = mc->locks[partition];
-    else
-      mutex = cont->mutex;
-    SET_HANDLER((MCacheSyncHandler)&MultiCacheSync::mcEvent);
-    e->schedule_imm();
-    return EVENT_CONT;
-  }
-
-  MultiCacheSync(Continuation *acont, MultiCacheBase *amc)
-    : Continuation(amc->locks[0].get()), partition(0), mc(amc), cont(acont), before_used(0)
-  {
-    mutex = mc->locks[partition];
-    SET_HANDLER((MCacheSyncHandler)&MultiCacheSync::heapEvent);
-  }
-};
-
-//
-// Heap code
-//
-
-UnsunkPtrRegistry *
-MultiCacheBase::fixup_heap_offsets(int partition, int before_used, UnsunkPtrRegistry *r, int base)
-{
-  if (!r)
-    r        = &unsunk[partition];
-  bool found = 0;
-  for (int i = 0; i < r->n; i++) {
-    UnsunkPtr &p = r->ptrs[i];
-    if (p.offset) {
-      Debug("multicache", "fixup p.offset %d offset %d %" PRId64 " part %d", p.offset, *p.poffset,
-            (int64_t)((char *)p.poffset - data), partition);
-      if (*p.poffset == -(i + base) - 1) {
-        if (halfspace_of(p.offset) != heap_halfspace) {
-          ink_assert(0);
-          *p.poffset = 0;
-        } else {
-          if (p.offset < before_used) {
-            *p.poffset = p.offset + 1;
-            ink_assert(*p.poffset);
-          } else
-            continue;
-        }
-      } else {
-        Debug("multicache", "not found %" PRId64 " i %d base %d *p.poffset = %d", (int64_t)((char *)p.poffset - data), i, base,
-              *p.poffset);
-      }
-      p.offset     = 0;
-      p.poffset    = (int *)r->next_free;
-      r->next_free = &p;
-      found        = true;
-    }
-  }
-  if (r->next) {
-    int s   = MULTI_CACHE_UNSUNK_PTR_BLOCK_SIZE(totalelements);
-    r->next = fixup_heap_offsets(partition, before_used, r->next, base + s);
-  }
-  if (!r->next && !found && r != &unsunk[partition]) {
-    delete r;
-    return NULL;
-  }
-  return r;
-}
-
-struct OffsetTable {
-  int new_offset;
-  int *poffset;
-};
-
-struct MultiCacheHeapGC;
-typedef int (MultiCacheHeapGC::*MCacheHeapGCHandler)(int, void *);
-struct MultiCacheHeapGC : public Continuation {
-  Continuation *cont;
-  MultiCacheBase *mc;
-  int partition;
-  int n_offsets;
-  OffsetTable *offset_table;
-
-  int
-  startEvent(int event, Event *e)
-  {
-    (void)event;
-    if (partition < MULTI_CACHE_PARTITIONS) {
-      // copy heap data
-
-      char *before = mc->heap + mc->heap_used[mc->heap_halfspace];
-      mc->copy_heap(partition, this);
-      char *after = mc->heap + mc->heap_used[mc->heap_halfspace];
-
-      // sync new heap data and header (used)
-
-      if (after - before > 0) {
-        ink_assert(!ats_msync(before, after - before, mc->heap + mc->totalsize, MS_SYNC));
-        ink_assert(!ats_msync((char *)mc->mapped_header, STORE_BLOCK_SIZE, (char *)mc->mapped_header + STORE_BLOCK_SIZE, MS_SYNC));
-      }
-      // update table to point to new entries
-
-      for (int i = 0; i < n_offsets; i++) {
-        int *i1, i2;
-        // BAD CODE GENERATION ON THE ALPHA
-        //*(offset_table[i].poffset) = offset_table[i].new_offset + 1;
-        i1  = offset_table[i].poffset;
-        i2  = offset_table[i].new_offset + 1;
-        *i1 = i2;
-      }
-      n_offsets = 0;
-      mc->sync_partition(partition);
-      partition++;
-      if (partition < MULTI_CACHE_PARTITIONS)
-        mutex = mc->locks[partition];
-      else
-        mutex = cont->mutex;
-      e->schedule_in(MAX(MC_SYNC_MIN_PAUSE_TIME, HRTIME_SECONDS(hostdb_sync_frequency - 5) / MULTI_CACHE_PARTITIONS));
-      return EVENT_CONT;
-    }
-    mc->heap_used[mc->heap_halfspace ? 0 : 1] = 8; // skip 0
-    cont->handleEvent(MULTI_CACHE_EVENT_SYNC, 0);
-    Debug("multicache", "MultiCacheHeapGC done");
-    delete this;
-    return EVENT_DONE;
-  }
-
-  MultiCacheHeapGC(Continuation *acont, MultiCacheBase *amc)
-    : Continuation(amc->locks[0].get()), cont(acont), mc(amc), partition(0), n_offsets(0)
-  {
-    SET_HANDLER((MCacheHeapGCHandler)&MultiCacheHeapGC::startEvent);
-    offset_table = (OffsetTable *)ats_malloc(sizeof(OffsetTable) *
-                                             ((mc->totalelements / MULTI_CACHE_PARTITIONS) + mc->elements[mc->levels - 1] * 3 + 1));
-    // flip halfspaces
-    mutex              = mc->locks[partition];
-    mc->heap_halfspace = mc->heap_halfspace ? 0 : 1;
-  }
-  ~MultiCacheHeapGC() { ats_free(offset_table); }
-};
-
-void
-MultiCacheBase::sync_partitions(Continuation *cont)
-{
-  // don't try to sync if we were not correctly initialized
-  if (data && mapped_header) {
-    if (heap_used[heap_halfspace] > halfspace_size() * MULTI_CACHE_HEAP_HIGH_WATER)
-      eventProcessor.schedule_imm(new MultiCacheHeapGC(cont, this), ET_TASK);
-    else
-      eventProcessor.schedule_imm(new MultiCacheSync(cont, this), ET_TASK);
-  }
-}
-
-void
-MultiCacheBase::copy_heap_data(char *src, int s, int *pi, int partition, MultiCacheHeapGC *gc)
-{
-  char *dest = (char *)alloc(NULL, s);
-  Debug("multicache", "copy %p to %p", src, dest);
-  if (dest) {
-    memcpy(dest, src, s);
-    if (*pi < 0) { // already in the unsunk ptr registry, ok to change there
-      UnsunkPtr *ptr = unsunk[partition].ptr(-*pi - 1);
-      if (ptr->poffset == pi)
-        ptr->offset = dest - heap;
-      else {
-        ink_assert(0);
-        *pi = 0;
-      }
-    } else {
-      gc->offset_table[gc->n_offsets].new_offset = dest - heap;
-      gc->offset_table[gc->n_offsets].poffset    = pi;
-      gc->n_offsets++;
-    }
-  } else {
-    ink_assert(0);
-    *pi = 0;
-  }
-}
-
-UnsunkPtrRegistry::UnsunkPtrRegistry() : mc(NULL), n(0), ptrs(NULL), next_free(NULL), next(NULL)
-{
-}
-
-UnsunkPtrRegistry::~UnsunkPtrRegistry()
-{
-  ats_free(ptrs);
-}
-
-void
-UnsunkPtrRegistry::alloc_data()
-{
-  int bs   = MULTI_CACHE_UNSUNK_PTR_BLOCK_SIZE(mc->totalelements);
-  size_t s = bs * sizeof(UnsunkPtr);
-  ptrs     = (UnsunkPtr *)ats_malloc(s);
-  for (int i = 0; i < bs; i++) {
-    ptrs[i].offset  = 0;
-    ptrs[i].poffset = (int *)&ptrs[i + 1];
-  }
-  ptrs[bs - 1].poffset = NULL;
-  next_free            = ptrs;
-  n                    = bs;
-}
-
-UnsunkPtr *
-UnsunkPtrRegistry::alloc(int *poffset, int base)
-{
-  if (next_free) {
-    UnsunkPtr *res = next_free;
-    next_free      = (UnsunkPtr *)next_free->poffset;
-    *poffset       = -(base + (res - ptrs)) - 1;
-    ink_assert(*poffset);
-    return res;
-  } else {
-    if (!ptrs) {
-      alloc_data();
-      return alloc(poffset, base);
-    }
-    if (!next) {
-      next     = new UnsunkPtrRegistry;
-      next->mc = mc;
-    }
-    int s = MULTI_CACHE_UNSUNK_PTR_BLOCK_SIZE(mc->totalelements);
-    return next->alloc(poffset, base + s);
-  }
-}
-
-void *
-MultiCacheBase::alloc(int *poffset, int asize)
-{
-  int h    = heap_halfspace;
-  int size = (asize + MULTI_CACHE_HEAP_ALIGNMENT - 1) & ~(MULTI_CACHE_HEAP_ALIGNMENT - 1);
-  int o    = ink_atomic_increment((int *)&heap_used[h], size);
-
-  if (o + size > halfspace_size()) {
-    ink_atomic_increment((int *)&heap_used[h], -size);
-    ink_assert(!"out of space");
-    if (poffset)
-      *poffset = 0;
-    return NULL;
-  }
-  int offset = (h ? halfspace_size() : 0) + o;
-  char *p    = heap + offset;
-  if (poffset) {
-    int part = ptr_to_partition((char *)poffset);
-    if (part < 0)
-      return NULL;
-    UnsunkPtr *up = unsunk[part].alloc(poffset);
-    up->offset    = offset;
-    up->poffset   = poffset;
-    Debug("multicache", "alloc unsunk %d at %" PRId64 " part %d offset %d", *poffset, (int64_t)((char *)poffset - data), part,
-          offset);
-  }
-  return (void *)p;
-}
-
-UnsunkPtr *
-UnsunkPtrRegistry::ptr(int i)
-{
-  if (i >= n) {
-    if (!next)
-      return NULL;
-    else
-      return next->ptr(i - n);
-  } else {
-    if (!ptrs)
-      return NULL;
-    return &ptrs[i];
-  }
-}
-
-void *
-MultiCacheBase::ptr(int *poffset, int partition)
-{
-  int o = *poffset;
-  Debug("multicache", "ptr %" PRId64 " part %d %d", (int64_t)((char *)poffset - data), partition, o);
-  if (o > 0) {
-    if (!valid_offset(o)) {
-      ink_assert(!"bad offset");
-      *poffset = 0;
-      return NULL;
-    }
-    return (void *)(heap + o - 1);
-  }
-  if (!o)
-    return NULL;
-  UnsunkPtr *p = unsunk[partition].ptr(-o - 1);
-  if (!p)
-    return NULL;
-  if (p->poffset != poffset)
-    return NULL;
-  return (void *)(heap + p->offset);
-}
-
-void
-MultiCacheBase::update(int *poffset, int *old_poffset)
-{
-  int o = *poffset;
-  Debug("multicache", "updating %" PRId64 " %d", (int64_t)((char *)poffset - data), o);
-  if (o > 0) {
-    if (!valid_offset(o)) {
-      ink_assert(!"bad poffset");
-      *poffset = 0;
-    }
-    return;
-  }
-  if (!o)
-    return;
-
-  int part = ptr_to_partition((char *)poffset);
-
-  if (part < 0)
-    return;
-
-  UnsunkPtr *p = unsunk[part].ptr(-*old_poffset - 1);
-  if (!p || p->poffset != old_poffset) {
-    *poffset = 0;
-    return;
-  }
-  ink_assert(p->poffset != poffset);
-  UnsunkPtr *n = unsunk[part].alloc(poffset);
-  n->poffset   = poffset;
-  n->offset    = p->offset;
-}
-
-int
-MultiCacheBase::ptr_to_partition(char *ptr)
-{
-  int o = ptr - data;
-  if (o < level_offset[0])
-    return -1;
-  if (o < level_offset[1])
-    return partition_of_bucket((o - level_offset[0]) / bucketsize[0]);
-  if (o < level_offset[2])
-    return partition_of_bucket((o - level_offset[1]) / bucketsize[1]);
-  if (o < level_offset[2] + elements[2] * elementsize)
-    return partition_of_bucket((o - level_offset[2]) / bucketsize[2]);
-  return -1;
-}
-
-void
-stealStore(Store &s, int blocks)
-{
-  if (s.read_config())
-    return;
-  Store tStore;
-  MultiCacheBase dummy;
-  if (dummy.read_config("hostdb.config", tStore) > 0) {
-    Store dStore;
-    s.try_realloc(tStore, dStore);
-  }
-  tStore.delete_all();
-  if (dummy.read_config("dir.config", tStore) > 0) {
-    Store dStore;
-    s.try_realloc(tStore, dStore);
-  }
-  tStore.delete_all();
-  if (dummy.read_config("alt.config", tStore) > 0) {
-    Store dStore;
-    s.try_realloc(tStore, dStore);
-  }
-  // grab some end portion of some block... so as not to damage the
-  // pool header
-  for (unsigned d = 0; d < s.n_disks;) {
-    Span *ds = s.disk[d];
-    while (ds) {
-      if (!blocks)
-        ds->blocks = 0;
-      else {
-        int b = blocks;
-        if ((int)ds->blocks < blocks)
-          b = ds->blocks;
-        if (ds->file_pathname)
-          ds->offset += (ds->blocks - b);
-        ds->blocks = b;
-        blocks -= b;
-      }
-      ds = ds->link.next;
-    }
-    d++;
-  }
-}
diff --git a/iocore/hostdb/P_HostDB.h b/iocore/hostdb/P_HostDB.h
index 9d616a4..d178d96 100644
--- a/iocore/hostdb/P_HostDB.h
+++ b/iocore/hostdb/P_HostDB.h
@@ -44,12 +44,12 @@
 
 // HostDB files
 #include "P_DNS.h"
-#include "P_MultiCache.h"
+#include "P_RefCountCache.h"
 #include "P_HostDBProcessor.h"
 
 #undef HOSTDB_MODULE_VERSION
 #define HOSTDB_MODULE_VERSION makeModuleVersion(HOSTDB_MODULE_MAJOR_VERSION, HOSTDB_MODULE_MINOR_VERSION, PRIVATE_MODULE_HEADER)
-HostDBInfo *probe(ProxyMutex *mutex, HostDBMD5 const &md5, bool ignore_timeout);
+Ptr<HostDBInfo> probe(ProxyMutex *mutex, HostDBMD5 const &md5, bool ignore_timeout);
 
 void make_md5(INK_MD5 &md5, const char *hostname, int len, int port, char const *pDNSServers, HostDBMark mark);
 #endif
diff --git a/iocore/hostdb/P_HostDBProcessor.h b/iocore/hostdb/P_HostDBProcessor.h
index 87afbd6..e7a1e45 100644
--- a/iocore/hostdb/P_HostDBProcessor.h
+++ b/iocore/hostdb/P_HostDBProcessor.h
@@ -135,23 +135,17 @@ HOSTDB_CLIENT_IP_HASH(sockaddr const *lhs, sockaddr const *rhs)
 //#define TEST(_x) _x
 #define TEST(_x)
 
-#ifdef _HOSTDB_CC_
-template struct MultiCache<HostDBInfo>;
-#endif /* _HOSTDB_CC_ */
-
 struct ClusterMachine;
 struct HostEnt;
 struct ClusterConfiguration;
 
 // Stats
 enum HostDB_Stats {
-  hostdb_total_entries_stat,
   hostdb_total_lookups_stat,
   hostdb_total_hits_stat,  // D == total hits
   hostdb_ttl_stat,         // D average TTL
   hostdb_ttl_expires_stat, // D == TTL Expires
   hostdb_re_dns_on_reload_stat,
-  hostdb_bytes_stat,
   HostDB_Stat_Count
 };
 
@@ -200,29 +194,17 @@ struct RefCountedHostsFileMap : public RefCountObj {
 //
 // HostDBCache (Private)
 //
-struct HostDBCache : public MultiCache<HostDBInfo> {
-  int rebuild_callout(HostDBInfo *e, RebuildMC &r);
+struct HostDBCache {
   int start(int flags = 0);
-  MultiCacheBase *
-  dup()
-  {
-    return new HostDBCache;
-  }
-
-  // This accounts for an average of 2 HostDBInfo per DNS cache (for round-robin etc.)
-  // In addition, we can do a padding for additional SRV records storage.
-  // In addition, we add 120 for hostname storage (since we now always do that)
-  virtual size_t
-  estimated_heap_bytes_per_entry() const
-  {
-    return sizeof(HostDBInfo) * 2 + 512 * hostdb_srv_enabled + 120;
-  }
-
   // Map to contain all of the host file overrides, initialize it to empty
   Ptr<RefCountedHostsFileMap> hosts_file_ptr;
+  // TODO: make ATS call a close() method or something on shutdown (it does nothing of the sort today)
+  RefCountCache<HostDBInfo> *refcountcache;
 
-  Queue<HostDBContinuation, Continuation::Link_link> pending_dns[MULTI_CACHE_PARTITIONS];
+  // TODO configurable number of items in the cache
+  Queue<HostDBContinuation, Continuation::Link_link> *pending_dns;
   Queue<HostDBContinuation, Continuation::Link_link> &pending_dns_for_hash(INK_MD5 &md5);
+  Queue<HostDBContinuation, Continuation::Link_link> *remoteHostDBQueue;
   HostDBCache();
 };
 
@@ -236,7 +218,7 @@ HostDBRoundRobin::index_of(sockaddr const *ip)
   }
 
   for (int i = 0; i < good; i++) {
-    if (ats_ip_addr_eq(ip, info[i].ip())) {
+    if (ats_ip_addr_eq(ip, info(i).ip())) {
       return i;
     }
   }
@@ -248,7 +230,7 @@ inline HostDBInfo *
 HostDBRoundRobin::find_ip(sockaddr const *ip)
 {
   int idx = this->index_of(ip);
-  return idx < 0 ? NULL : &info[idx];
+  return idx < 0 ? NULL : &info(idx);
 }
 
 inline HostDBInfo *
@@ -259,7 +241,7 @@ HostDBRoundRobin::select_next(sockaddr const *ip)
     int idx = this->index_of(ip);
     if (idx >= 0) {
       idx  = (idx + 1) % good;
-      zret = &info[idx];
+      zret = &info(idx);
     }
   }
   return zret;
@@ -276,8 +258,8 @@ HostDBRoundRobin::find_target(const char *target)
 
   uint32_t key = makeHostHash(target);
   for (int i = 0; i < good; i++) {
-    if (info[i].data.srv.key == key && !strcmp(target, info[i].srvname(this)))
-      return &info[i];
+    if (info(i).data.srv.key == key && !strcmp(target, info(i).srvname(this)))
+      return &info(i);
   }
   return NULL;
 }
@@ -301,7 +283,7 @@ HostDBRoundRobin::select_best_http(sockaddr const *client_ip, ink_time_t now, in
     // Check that the host we selected is alive
     for (int i = 0; i < good; i++) {
       best_any = current++ % good;
-      if (info[best_any].alive(now, fail_window)) {
+      if (info(best_any).is_alive(now, fail_window)) {
         best_up = best_any;
         break;
       }
@@ -315,7 +297,7 @@ HostDBRoundRobin::select_best_http(sockaddr const *client_ip, ink_time_t now, in
     }
     for (int i = 0; i < good; i++) {
       best_any = current++ % good;
-      if (info[best_any].alive(now, fail_window)) {
+      if (info(best_any).is_alive(now, fail_window)) {
         best_up = best_any;
         break;
       }
@@ -327,13 +309,13 @@ HostDBRoundRobin::select_best_http(sockaddr const *client_ip, ink_time_t now, in
     unsigned int best_hash_up  = 0;
     sockaddr const *ip;
     for (int i = 0; i < good; i++) {
-      ip             = info[i].ip();
+      ip             = info(i).ip();
       unsigned int h = HOSTDB_CLIENT_IP_HASH(client_ip, ip);
       if (best_hash_any <= h) {
         best_any      = i;
         best_hash_any = h;
       }
-      if (info[i].alive(now, fail_window)) {
+      if (info(i).is_alive(now, fail_window)) {
         if (best_hash_up <= h) {
           best_up      = i;
           best_hash_up = h;
@@ -344,10 +326,10 @@ HostDBRoundRobin::select_best_http(sockaddr const *client_ip, ink_time_t now, in
 
   if (best_up != -1) {
     ink_assert(best_up >= 0 && best_up < good);
-    return &info[best_up];
+    return &info(best_up);
   } else {
     ink_assert(best_any >= 0 && best_any < good);
-    return &info[best_any];
+    return &info(best_any);
   }
 }
 
@@ -363,7 +345,7 @@ HostDBRoundRobin::select_best_srv(char *target, InkRand *rand, ink_time_t now, i
 
 #ifdef DEBUG
   for (int i = 1; i < good; ++i) {
-    ink_assert(info[i].data.srv.srv_priority >= info[i - 1].data.srv.srv_priority);
+    ink_assert(info(i).data.srv.srv_priority >= info(i - 1).data.srv.srv_priority);
   }
 #endif
 
@@ -373,25 +355,23 @@ HostDBRoundRobin::select_best_srv(char *target, InkRand *rand, ink_time_t now, i
   HostDBInfo *infos[HOST_DB_MAX_ROUND_ROBIN_INFO];
 
   do {
-    if (info[i].app.http_data.last_failure != 0 && (uint32_t)(now - fail_window) < info[i].app.http_data.last_failure) {
+    // Skip targets that are currently marked down, so only live ones become selection candidates
+    if (!info(i).is_alive(now, fail_window)) {
       continue;
     }
 
-    if (info[i].app.http_data.last_failure)
-      info[i].app.http_data.last_failure = 0;
-
-    if (info[i].data.srv.srv_priority <= p) {
-      p = info[i].data.srv.srv_priority;
-      weight += info[i].data.srv.srv_weight;
-      infos[len++] = &info[i];
+    if (info(i).data.srv.srv_priority <= p) {
+      p = info(i).data.srv.srv_priority;
+      weight += info(i).data.srv.srv_weight;
+      infos[len++] = &info(i);
     } else
       break;
   } while (++i < good);
 
   if (len == 0) { // all failed
-    result = &info[current++ % good];
+    result = &info(current++ % good);
   } else if (weight == 0) { // srv weight is 0
-    result = &info[current++ % len];
+    result = &info(current++ % len);
   } else {
     uint32_t xx = rand->random() % weight;
     for (i = 0; i < len && xx >= infos[i]->data.srv.srv_weight; ++i)
@@ -465,7 +445,7 @@ struct HostDBContinuation : public Continuation {
   Continuation *from_cont;
   HostDBApplicationInfo app;
   int probe_depth;
-  int current_iterate_pos;
+  size_t current_iterate_pos;
   ClusterMachine *past_probes[CONFIGURATION_HISTORY_PROBE_DEPTH];
   //  char name[MAXDNAME];
   //  int namelen;
@@ -502,7 +482,8 @@ struct HostDBContinuation : public Continuation {
   {
     return md5.db_mark == HOSTDB_MARK_SRV;
   }
-  HostDBInfo *lookup_done(IpAddr const &ip, char const *aname, bool round_robin, unsigned int attl, SRVHosts *s = NULL);
+  HostDBInfo *lookup_done(IpAddr const &ip, char const *aname, bool round_robin, unsigned int attl, SRVHosts *s = NULL,
+                          HostDBInfo *r = NULL);
   bool do_get_response(Event *e);
   void do_put_response(ClusterMachine *m, HostDBInfo *r, Continuation *cont);
   int failed_cluster_request(Event *e);
@@ -551,8 +532,6 @@ struct HostDBContinuation : public Continuation {
   }
 };
 
-// extern Queue<HostDBContinuation>  remoteHostDBQueue[MULTI_CACHE_PARTITIONS];
-
 inline unsigned int
 master_hash(INK_MD5 const &md5)
 {
@@ -568,13 +547,13 @@ is_dotted_form_hostname(const char *c)
 inline Queue<HostDBContinuation> &
 HostDBCache::pending_dns_for_hash(INK_MD5 &md5)
 {
-  return pending_dns[partition_of_bucket((int)(fold_md5(md5) % hostDB.buckets))];
+  return pending_dns[this->refcountcache->partition_for_key(md5.fold())];
 }
 
 inline int
 HostDBContinuation::key_partition()
 {
-  return hostDB.partition_of_bucket(fold_md5(md5.hash) % hostDB.buckets);
+  return hostDB.refcountcache->partition_for_key(md5.hash.fold());
 }
 
 #endif /* _P_HostDBProcessor_h_ */
diff --git a/iocore/hostdb/P_MultiCache.h b/iocore/hostdb/P_MultiCache.h
deleted file mode 100644
index 2547c58..0000000
--- a/iocore/hostdb/P_MultiCache.h
+++ /dev/null
@@ -1,705 +0,0 @@
-/** @file
-
-  A brief file description
-
-  @section license License
-
-  Licensed to the Apache Software Foundation (ASF) under one
-  or more contributor license agreements.  See the NOTICE file
-  distributed with this work for additional information
-  regarding copyright ownership.  The ASF licenses this file
-  to you under the Apache License, Version 2.0 (the
-  "License"); you may not use this file except in compliance
-  with the License.  You may obtain a copy of the License at
-
-      http://www.apache.org/licenses/LICENSE-2.0
-
-  Unless required by applicable law or agreed to in writing, software
-  distributed under the License is distributed on an "AS IS" BASIS,
-  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-  See the License for the specific language governing permissions and
-  limitations under the License.
- */
-
-/****************************************************************************
-
-  MultiCache.h
-
-
- ****************************************************************************/
-
-#ifndef _MultiCache_h_
-#define _MultiCache_h_
-
-#include "I_EventSystem.h"
-#include "I_Store.h"
-
-//
-// Constants
-//
-
-#define MULTI_CACHE_MAX_LEVELS 3
-#define MULTI_CACHE_MAX_BUCKET_SIZE 256
-#define MULTI_CACHE_MAX_FILES 256
-#define MULTI_CACHE_PARTITIONS 64
-
-#define MULTI_CACHE_EVENT_SYNC MULTI_CACHE_EVENT_EVENTS_START
-
-// for heap_offset() and heap_size(), indicates no data
-#define MULTI_CACHE_HEAP_NONE -1
-
-#define MULTI_CACHE_MAGIC_NUMBER 0x0BAD2D8
-
-// Update these if there is a change to MultiCacheBase
-// There is a separate HOST_DB_CACHE_[MAJOR|MINOR]_VERSION
-#define MULTI_CACHE_MAJOR_VERSION 2
-#define MULTI_CACHE_MINOR_VERSION 1
-// 2.1 - IPv6 compatible
-
-#define MULTI_CACHE_HEAP_HIGH_WATER 0.8
-
-#define MULTI_CACHE_HEAP_INITIAL sizeof(uint32_t)
-#define MULTI_CACHE_HEAP_ALIGNMENT 8
-
-// unused.. possible optimization
-#define MULTI_CACHE_OFFSET_PARITION(_x) ((_x) % MULTI_CACHE_PARTITIONS)
-#define MULTI_CACHE_OFFSET_INDEX(_x) ((_x) / MULTI_CACHE_PARTITIONS)
-#define MULTI_CACHE_OFFSET(_p, _o) ((_p) + (_o)*MULTI_CACHE_PARTITIONS)
-
-class ProxyMutex;
-class Continuation;
-
-//
-// Types
-//
-
-// MultiCacheBlock
-// This is an abstract class which simply documents the operations
-// required by the templated cache operations.
-
-struct MultiCacheBlock {
-  uint64_t tag();
-  bool is_deleted();
-  void set_deleted();
-  bool is_empty();
-  void set_empty();
-  void reset();
-  void set_full(uint64_t folded_md5, int buckets);
-  int
-  heap_size()
-  {
-    return 0;
-  }
-  int *
-  heap_offset_ptr()
-  {
-    return NULL;
-  }
-};
-
-struct RebuildMC {
-  bool rebuild;
-  bool check;
-  bool fix;
-  char *data;
-  int partition;
-
-  int deleted;
-  int backed;
-  int duplicates;
-  int corrupt;
-  int stale;
-  int good;
-  int total;
-};
-
-struct MultiCacheHeader {
-  unsigned int magic;
-  VersionNumber version;
-
-  unsigned int levels;
-
-  int tag_bits;
-  int max_hits;
-  int elementsize;
-
-  int buckets;
-  int level_offset[MULTI_CACHE_MAX_LEVELS];
-  int elements[MULTI_CACHE_MAX_LEVELS];
-  int bucketsize[MULTI_CACHE_MAX_LEVELS];
-
-  int totalelements;
-  unsigned int totalsize;
-
-  int nominal_elements;
-
-  // optional heap
-  int heap_size;
-  volatile int heap_halfspace;
-  volatile int heap_used[2];
-
-  MultiCacheHeader();
-};
-
-// size of block of unsunk pointers with respect to the number of
-// elements
-#define MULTI_CACHE_UNSUNK_PTR_BLOCK_SIZE(_e) ((_e / 8) / MULTI_CACHE_PARTITIONS)
-
-struct UnsunkPtr {
-  int offset;
-  int *poffset; // doubles as freelist pointer
-};
-
-struct MultiCacheBase;
-
-struct UnsunkPtrRegistry {
-  MultiCacheBase *mc;
-  int n;
-  UnsunkPtr *ptrs;
-  UnsunkPtr *next_free;
-  UnsunkPtrRegistry *next;
-
-  UnsunkPtr *ptr(int i);
-  UnsunkPtr *alloc(int *p, int base = 0);
-  void alloc_data();
-
-  UnsunkPtrRegistry();
-  ~UnsunkPtrRegistry();
-};
-
-//
-// Broken SunCC
-//
-#define PtrMutex Ptr<ProxyMutex>
-
-//
-// used by windows only - to keep track
-// of mapping handles
-//
-struct Unmaper {
-  void *hMap;
-  char *pAddr;
-};
-
-typedef int three_ints[3];
-typedef int two_ints[2];
-
-struct MultiCacheHeapGC;
-
-struct MultiCacheBase : public MultiCacheHeader {
-  Store *store;
-  char filename[PATH_NAME_MAX];
-  MultiCacheHeader *mapped_header;
-
-  MultiCacheHeader header_snap;
-
-  // mmap-ed region
-  //
-  char *data;
-  char *lowest_level_data;
-
-  // equal to data + level_offset[3] + bucketsize[3] * buckets;
-  char *heap;
-
-  // interface functions
-  //
-  int
-  halfspace_size()
-  {
-    return heap_size / 2;
-  }
-
-  // Stats support
-  //
-  int hit_stat[MULTI_CACHE_MAX_LEVELS];
-  int miss_stat;
-
-  unsigned int
-  lowest_level_data_size()
-  {
-    return (buckets + 3) / 4;
-  }
-  unsigned int
-  lowest_level(unsigned int bucket)
-  {
-    unsigned int i = (unsigned char)lowest_level_data[bucket / 4];
-    return 3 & (i >> (buckets % 4));
-  }
-  void
-  set_lowest_level(unsigned int bucket, unsigned int lowest)
-  {
-    unsigned char p = (unsigned char)lowest_level_data[bucket / 4];
-    p &= ~(3 << (buckets % 4));
-    p |= (lowest & 3) << (buckets % 4);
-    lowest_level_data[bucket / 4] = (char)p;
-  }
-
-  // Fixed point, 8 bits shifted left
-  int buckets_per_partitionF8;
-
-  int
-  partition_of_bucket(int b)
-  {
-    return ((b << 8) + 0xFF) / buckets_per_partitionF8;
-  }
-  int
-  first_bucket_of_partition(int p)
-  {
-    return ((buckets_per_partitionF8 * p) >> 8);
-  }
-  int
-  last_bucket_of_partition(int p)
-  {
-    return first_bucket_of_partition(p + 1) - 1;
-  }
-  int
-  buckets_of_partition(int p)
-  {
-    return last_bucket_of_partition(p) - first_bucket_of_partition(p) + 1;
-  }
-
-  int open(Store *store, const char *config_filename, char *db_filename = NULL, int db_size = -1, bool reconfigure = false,
-           bool fix = false, bool silent = false);
-
-  // 1 for success, 0 for no config file, -1 for failure
-  int read_config(const char *config_filename, Store &store, char *fn = NULL, int *pi = NULL, int *pbuckets = NULL);
-  int write_config(const char *config_filename, int nominal_size, int buckets);
-  int initialize(Store *store, char *filename, int elements, int buckets = 0, unsigned int levels = 2,
-                 int level0_elements_per_bucket = 4, int level1_elements_per_bucket = 32, int level2_elements_per_bucket = 1);
-  int mmap_data(bool private_flag = false, bool zero_fill = false);
-  char *mmap_region(int blocks, int *fds, char *cur, size_t &total_length, bool private_flag, int zero_fill = 0);
-  int blocks_in_level(unsigned int level);
-
-  bool verify_header();
-
-  int unmap_data();
-  void reset();
-  void clear(); // this zeros the data
-  void clear_but_heap();
-
-  virtual MultiCacheBase *
-  dup()
-  {
-    ink_assert(0);
-    return NULL;
-  }
-
-  virtual size_t
-  estimated_heap_bytes_per_entry() const
-  {
-    return 0;
-  }
-
-  void print_info(FILE *fp);
-
-//
-// Rebuild the database, also perform checks, and fixups
-// ** cannot be called on a running system **
-// "this" must be initialized.
-//
-#define MC_REBUILD 0
-#define MC_REBUILD_CHECK 1
-#define MC_REBUILD_FIX 2
-  int rebuild(MultiCacheBase &old, int kind = MC_REBUILD); // 0 on success
-
-  virtual void
-  rebuild_element(int buck, char *elem, RebuildMC &r)
-  {
-    (void)buck;
-    (void)elem;
-    (void)r;
-    ink_assert(0);
-  }
-
-  //
-  // Check the database
-  // ** cannot be called on a running system **
-  //  assumes that the configuration is correct
-  //
-  int check(const char *config_filename, bool fix = false);
-
-  ProxyMutex *
-  lock_for_bucket(int bucket)
-  {
-    return locks[partition_of_bucket(bucket)].get();
-  }
-
-  uint64_t
-  make_tag(uint64_t folded_md5)
-  {
-    uint64_t ttag = folded_md5 / (uint64_t)buckets;
-    if (!ttag)
-      return 1LL;
-    // beeping gcc 2.7.2 is broken
-    if (tag_bits > 32) {
-      uint64_t mask = 0x100000000LL << (tag_bits - 32);
-      mask          = mask - 1;
-      return ttag & mask;
-    } else {
-      uint64_t mask = 1LL;
-      mask <<= tag_bits;
-      mask = mask - 1;
-      return ttag & mask;
-    }
-  }
-
-  int sync_all();
-  int sync_heap(int part); // part varies between 0 and MULTI_CACHE_PARTITIONS
-  int sync_header();
-  int sync_partition(int partition);
-  void sync_partitions(Continuation *cont);
-
-  MultiCacheBase();
-  virtual ~MultiCacheBase() { reset(); }
-  virtual int
-  get_elementsize()
-  {
-    ink_assert(0);
-    return 0;
-  }
-
-  // Heap support
-  UnsunkPtrRegistry unsunk[MULTI_CACHE_PARTITIONS];
-
-  // -1 on error
-  int ptr_to_partition(char *);
-  // the user must pass in the offset field within the
-  // MultiCacheBlock object.  The offset will be inserted
-  // into the object on success and a pointer to the data
-  // returned.  On failure, NULL is returned;
-  void *alloc(int *poffset, int size);
-  void update(int *poffset, int *old_poffset);
-  void *ptr(int *poffset, int partition);
-  int
-  valid_offset(int offset)
-  {
-    int max;
-    if (offset < halfspace_size())
-      max = heap_used[0];
-    else
-      max = halfspace_size() + heap_used[1];
-    return offset < max;
-  }
-  int
-  valid_heap_pointer(char *p)
-  {
-    if (p < heap + halfspace_size())
-      return p < heap + heap_used[0];
-    else
-      return p < heap + halfspace_size() + heap_used[1];
-  }
-  void copy_heap_data(char *src, int s, int *pi, int partition, MultiCacheHeapGC *gc);
-  int
-  halfspace_of(int o)
-  {
-    return o < halfspace_size() ? 0 : 1;
-  }
-  UnsunkPtrRegistry *fixup_heap_offsets(int partition, int before_used, UnsunkPtrRegistry *r = NULL, int base = 0);
-
-  virtual void
-  copy_heap(int partition, MultiCacheHeapGC *gc)
-  {
-    (void)partition;
-    (void)gc;
-  }
-
-  //
-  // Private
-  //
-  void
-  alloc_mutexes()
-  {
-    for (int i = 0; i < MULTI_CACHE_PARTITIONS; i++)
-      locks[i] = new_ProxyMutex();
-  }
-  PtrMutex locks[MULTI_CACHE_PARTITIONS]; // 1 lock per (buckets/partitions)
-};
-
-template <class C> struct MultiCache : public MultiCacheBase {
-  int
-  get_elementsize()
-  {
-    return sizeof(C);
-  }
-
-  MultiCacheBase *
-  dup()
-  {
-    return new MultiCache<C>;
-  }
-
-  void rebuild_element(int buck, char *elem, RebuildMC &r);
-  // -1 is corrupt, 0 == void (do not insert), 1 is OK
-  virtual int
-  rebuild_callout(C *c, RebuildMC &r)
-  {
-    (void)c;
-    (void)r;
-    return 1;
-  }
-
-  virtual void
-  rebuild_insert_callout(C *c, RebuildMC &r)
-  {
-    (void)c;
-    (void)r;
-  }
-
-  //
-  // template operations
-  //
-  int level_of_block(C *b);
-  bool match(uint64_t folded_md5, C *block);
-  C *cache_bucket(uint64_t folded_md5, unsigned int level);
-  C *insert_block(uint64_t folded_md5, C *new_block, unsigned int level);
-  void flush(C *b, int bucket, unsigned int level);
-  void delete_block(C *block);
-  C *lookup_block(uint64_t folded_md5, unsigned int level);
-  void copy_heap(int paritition, MultiCacheHeapGC *);
-};
-
-inline uint64_t
-fold_md5(INK_MD5 const &md5)
-{
-  return md5.fold();
-}
-
-template <class C>
-inline int
-MultiCache<C>::level_of_block(C *b)
-{
-  if ((char *)b - data >= level_offset[1]) {
-    if ((char *)b - data >= level_offset[2])
-      return 2;
-    return 1;
-  }
-  return 0;
-}
-
-template <class C>
-inline C *
-MultiCache<C>::cache_bucket(uint64_t folded_md5, unsigned int level)
-{
-  int bucket   = (int)(folded_md5 % buckets);
-  char *offset = data + level_offset[level] + bucketsize[level] * bucket;
-  return (C *)offset;
-}
-
-//
-// Insert an entry
-//
-template <class C>
-inline C *
-MultiCache<C>::insert_block(uint64_t folded_md5, C *new_block, unsigned int level)
-{
-  C *b     = cache_bucket(folded_md5, level);
-  C *block = NULL, *empty = NULL;
-  int bucket = (int)(folded_md5 % buckets);
-  int hits   = 0;
-
-  // Find the entry
-  //
-  uint64_t tag = make_tag(folded_md5);
-  int n_empty  = 0;
-
-  for (block = b; block < b + elements[level]; block++) {
-    if (block->is_empty() && !empty) {
-      n_empty++;
-      empty = block;
-    }
-    if (tag == block->tag())
-      goto Lfound;
-    hits += block->hits;
-  }
-  if (empty) {
-    block = empty;
-    goto Lfound;
-  }
-
-  {
-    C *best   = NULL;
-    int again = 1;
-    do {
-      // Find an entry previously backed to a higher level.
-      // self scale the hits number within the bucket
-      //
-      unsigned int dec = 0;
-      if (hits > ((max_hits / 2) + 1) * elements[level])
-        dec = 1;
-      for (block = b; block < b + elements[level]; block++) {
-        if (block->backed && (!best || best->hits > block->hits))
-          best = block;
-        if (block->hits)
-          block->hits -= dec;
-      }
-      if (best) {
-        block = best;
-        goto Lfound;
-      }
-      flush(b, bucket, level);
-    } while (again--);
-    ink_assert(!"cache flush failure");
-  }
-
-Lfound:
-  if (new_block) {
-    *block   = *new_block;
-    int *hop = new_block->heap_offset_ptr();
-    if (hop)
-      update(block->heap_offset_ptr(), hop);
-    block->backed = 0;
-  } else
-    block->reset();
-  block->set_full(folded_md5, buckets);
-  ink_assert(block->tag() == tag);
-  return block;
-}
-
-#define REBUILD_FOLDED_MD5(_cl) ((_cl->tag() * (uint64_t)buckets + (uint64_t)bucket))
-
-//
-// This function ejects some number of entries.
-//
-template <class C>
-inline void
-MultiCache<C>::flush(C *b, int bucket, unsigned int level)
-{
-  C *block = NULL;
-  // The comparison against the constant is redundant, but it
-  // quiets the array_bounds error generated by g++ 4.9.2
-  if (level < levels - 1 && level < (MULTI_CACHE_MAX_LEVELS - 1)) {
-    if (level >= lowest_level(bucket))
-      set_lowest_level(bucket, level + 1);
-    for (block = b; block < b + elements[level]; block++) {
-      ink_assert(!block->is_empty());
-      insert_block(REBUILD_FOLDED_MD5(block), block, level + 1);
-      block->backed = true;
-    }
-  } else {
-    for (block = b; block < b + elements[level]; block++)
-      if (!block->is_empty())
-        block->backed = true;
-  }
-}
-
-//
-// Match a cache line and a folded md5 key
-//
-template <class C>
-inline bool
-MultiCache<C>::match(uint64_t folded_md5, C *block)
-{
-  return block->tag() == make_tag(folded_md5);
-}
-
-//
-// This code is a bit of a mess and should probably be rewritten
-//
-template <class C>
-inline void
-MultiCache<C>::delete_block(C *b)
-{
-  if (b->backed) {
-    unsigned int l = level_of_block(b);
-    if (l < levels - 1) {
-      int bucket = (((char *)b - data) - level_offset[l]) / bucketsize[l];
-      C *x       = (C *)(data + level_offset[l + 1] + bucket * bucketsize[l + 1]);
-      for (C *y = x; y < x + elements[l + 1]; y++)
-        if (b->tag() == y->tag())
-          delete_block(y);
-    }
-  }
-  b->set_empty();
-}
-
-//
-// Lookup an entry up to some level in the cache
-//
-template <class C>
-inline C *
-MultiCache<C>::lookup_block(uint64_t folded_md5, unsigned int level)
-{
-  C *b         = cache_bucket(folded_md5, 0);
-  uint64_t tag = make_tag(folded_md5);
-  int i        = 0;
-  // Level 0
-  for (i = 0; i < elements[0]; i++)
-    if (tag == b[i].tag())
-      return &b[i];
-  if (level <= 0)
-    return NULL;
-  // Level 1
-  b = cache_bucket(folded_md5, 1);
-  for (i = 0; i < elements[1]; i++)
-    if (tag == b[i].tag())
-      return &b[i];
-  if (level <= 1)
-    return NULL;
-  // Level 2
-  b = cache_bucket(folded_md5, 2);
-  for (i = 0; i < elements[2]; i++)
-    if (tag == b[i].tag())
-      return &b[i];
-  return NULL;
-}
-
-template <class C>
-inline void
-MultiCache<C>::rebuild_element(int bucket, char *elem, RebuildMC &r)
-{
-  C *e = (C *)elem;
-  if (!e->is_empty()) {
-    r.total++;
-    if (e->is_deleted())
-      r.deleted++;
-    if (e->backed)
-      r.backed++;
-    int res = rebuild_callout(e, r);
-    if (res < 0)
-      r.corrupt++;
-    else if (!res)
-      r.stale++;
-    else {
-      r.good++;
-      if (lookup_block(REBUILD_FOLDED_MD5(e), levels - 1))
-        if (!e->backed)
-          r.duplicates++;
-      C *new_e = insert_block(REBUILD_FOLDED_MD5(e), e, 0);
-      rebuild_insert_callout(new_e, r);
-    }
-  }
-}
-
-template <class C>
-inline void
-MultiCache<C>::copy_heap(int partition, MultiCacheHeapGC *gc)
-{
-  int b = first_bucket_of_partition(partition);
-  int n = buckets_of_partition(partition);
-  for (unsigned int level = 0; level < levels; level++) {
-    int e   = n * elements[level];
-    char *d = data + level_offset[level] + b * bucketsize[level];
-    C *x    = (C *)d;
-    for (int i = 0; i < e; i++) {
-      int s = x[i].heap_size();
-      if (s) {
-        int *pi = x[i].heap_offset_ptr();
-        if (pi) {
-          char *src = (char *)ptr(pi, partition);
-          if (src) {
-            if (heap_halfspace) {
-              if (src >= heap + halfspace_size())
-                continue;
-            } else if (src < heap + halfspace_size())
-              continue;
-            copy_heap_data(src, s, pi, partition, gc);
-          }
-        }
-      }
-    }
-  }
-}
-
-// store either free or in the cache, can be stolen for reconfiguration
-void stealStore(Store &s, int blocks);
-#endif /* _MultiCache_h_ */
diff --git a/iocore/hostdb/P_RefCountCache.h b/iocore/hostdb/P_RefCountCache.h
new file mode 100644
index 0000000..3eba7cb
--- /dev/null
+++ b/iocore/hostdb/P_RefCountCache.h
@@ -0,0 +1,863 @@
+/** @file
+
+  A cache (with map-esque interface) for RefCountObjs
+
+  @section license License
+
+  Licensed to the Apache Software Foundation (ASF) under one
+  or more contributor license agreements.  See the NOTICE file
+  distributed with this work for additional information
+  regarding copyright ownership.  The ASF licenses this file
+  to you under the Apache License, Version 2.0 (the
+  "License"); you may not use this file except in compliance
+  with the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+ */
+#ifndef _P_RefCountCache_h_
+#define _P_RefCountCache_h_
+
+#include <I_EventSystem.h>
+#include <P_EventSystem.h> // TODO: less? just need ET_TASK
+
+#include <ts/Map.h>
+#include <ts/PriorityQueue.h>
+
+#include <ts/List.h>
+#include <ts/ink_hrtime.h>
+
+#include <ts/Vec.h>
+#include <ts/I_Version.h>
+#include <unistd.h>
+
+#define REFCOUNT_CACHE_EVENT_SYNC REFCOUNT_CACHE_EVENT_EVENTS_START
+
+#define REFCOUNTCACHE_MAGIC_NUMBER 0x0BAD2D9
+#define REFCOUNTCACHE_MAJOR_VERSION 1
+#define REFCOUNTCACHE_MINOR_VERSION 0
+
+// Stats
+enum RefCountCache_Stats {
+  refcountcache_current_items_stat,        // number of items currently stored (+1 on put, -1 on erase)
+  refcountcache_current_size_stat,         // sum of the sizes of the items currently stored
+  refcountcache_total_inserts_stat,        // total put() calls, counted before the capacity check
+  refcountcache_total_failed_inserts_stat, // put() calls dropped because the partition was full
+  refcountcache_total_lookups_stat,        // total get() calls
+  refcountcache_total_hits_stat,           // get() calls that found an entry
+
+  // Persistence metrics
+  refcountcache_last_sync_time,   // seconds since epoch of last successful sync
+  refcountcache_last_total_items, // number of items synced at the last sync
+  refcountcache_last_total_size,  // total size at last sync
+
+  RefCountCache_Stat_Count // number of stats defined above
+};
+
+struct RefCountCacheItemMeta {
+  uint64_t key;
+  unsigned int size;
+  ink_time_t expiry_time; // expire time as seconds since epoch
+  RefCountCacheItemMeta(uint64_t key, unsigned int size, ink_time_t expiry_time = -1)
+    : key(key), size(size), expiry_time(expiry_time)
+  {
+  }
+};
+
+// Layer of indirection for the hashmap-- since it needs lots of things inside of it
+// We'll also use this as the item header, for persisting objects to disk
+class RefCountCacheHashEntry
+{
+public:
+  Ptr<RefCountObj> item;                                      // the cached object
+  LINK(RefCountCacheHashEntry, item_link);                    // list link named by RefCountCacheHashing::ListHead
+  PriorityQueueEntry<RefCountCacheHashEntry *> *expiry_entry; // node in the expiry queue, NULL when not queued
+  RefCountCacheItemMeta meta;                                 // key, size and expiry time of `item`
+
+  void
+  set(RefCountObj *i, uint64_t key, unsigned int size, int expire_time)
+  {
+    this->item = make_ptr(i);
+    this->meta = RefCountCacheItemMeta(key, size, expire_time);
+  };
+  // Need a no-argument constructor to use the classAllocator; starts with no item and expiry_time = -1
+  RefCountCacheHashEntry() : item(Ptr<RefCountObj>()), expiry_entry(NULL), meta(0, 0) {}
+  // Order entries by expiry time (earlier expiry compares less) so that they can be sorted
+  bool
+  operator<(const RefCountCacheHashEntry &v2) const
+  {
+    return this->meta.expiry_time < v2.meta.expiry_time;
+  };
+};
+// Since the hashing values are all fixed size, we can simply use a classAllocator to avoid mallocs
+extern ClassAllocator<RefCountCacheHashEntry> refCountCacheHashingValueAllocator;
+extern ClassAllocator<PriorityQueueEntry<RefCountCacheHashEntry *>> expiryQueueEntry;
+
+struct RefCountCacheHashing {
+  typedef uint64_t ID;
+  typedef uint64_t const Key;
+  typedef RefCountCacheHashEntry Value;
+  typedef DList(RefCountCacheHashEntry, item_link) ListHead;
+
+  static ID
+  hash(Key key)
+  {
+    return key;
+  }
+  static Key
+  key(Value const *value)
+  {
+    return value->meta.key;
+  }
+  static bool
+  equal(Key lhs, Key rhs)
+  {
+    return lhs == rhs;
+  }
+};
+
+// The RefCountCachePartition is simply a map of key -> Ptr<YourClass>
+// We partition the cache to reduce lock contention
+template <class C> class RefCountCachePartition
+{
+public:
+  RefCountCachePartition(unsigned int part_num, uint64_t max_size, unsigned int max_items, RecRawStatBlock *rsb = NULL);
+  Ptr<C> get(uint64_t key);
+  void put(uint64_t key, C *item, int size = 0, int expire_time = 0);
+  void erase(uint64_t key, ink_time_t expiry_time = -1);
+
+  void clear();
+  bool is_full() const;
+  bool make_space_for(unsigned int);
+
+  size_t count() const;
+  void copy(Vec<RefCountCacheHashEntry *> &items);
+
+  typedef typename TSHashTable<RefCountCacheHashing>::iterator iterator_type;
+  typedef typename TSHashTable<RefCountCacheHashing>::self hash_type;
+  typedef typename TSHashTable<RefCountCacheHashing>::Location location_type;
+  TSHashTable<RefCountCacheHashing> *get_map();
+
+  Ptr<ProxyMutex> lock; // Lock
+
+private:
+  void metric_inc(RefCountCache_Stats metric_enum, int64_t data);
+
+  unsigned int part_num;
+  uint64_t max_size;
+  unsigned int max_items;
+  uint64_t size;
+  unsigned int items;
+
+  hash_type item_map;
+
+  PriorityQueue<RefCountCacheHashEntry *> expiry_queue;
+  RecRawStatBlock *rsb;
+};
+
+template <class C>
+RefCountCachePartition<C>::RefCountCachePartition(unsigned int part_num, uint64_t max_size, unsigned int max_items,
+                                                  RecRawStatBlock *rsb)
+  : lock(new_ProxyMutex()), part_num(part_num), max_size(max_size), max_items(max_items), size(0), items(0), rsb(rsb)
+{
+}
+
+template <class C>
+Ptr<C>
+RefCountCachePartition<C>::get(uint64_t key)
+{
+  this->metric_inc(refcountcache_total_lookups_stat, 1); // every call counts as a lookup, hit or miss
+  location_type l = this->item_map.find(key);
+  if (l.isValid()) {
+    // found: count the hit and return the stored item, cast to C, wrapped in a Ptr
+    this->metric_inc(refcountcache_total_hits_stat, 1);
+    return make_ptr((C *)l.m_value->item.get());
+  } else {
+    return Ptr<C>(); // miss: default-constructed Ptr
+  }
+}
+
+template <class C>
+void
+RefCountCachePartition<C>::put(uint64_t key, C *item, int size, int expire_time)
+{
+  this->metric_inc(refcountcache_total_inserts_stat, 1); // counted even if the insert is rejected below
+  size += sizeof(C); // the caller-supplied size is added on top of the object itself
+  // Remove any entry already stored under this key
+  this->erase(key);
+
+  // if we are full, and can't make space-- then don't store the item (only the failed-insert stat is bumped)
+  if (this->is_full() && !this->make_space_for(size)) {
+    Debug("refcountcache", "partition %d is full-- not storing item key=%" PRIu64, this->part_num, key);
+    this->metric_inc(refcountcache_total_failed_inserts_stat, 1);
+    return;
+  }
+
+  // Create our value-- which has a ref to the `item`
+  RefCountCacheHashEntry *val = refCountCacheHashingValueAllocator.alloc();
+  val->set(item, key, size, expire_time);
+
+  // add expiry_entry to the expiry queue when expire_time is non-negative (0 included); negative means don't expire
+  if (expire_time >= 0) { // NOTE(review): the declared default of 0 also takes this branch -- confirm that is intended
+    Debug("refcountcache", "partition %d adding entry with expire_time=%d\n", this->part_num, expire_time);
+    PriorityQueueEntry<RefCountCacheHashEntry *> *expiry_entry = expiryQueueEntry.alloc();
+    new ((void *)expiry_entry) PriorityQueueEntry<RefCountCacheHashEntry *>(val); // construct in place, pointing at val
+    expiry_queue.push(expiry_entry);
+    val->expiry_entry = expiry_entry; // kept on the entry so erase() can find and free the queue node
+  }
+
+  // add the item to the map and update the usage counters and stats
+  this->item_map.insert(val);
+  this->size += val->meta.size;
+  this->items++;
+  this->metric_inc(refcountcache_current_size_stat, (int64_t)val->meta.size);
+  this->metric_inc(refcountcache_current_items_stat, 1);
+}
+
+// Remove the entry stored under `key`, if there is one.
+//
+// When `expiry_time` is non-negative the entry is only removed if its recorded expiry matches
+// exactly; otherwise the call is a no-op. Removal updates the usage counters and metrics, drops the
+// entry from the expiry queue, releases the cache's reference to the item and returns the hash entry
+// to its allocator.
+template <class C>
+void
+RefCountCachePartition<C>::erase(uint64_t key, ink_time_t expiry_time)
+{
+  location_type l = this->item_map.find(key);
+  if (!l.isValid()) {
+    return;
+  }
+  if (expiry_time >= 0 && l.m_value->meta.expiry_time != expiry_time) {
+    return;
+  }
+
+  // TSHashMap::remove only unlinks the value from the map-- cleaning it up is our job
+  this->item_map.remove(l);
+
+  // usage counters and metrics
+  const int64_t freed_size = (int64_t)l.m_value->meta.size;
+  this->size -= l.m_value->meta.size;
+  this->items--;
+  this->metric_inc(refcountcache_current_size_stat, -freed_size);
+  this->metric_inc(refcountcache_current_items_stat, -1);
+
+  // expiry queue bookkeeping
+  PriorityQueueEntry<RefCountCacheHashEntry *> *queued = l.m_value->expiry_entry;
+  if (queued != NULL) {
+    Debug("refcountcache", "partition %d deleting item from expiry_queue idx=%d\n", this->part_num, queued->index);
+    this->expiry_queue.erase(queued);
+    expiryQueueEntry.free(queued);
+    l.m_value->expiry_entry = NULL; // so the entry's destructor below doesn't touch the freed queue entry
+  }
+
+  // The entry stores the item as a RefCountObj, so a plain release would pick the wrong `free`.
+  // Going through Ptr<C> forces the decrement/free to happen with the right type.
+  Ptr<C> *typed_item = (Ptr<C> *)&l.m_value->item;
+  typed_item->clear();
+  l.m_value->~RefCountCacheHashEntry();
+  refCountCacheHashingValueAllocator.free(l.m_value);
+}
+
+// Remove every entry from this partition.
+//
+// this->item_map.clear() doesn't clean up anything, so each entry goes through erase(). erase()
+// unlinks the entry from the map and returns it to its allocator, so it must not be called on the
+// element an iterator is currently standing on: advancing that iterator afterwards would read freed
+// memory and can end the walk early. We therefore snapshot the entries first and erase afterwards.
+template <class C>
+void
+RefCountCachePartition<C>::clear()
+{
+  Vec<RefCountCacheHashEntry *> doomed;
+  for (RefCountCachePartition<C>::iterator_type i = this->item_map.begin(); i != this->item_map.end(); ++i) {
+    doomed.push_back(i.m_value);
+  }
+
+  // Each snapshot pointer stays valid until its own erase() call, since keys are unique in the map
+  for (unsigned int idx = 0; idx < doomed.length(); idx++) {
+    this->erase(doomed[idx]->meta.key, doomed[idx]->meta.expiry_time);
+  }
+}
+
+// Report whether the partition has hit its item limit or its size limit.
+// A limit of 0 disables the corresponding check.
+template <class C>
+bool
+RefCountCachePartition<C>::is_full() const
+{
+  Debug("refcountcache", "partition %d is full? items %d/%d size %" PRIu64 "/%" PRIu64 "\n\n", this->part_num, this->items,
+        this->max_items, this->size, this->max_size);
+
+  const bool item_limit_hit = this->max_items > 0 && this->items >= this->max_items;
+  const bool size_limit_hit = this->max_size > 0 && this->size >= this->max_size;
+  return item_limit_hit || size_limit_hit;
+}
+
+// Attempt to make space for an item of `size` bytes by evicting entries that have already expired.
+//
+// Returns true once the partition is no longer full and, when a size limit is configured, the item
+// fits. Returns false when the expiry queue is empty or its top entry has not expired yet (nothing
+// more can be evicted).
+template <class C>
+bool
+RefCountCachePartition<C>::make_space_for(unsigned int size)
+{
+  ink_time_t now = ink_time();
+  // max_size == 0 means "no size limit" (is_full() treats it that way), so only enforce the byte
+  // budget when one is actually configured
+  while (this->is_full() || (size > 0 && this->max_size > 0 && this->size + size > this->max_size)) {
+    PriorityQueueEntry<RefCountCacheHashEntry *> *top_item = expiry_queue.top();
+    // if there is nothing in the expiry queue, then we can't make space
+    if (top_item == NULL) {
+      return false;
+    }
+
+    // if the first item isn't expired-- the rest won't be either (queue is sorted)
+    if (top_item->node->meta.expiry_time >= now) {
+      return false;
+    }
+
+    // erase() already unlinks the entry from expiry_queue and frees it. Calling pop() afterwards
+    // would throw away the *next* queue entry while its item is still in item_map, leaving that
+    // item with a dangling expiry_entry.
+    this->erase(top_item->node->meta.key);
+  }
+  return true;
+}
+
+// Number of items currently stored in this partition.
+template <class C>
+size_t
+RefCountCachePartition<C>::count() const
+{
+  const size_t current_items = this->items;
+  return current_items;
+}
+
+// Append a snapshot of every entry in this partition to `items`.
+// Each snapshot is a freshly allocated hash entry filled from the original's item, key, size and
+// expiry time; the caller owns the snapshots and must return them to the allocator.
+template <class C>
+void
+RefCountCachePartition<C>::copy(Vec<RefCountCacheHashEntry *> &items)
+{
+  RefCountCachePartition<C>::iterator_type cursor = this->item_map.begin();
+  while (cursor != this->item_map.end()) {
+    RefCountCacheHashEntry *snapshot = refCountCacheHashingValueAllocator.alloc();
+    snapshot->set(cursor.m_value->item.get(), cursor.m_value->meta.key, cursor.m_value->meta.size,
+                  cursor.m_value->meta.expiry_time);
+    items.push_back(snapshot);
+    ++cursor;
+  }
+}
+
+// Add `data` to the stat `metric_enum`; a no-op when this partition has no stat block.
+template <class C>
+void
+RefCountCachePartition<C>::metric_inc(RefCountCache_Stats metric_enum, int64_t data)
+{
+  if (this->rsb == NULL) {
+    return;
+  }
+  RecIncrGlobalRawStatCount(this->rsb, metric_enum, data);
+}
+
+// Expose the underlying hash table of this partition.
+template <class C>
+TSHashTable<RefCountCacheHashing> *
+RefCountCachePartition<C>::get_map()
+{
+  TSHashTable<RefCountCacheHashing> *map = &this->item_map;
+  return map;
+}
+
+// The header for the cache, this is used to check if the serialized cache is compatible.
+//
+// The serializer writes this object to disk as raw bytes (sizeof(RefCountCacheHeader)) and the loader
+// reads it back the same way, so the member layout is effectively part of the on-disk format.
+class RefCountCacheHeader
+{
+public:
+  unsigned int magic;           // marker identifying a RefCountCache file
+  VersionNumber version;        // version of the RefCountCache format itself
+  VersionNumber object_version; // version passed in of whatever it is we are caching
+
+  // Build a header for the current cache format, recording the version of the cached objects
+  RefCountCacheHeader(VersionNumber object_version = VersionNumber());
+  // Equality of two headers
+  bool operator==(const RefCountCacheHeader other) const;
+  // Whether a cache written with header `other` may be loaded by a cache using this header
+  bool compatible(RefCountCacheHeader *other) const;
+};
+
+// This continuation is responsible for persisting RefCountCache to disk
+// To avoid locking the partitions for a long time we'll do the following per-partition:
+//    - lock
+//    - copy ptrs (bump refcount)
+//    - unlock
+//    - persist
+//    - remove ptrs (drop refcount)
+// This way we only have to hold the lock on the partition for the time it takes to get Ptr<>s to all items in the partition
+//
+// Handler sequence: initialize_storage -> pause_event -> copy_partition -> write_partition -> pause_event -> ...
+// until copy_partition sees that every partition has been handled and finalizes the sync.
+template <class C> class RefCountCacheSerializer : public Continuation
+{
+public:
+  size_t partition; // Current partition
+  C *cache;         // Pointer to the entire cache
+  Continuation *cont; // Continuation that is sent REFCOUNT_CACHE_EVENT_SYNC when the sync ends (success or failure)
+
+  // Snapshot the current partition into partition_items, or finalize once all partitions are done
+  int copy_partition(int event, Event *e);
+  // Write the snapshot taken by copy_partition to the temporary file
+  int write_partition(int event, Event *e);
+  // Pick the mutex for the next step and hand over to copy_partition
+  int pause_event(int event, Event *e);
+
+  // Create the tmp file on disk we'll be writing to
+  int initialize_storage(int event, Event *e);
+  // do the final mv and close of file handle
+  int finalize_sync();
+
+  // helper method to spin on writes to disk
+  int write_to_disk(const void *, size_t);
+
+  // `frequency` is in seconds; the work is paced so that one pass over all partitions takes about that long
+  RefCountCacheSerializer(Continuation *acont, C *cc, int frequency, std::string dirname, std::string filename);
+
+private:
+  Vec<RefCountCacheHashEntry *> partition_items; // snapshot of the partition currently being written
+
+  int fd; // fd for the file we are writing to
+
+  std::string dirname;      // directory containing `filename`; fsync'd after the rename
+  std::string filename;     // final path of the persisted cache
+  std::string tmp_filename; // path written to while syncing; renamed to `filename` at the end
+
+  ink_hrtime time_per_partition; // time budget per partition, derived from `frequency`
+  ink_hrtime start;              // when this serializer was created
+
+  int total_items;    // number of items written so far
+  int64_t total_size; // sum of the sizes of the items written so far
+
+  RecRawStatBlock *rsb; // stat block of the cache; may be NULL
+};
+
+// Event handler: snapshot the next partition, or wrap up once every partition has been handled.
+//
+// While partitions remain, the current one is copied into partition_items, the partition index is
+// advanced and write_partition is scheduled. When none remain the sync is finalized, `cont` is
+// notified with REFCOUNT_CACHE_EVENT_SYNC and this serializer deletes itself.
+template <class C>
+int
+RefCountCacheSerializer<C>::copy_partition(int event, Event *e)
+{
+  (void)event;
+
+  const bool partitions_remaining = partition < cache->partition_count();
+  if (partitions_remaining) {
+    Debug("refcountcache", "sync partition=%ld/%ld", partition, cache->partition_count());
+    // copy the partition into our buffer, then we'll let `write_partition` write it out
+    this->partition_items.reserve(cache->get_partition(partition).count());
+    cache->get_partition(partition).copy(this->partition_items);
+    partition++;
+    SET_HANDLER(&RefCountCacheSerializer::write_partition);
+    mutex = e->ethread->mutex;
+    e->schedule_imm(ET_TASK);
+
+    return EVENT_CONT;
+  }
+
+  // Every partition has been written-- finish the file and tell the caller
+  const int finalize_result = this->finalize_sync();
+  if (finalize_result != 0) {
+    Warning("Unable to finalize sync of cache to disk %s: %d", this->filename.c_str(), finalize_result);
+  }
+  cont->handleEvent(REFCOUNT_CACHE_EVENT_SYNC, 0);
+  Debug("refcountcache", "RefCountCacheSync done");
+  delete this;
+  return EVENT_DONE;
+}
+
+// Event handler: write the snapshot taken by copy_partition to the temporary file.
+//
+// For every snapshot entry that has not expired, the item metadata is written as a per-item header
+// followed by `meta.size` bytes of the item itself. Every snapshot entry is returned to its allocator
+// whether it was written, skipped as expired, or left unwritten after an I/O error.
+//
+// On a write error the file descriptor is closed, `cont` is notified with REFCOUNT_CACHE_EVENT_SYNC
+// and this serializer deletes itself (EVENT_DONE). Otherwise pause_event is scheduled, delayed as
+// needed to keep to the per-partition time budget (EVENT_CONT).
+template <class C>
+int
+RefCountCacheSerializer<C>::write_partition(int event, Event *e)
+{
+  (void)event; // unused
+  int curr_time = Thread::get_hrtime() / HRTIME_SECOND;
+  bool failed   = false;
+
+  for (unsigned int i = 0; i < this->partition_items.length(); i++) {
+    RefCountCacheHashEntry *it = this->partition_items[i];
+
+    // Once a write has failed we only drain the list. Expired items are never persisted.
+    if (!failed && it->meta.expiry_time >= curr_time) {
+      // Write the RefCountCacheItemMeta (as our header)
+      int ret = this->write_to_disk((char *)&it->meta, sizeof(it->meta));
+      if (ret < 0) {
+        Warning("Error writing cache item header to %s: %d", this->tmp_filename.c_str(), ret);
+        failed = true;
+      } else {
+        // write the actual object now
+        ret = this->write_to_disk((char *)it->item.get(), it->meta.size);
+        if (ret < 0) {
+          Warning("Error writing cache item to %s: %d", this->tmp_filename.c_str(), ret);
+          failed = true;
+        } else {
+          this->total_items++;
+          this->total_size += it->meta.size;
+        }
+      }
+    }
+
+    // Always hand the snapshot entry back, otherwise skipped or unwritten entries leak.
+    // NOTE(review): this returns the entry without running its destructor or clearing `item`, so the
+    // reference taken by copy() may never be dropped-- confirm against RefCountCacheHashEntry.
+    refCountCacheHashingValueAllocator.free(it);
+  }
+
+  // Clear partition-- for the next user
+  this->partition_items.clear();
+
+  if (failed) {
+    // Don't leak the descriptor of the partially written temporary file
+    socketManager.close(this->fd);
+    cont->handleEvent(REFCOUNT_CACHE_EVENT_SYNC, 0);
+    delete this;
+    return EVENT_DONE;
+  }
+
+  SET_HANDLER(&RefCountCacheSerializer::pause_event);
+
+  // Figure out how much time we spent
+  ink_hrtime elapsed          = Thread::get_hrtime() - this->start;
+  ink_hrtime expected_elapsed = (this->partition * this->time_per_partition);
+
+  // If we were quicker than our pace-- lets reschedule in the future
+  if (elapsed < expected_elapsed) {
+    e->schedule_in(expected_elapsed - elapsed, ET_TASK);
+  } else { // Otherwise we were too slow-- and need to go now!
+    e->schedule_imm(ET_TASK);
+  }
+  return EVENT_CONT;
+}
+
+// Event handler: choose the mutex for the next step and hand over to copy_partition.
+//
+// While partitions remain, the next partition's lock becomes this continuation's mutex, so
+// copy_partition runs holding it. Once they are exhausted the mutex of `cont` is used instead.
+template <class C>
+int
+RefCountCacheSerializer<C>::pause_event(int event, Event *e)
+{
+  (void)event;
+
+  const bool all_partitions_done = partition >= cache->partition_count();
+  if (all_partitions_done) {
+    mutex = cont->mutex;
+  } else {
+    mutex = cache->get_partition(partition).lock.get();
+  }
+
+  // Schedule up the next partition
+  SET_HANDLER(&RefCountCacheSerializer::copy_partition);
+  e->schedule_imm(ET_TASK);
+  return EVENT_CONT;
+}
+
+// Event handler: open (create/truncate) the temporary file and write the cache header to it.
+//
+// On success pause_event is scheduled and EVENT_CONT is returned. If the file can't be opened or the
+// header can't be written, `cont` is notified with REFCOUNT_CACHE_EVENT_SYNC, this serializer deletes
+// itself and EVENT_DONE is returned.
+template <class C>
+int
+RefCountCacheSerializer<C>::initialize_storage(int event, Event *e)
+{
+  (void)event;                                                                                 // unused
+  this->fd = socketManager.open(this->tmp_filename.c_str(), O_TRUNC | O_RDWR | O_CREAT, 0644); // TODO: configurable perms
+
+  if (this->fd <= 0) {
+    Warning("Unable to create temporary file %s, unable to persist hostdb: %d error:%s\n", this->tmp_filename.c_str(), this->fd,
+            strerror(errno));
+    cont->handleEvent(REFCOUNT_CACHE_EVENT_SYNC, 0);
+    delete this;
+    return EVENT_DONE;
+  }
+
+  // Write out the header
+  int ret = this->write_to_disk((char *)&this->cache->get_header(), sizeof(RefCountCacheHeader));
+  if (ret < 0) {
+    Warning("Error writing cache header to %s: %d", this->tmp_filename.c_str(), ret);
+    // The file was opened successfully above-- close it so the descriptor doesn't leak
+    socketManager.close(this->fd);
+    cont->handleEvent(REFCOUNT_CACHE_EVENT_SYNC, 0);
+    delete this;
+    return EVENT_DONE;
+  }
+
+  SET_HANDLER(&RefCountCacheSerializer::pause_event);
+  e->schedule_imm(ET_TASK);
+  return EVENT_CONT;
+}
+
+// Make the freshly written temporary file the live cache file.
+//
+// Steps: fsync the data file, open the containing directory, rename tmp -> final, fsync the
+// directory, close the directory, close the data file, then publish the "last sync" stats.
+// Returns 0 on success; on failure returns the result of the step that failed, after closing
+// whichever descriptors are still open.
+template <class C>
+int
+RefCountCacheSerializer<C>::finalize_sync()
+{
+  // flush the data file first
+  int result = socketManager.fsync(this->fd);
+  if (result != 0) {
+    socketManager.close(this->fd);
+    return result;
+  }
+
+  const int dir_fd = socketManager.open(this->dirname.c_str(), O_DIRECTORY); // Correct permissions?
+  if (dir_fd <= 0) {
+    socketManager.close(this->fd);
+    return dir_fd;
+  }
+
+  // move the file into place, then fsync the dir so the rename itself is persisted
+  result = rename(this->tmp_filename.c_str(), this->filename.c_str());
+  if (result == 0) {
+    result = socketManager.fsync(dir_fd);
+  }
+  if (result != 0) {
+    socketManager.close(this->fd);
+    socketManager.close(dir_fd);
+    return result;
+  }
+
+  result = socketManager.close(dir_fd);
+  if (result != 0) {
+    socketManager.close(this->fd);
+    return result;
+  }
+
+  result = socketManager.close(this->fd);
+  if (result != 0) {
+    return result;
+  }
+
+  if (this->rsb != NULL) {
+    RecSetRawStatCount(this->rsb, refcountcache_last_sync_time, Thread::get_hrtime() / HRTIME_SECOND);
+    RecSetRawStatCount(this->rsb, refcountcache_last_total_items, this->total_items);
+    RecSetRawStatCount(this->rsb, refcountcache_last_total_size, this->total_size);
+  }
+
+  return 0;
+}
+
+// Write `n_bytes` bytes starting at `ptr` to this->fd, retrying after partial writes.
+// Returns 0 once everything has been written, or -1 as soon as a write reports an error or makes
+// no progress.
+// TODO: reschedule the continuation if the disk was busy?
+template <class C>
+int
+RefCountCacheSerializer<C>::write_to_disk(const void *ptr, size_t n_bytes)
+{
+  for (size_t written = 0; written < n_bytes;) {
+    const int chunk = socketManager.write(this->fd, (char *)ptr + written, n_bytes - written);
+    if (chunk <= 0) {
+      return -1;
+    }
+    written += chunk;
+  }
+  return 0;
+}
+
+// Create a serializer for `cc` and start it on an ET_TASK thread.
+//   acont     - continuation that receives REFCOUNT_CACHE_EVENT_SYNC when the sync ends
+//   cc        - the cache to persist
+//   frequency - seconds one full pass should take; divided evenly across the partitions for pacing
+//   dirname   - directory containing `filename`
+//   filename  - final path of the persisted cache; data is first written to `filename` + ".syncing"
+template <class C>
+RefCountCacheSerializer<C>::RefCountCacheSerializer(Continuation *acont, C *cc, int frequency, std::string dirname,
+                                                    std::string filename)
+  : Continuation(NULL),
+    partition(0),
+    cache(cc),
+    cont(acont),
+    fd(0),
+    dirname(dirname),
+    filename(filename),
+    time_per_partition(HRTIME_SECONDS(frequency) / cc->partition_count()),
+    start(Thread::get_hrtime()),
+    total_items(0),
+    total_size(0),
+    rsb(cc->get_rsb())
+
+{
+  this->tmp_filename = this->filename + ".syncing"; // TODO tmp file extension configurable?
+
+  SET_HANDLER(&RefCountCacheSerializer::initialize_storage);
+
+  // Schedule last: once this is on the event queue another thread may run it right away, so the
+  // handler and tmp_filename have to be in place first.
+  eventProcessor.schedule_imm(this, ET_TASK);
+}
+
+// RefCountCache is a ref-counted key->value map to store classes that inherit from RefCountObj.
+// Once an item is `put` into the cache, the cache will maintain a Ptr<> to that object until erase
+// or clear is called-- which will remove the cache's Ptr<> to the object.
+//
+// This cache may be persisted (RefCountCacheSerializer) as well as loaded from disk (LoadRefCountCacheFromPath).
+// This class will optionally emit metrics at the given `metrics_prefix`.
+//
+// Note: although this cache does allow you to set expiry times this cache does not actively GC itself-- meaning
+// it will only remove expired items once the space is required. So to ensure that the cache is bounded either a
+// size or an item limit must be set-- otherwise the cache will not GC.
+//
+// Also note that if keys collide the previous entry for a given key will be removed, so this "leak" concern
+// assumes you don't have sufficient space to store an item for each possible key.
+//
+// Keys are spread over `num_partitions` partitions (key % num_partitions); each partition has its own lock.
+template <class C> class RefCountCache
+{
+public:
+  // Constructor
+  // `size` and `items` are totals for the whole cache and are divided across the partitions.
+  // Metrics are only registered when `metrics_prefix` is non-empty.
+  RefCountCache(unsigned int num_partitions, int size = -1, int items = -1, VersionNumber object_version = VersionNumber(),
+                std::string metrics_prefix = "");
+  // Destructor
+  ~RefCountCache();
+
+  // User interface to the cache
+  // These forward to the partition owning `key`.
+  // NOTE(review): they do not appear to take the partition lock themselves-- see lock_for_key(); confirm with callers.
+  Ptr<C> get(uint64_t key);
+  void put(uint64_t key, C *item, int size = 0, ink_time_t expiry_time = -1);
+  void erase(uint64_t key);
+  void clear();
+
+  // Some methods to get some internal state
+  int partition_for_key(uint64_t key);      // index of the partition that owns `key`
+  ProxyMutex *lock_for_key(uint64_t key);   // lock of the partition that owns `key`
+  size_t partition_count() const;           // number of partitions
+  RefCountCachePartition<C> &get_partition(int pnum);
+  size_t count() const;                     // total number of items across all partitions
+  RefCountCacheHeader &get_header();        // header written at the start of the persisted file
+  RecRawStatBlock *get_rsb();               // stat block, NULL when metrics are disabled
+
+private:
+  int max_size;  // Total size
+  int max_items; // Total number of items allowed
+  unsigned int num_partitions;
+  Vec<RefCountCachePartition<C> *> partitions;
+  // Header
+  RefCountCacheHeader header; // Our header
+  RecRawStatBlock *rsb;
+};
+
+// Build a cache with `num_partitions` partitions.
+//
+// `size` and `items` are the limits for the whole cache; each partition gets an equal share. When
+// `metrics_prefix` is non-empty a stat block is allocated and one stat per RefCountCache metric is
+// registered under that prefix; otherwise no metrics are emitted.
+template <class C>
+RefCountCache<C>::RefCountCache(unsigned int num_partitions, int size, int items, VersionNumber object_version,
+                                std::string metrics_prefix)
+  : max_size(size), max_items(items), num_partitions(num_partitions), header(RefCountCacheHeader(object_version)), rsb(NULL)
+{
+  if (!metrics_prefix.empty()) {
+    // name suffix -> stat id, in registration order
+    struct StatDef {
+      const char *suffix;
+      int id;
+    };
+    static const StatDef stat_defs[] = {
+      {"current_items", (int)refcountcache_current_items_stat},
+      {"current_size", (int)refcountcache_current_size_stat},
+      {"total_inserts", (int)refcountcache_total_inserts_stat},
+      {"total_failed_inserts", (int)refcountcache_total_failed_inserts_stat},
+      {"total_lookups", (int)refcountcache_total_lookups_stat},
+      {"total_hits", (int)refcountcache_total_hits_stat},
+      {"last_sync.time", (int)refcountcache_last_sync_time},
+      {"last_sync.total_items", (int)refcountcache_last_total_items},
+      {"last_sync.total_size", (int)refcountcache_last_total_size},
+    };
+
+    this->rsb = RecAllocateRawStatBlock((int)RefCountCache_Stat_Count);
+    for (unsigned int s = 0; s < sizeof(stat_defs) / sizeof(stat_defs[0]); s++) {
+      RecRegisterRawStat(this->rsb, RECT_PROCESS, (metrics_prefix + stat_defs[s].suffix).c_str(), RECD_INT, RECP_NON_PERSISTENT,
+                         stat_defs[s].id, RecRawStatSyncCount);
+    }
+  }
+
+  // Now lets create all the partitions
+  // NOTE(review): `size` and `items` are int while `num_partitions` is unsigned, so the divisions
+  // below are carried out in unsigned arithmetic-- the -1 defaults turn into very large limits
+  // rather than 0. Confirm that this is the intended meaning of "no limit".
+  this->partitions.reserve(num_partitions);
+  for (unsigned int part = 0; part < num_partitions; part++) {
+    RefCountCachePartition<C> *created =
+      new RefCountCachePartition<C>(part, size / num_partitions, items / num_partitions, this->rsb);
+    this->partitions.push_back(created);
+  }
+}
+
+// Destroy the cache: every partition was allocated with `new` in the constructor, so each one has
+// to be deleted individually. (`partitions` itself is a Vec member, not a pointer, and cleans up
+// its own storage.)
+// NOTE(review): whether the partition destructor releases the entries it still holds is not visible
+// here-- call clear() first if it doesn't.
+template <class C> RefCountCache<C>::~RefCountCache()
+{
+  for (unsigned int i = 0; i < this->num_partitions; i++) {
+    delete this->partitions[i];
+  }
+}
+
+// Look up `key` in the partition that owns it; returns an empty Ptr<> when it isn't cached.
+template <class C>
+Ptr<C>
+RefCountCache<C>::get(uint64_t key)
+{
+  RefCountCachePartition<C> *owner = this->partitions[this->partition_for_key(key)];
+  return owner->get(key);
+}
+
+// Store `item` under `key` in the partition that owns the key.
+// `size` is the payload size used for accounting and `expiry_time` the time after which the entry
+// may be evicted; both are passed straight through to the partition.
+template <class C>
+void
+RefCountCache<C>::put(uint64_t key, C *item, int size, ink_time_t expiry_time)
+{
+  RefCountCachePartition<C> *owner = this->partitions[this->partition_for_key(key)];
+  owner->put(key, item, size, expiry_time);
+}
+
+// Map a key to the index of the partition that owns it (key modulo the number of partitions).
+template <class C>
+int
+RefCountCache<C>::partition_for_key(uint64_t key)
+{
+  const uint64_t slot = key % this->num_partitions;
+  return slot;
+}
+
+// Access the header describing this cache (format version and cached-object version).
+template <class C>
+RefCountCacheHeader &
+RefCountCache<C>::get_header()
+{
+  RefCountCacheHeader &own_header = this->header;
+  return own_header;
+}
+
+// Return the lock of the partition that owns `key`.
+template <class C>
+ProxyMutex *
+RefCountCache<C>::lock_for_key(uint64_t key)
+{
+  RefCountCachePartition<C> *owner = this->partitions[this->partition_for_key(key)];
+  return owner->lock.get();
+}
+
+// Access partition number `pnum`. No bounds check is performed here.
+template <class C>
+RefCountCachePartition<C> &
+RefCountCache<C>::get_partition(int pnum)
+{
+  RefCountCachePartition<C> *selected = this->partitions[pnum];
+  return *selected;
+}
+
+// Total number of items stored, summed over all partitions.
+template <class C>
+size_t
+RefCountCache<C>::count() const
+{
+  size_t total      = 0;
+  unsigned int part = 0;
+  while (part < this->num_partitions) {
+    total += this->partitions[part]->count();
+    part++;
+  }
+  return total;
+}
+
+// Number of partitions this cache was created with.
+template <class C>
+size_t
+RefCountCache<C>::partition_count() const
+{
+  const size_t partitions_total = this->num_partitions;
+  return partitions_total;
+}
+
+// Stat block used for this cache's metrics; NULL when no metrics prefix was given.
+template <class C>
+RecRawStatBlock *
+RefCountCache<C>::get_rsb()
+{
+  RecRawStatBlock *stat_block = this->rsb;
+  return stat_block;
+}
+
+// Remove the entry for `key` (if any) from the partition that owns it.
+template <class C>
+void
+RefCountCache<C>::erase(uint64_t key)
+{
+  RefCountCachePartition<C> *owner = this->partitions[this->partition_for_key(key)];
+  owner->erase(key);
+}
+
+// Empty the cache by clearing every partition in turn.
+template <class C>
+void
+RefCountCache<C>::clear()
+{
+  unsigned int part = 0;
+  while (part < this->num_partitions) {
+    this->partitions[part]->clear();
+    part++;
+  }
+}
+
+// Fill `cache` with items in file `filepath` using `load_func` to unmarshall the record.
+//
+// The file must start with a RefCountCacheHeader compatible with `cache`'s header, followed by
+// records of the form <RefCountCacheItemMeta><meta.size bytes of item>. `load_func` receives the
+// item bytes and their length and returns a new item, or NULL to skip the record.
+//
+// Returns 0 once the file has been processed (a truncated or malformed record ends the load early
+// with a warning) and -1 when `load_func` is NULL, the file can't be opened, or the header is
+// unreadable or incompatible. `dirname` is currently unused.
+//
+// NOTE(review): the expiry_time stored in each record's meta is not passed on to put(), so loaded
+// items are stored as never-expiring-- confirm that this is intended.
+template <typename CacheEntryType>
+int
+LoadRefCountCacheFromPath(RefCountCache<CacheEntryType> &cache, std::string dirname, std::string filepath,
+                          CacheEntryType *(*load_func)(char *, unsigned int))
+{
+  (void)dirname; // unused
+
+  // If we have no load method, then we can't load anything so lets just stop right here
+  if (load_func == NULL) {
+    return -1; // TODO: some specific error code
+  }
+
+  // open() signals failure with a negative value; 0 is a legal descriptor
+  int fd = open(filepath.c_str(), O_RDONLY);
+  if (fd < 0) {
+    return -1; // specific code for missing?
+  }
+
+  // read in the header
+  RefCountCacheHeader tmpHeader = RefCountCacheHeader();
+  int read_ret                  = read(fd, (char *)&tmpHeader, sizeof(RefCountCacheHeader));
+  if (read_ret != sizeof(RefCountCacheHeader)) {
+    socketManager.close(fd);
+    Warning("Error reading cache header from disk (expected %ld): %d", sizeof(RefCountCacheHeader), read_ret);
+    return -1;
+  }
+  if (!cache.get_header().compatible(&tmpHeader)) {
+    socketManager.close(fd);
+    Warning("Incompatible cache at %s, not loading.", filepath.c_str());
+    return -1; // TODO: specific code for incompatible
+  }
+
+  // The record size comes from the file, so the record is staged on the heap (grown on demand)
+  // rather than in a stack array whose size a corrupt file could dictate.
+  char *buf           = NULL;
+  size_t buf_capacity = 0;
+
+  RefCountCacheItemMeta tmpValue = RefCountCacheItemMeta(0, 0);
+  while (true) { // TODO: better loop
+    read_ret = read(fd, (char *)&tmpValue, sizeof(tmpValue));
+    if (read_ret != sizeof(tmpValue)) {
+      break;
+    }
+
+    // A record can never be smaller than the object it contains; anything else is corruption and
+    // would make the payload-size computation below wrap around.
+    const size_t item_size = tmpValue.size;
+    if (item_size < sizeof(CacheEntryType)) {
+      Warning("Invalid item size %lu in cache at %s, not loading remaining items", (unsigned long)item_size, filepath.c_str());
+      break;
+    }
+
+    if (item_size > buf_capacity) {
+      delete[] buf;
+      buf          = new char[item_size];
+      buf_capacity = item_size;
+    }
+
+    read_ret = read(fd, buf, item_size);
+    if (read_ret != (int)tmpValue.size) {
+      Warning("Encountered error reading item from cache: %d", read_ret);
+      break;
+    }
+
+    CacheEntryType *newItem = load_func(buf, tmpValue.size);
+    if (newItem != NULL) {
+      // put() adds sizeof(CacheEntryType) back on top of the payload size
+      cache.put(tmpValue.key, newItem, tmpValue.size - sizeof(CacheEntryType));
+    }
+  };
+
+  delete[] buf;
+  socketManager.close(fd);
+  return 0;
+}
+
+#endif /* _P_RefCountCache_h_ */
diff --git a/iocore/hostdb/RefCountCache.cc b/iocore/hostdb/RefCountCache.cc
new file mode 100644
index 0000000..1d35ecb
--- /dev/null
+++ b/iocore/hostdb/RefCountCache.cc
@@ -0,0 +1,47 @@
+/** @file
+
+  @section license License
+
+  Licensed to the Apache Software Foundation (ASF) under one
+  or more contributor license agreements.  See the NOTICE file
+  distributed with this work for additional information
+  regarding copyright ownership.  The ASF licenses this file
+  to you under the Apache License, Version 2.0 (the
+  "License"); you may not use this file except in compliance
+  with the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+ */
+
+#include <P_RefCountCache.h>
+
+// Since the hashing values are all fixed size, we can simply use a classAllocator to avoid mallocs
+ClassAllocator<RefCountCacheHashEntry> refCountCacheHashingValueAllocator("refCountCacheHashingValueAllocator");
+
+// Allocator for the expiry-queue nodes that point back at hash entries
+ClassAllocator<PriorityQueueEntry<RefCountCacheHashEntry *>> expiryQueueEntry("expiryQueueEntry");
+
+// Build a header stamped with the RefCountCache magic number and format version, recording
+// `object_version` as the version of the objects being cached.
+RefCountCacheHeader::RefCountCacheHeader(VersionNumber object_version)
+  : magic(REFCOUNTCACHE_MAGIC_NUMBER),
+    version(REFCOUNTCACHE_MAJOR_VERSION, REFCOUNTCACHE_MINOR_VERSION),
+    object_version(object_version)
+{
+}
+
+// Two headers compare equal when their magic number and cache format version match.
+// The object version does not take part in this comparison.
+bool
+RefCountCacheHeader::operator==(const RefCountCacheHeader other) const
+{
+  if (this->magic != other.magic) {
+    return false;
+  }
+  return this->version == other.version;
+}
+
+// Decide whether a cache written with header `other` can be loaded by a cache using this header.
+// Compatible means: same magic number, same major version of the cache format, and same major
+// version of the cached objects. Minor versions are allowed to differ.
+bool
+RefCountCacheHeader::compatible(RefCountCacheHeader *other) const
+{
+  // The object version has to be checked against the other header's *object* version, not against
+  // its cache format version.
+  return (this->magic == other->magic && this->version.ink_major == other->version.ink_major &&
+          this->object_version.ink_major == other->object_version.ink_major);
+}
diff --git a/iocore/hostdb/test_RefCountCache.cc b/iocore/hostdb/test_RefCountCache.cc
new file mode 100644
index 0000000..05e50ae
--- /dev/null
+++ b/iocore/hostdb/test_RefCountCache.cc
@@ -0,0 +1,270 @@
+/** @file
+
+  Unit tests for RefCountCache
+
+  @section license License
+
+  Licensed to the Apache Software Foundation (ASF) under one
+  or more contributor license agreements.  See the NOTICE file
+  distributed with this work for additional information
+  regarding copyright ownership.  The ASF licenses this file
+  to you under the Apache License, Version 2.0 (the
+  "License"); you may not use this file except in compliance
+  with the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+ */
+
+#include <iostream>
+#include <RefCountCache.cc>
+#include <I_EventSystem.h>
+#include <ts/I_Layout.h>
+#include <diags.i>
+
+// TODO: add tests with expiry_time
+
+// Minimal ref-counted payload used by the tests: a fixed part (idx, name_offset) optionally followed
+// by extra bytes in the same allocation that hold a name.
+class ExampleStruct : public RefCountObj
+{
+public:
+  int idx;
+  int name_offset; // offset in bytes from `this` to the name
+
+  // Return the char* to the name (TODO: cleaner interface??)
+  char *
+  name()
+  {
+    return (char *)this + this->name_offset;
+  }
+
+  // Allocate an ExampleStruct with `size` extra bytes after it
+  static ExampleStruct *
+  alloc(int size = 0)
+  {
+    return new (malloc(sizeof(ExampleStruct) + size)) ExampleStruct();
+  }
+
+  // To mark it as "deleted" (so its easy to check) we'll just mark the idx as -1
+  // Note that the memory itself is deliberately not released, so tests can inspect it afterwards.
+  void
+  free()
+  {
+    this->idx = -1;
+  }
+
+  // Rebuild an ExampleStruct from `size` serialized bytes in `buf`.
+  // Returns NULL when `buf` is too small to contain an ExampleStruct.
+  static ExampleStruct *
+  unmarshall(char *buf, unsigned int size)
+  {
+    if (size < sizeof(ExampleStruct)) {
+      return NULL;
+    }
+    ExampleStruct *ret = ExampleStruct::alloc(size - sizeof(ExampleStruct));
+    memcpy((void *)ret, buf, size);
+
+    // Re-constructing in place resets the refcount back to 0, this is a bit ugly-- but I'm not sure
+    // we want to expose a method to mess with the refcount, since this is a fairly unique use case.
+    // `ExampleStruct()` value-initializes though, which zeroes our own fields as well, so keep the
+    // loaded values aside and put them back afterwards.
+    const int loaded_idx         = ret->idx;
+    const int loaded_name_offset = ret->name_offset;
+    ret                          = new (ret) ExampleStruct();
+    ret->idx                     = loaded_idx;
+    ret->name_offset             = loaded_name_offset;
+    return ret;
+  }
+};
+
+// Insert one ExampleStruct per key in [start, end) into `cache`.
+// Every struct gets idx == key and carries the name "foobar" in the bytes following the struct.
+void
+fillCache(RefCountCache<ExampleStruct> *cache, int start, int end)
+{
+  // TODO: name per?
+  const std::string name("foobar");
+  const int allocSize = name.size() + 1;
+
+  for (int key = start; key < end; key++) {
+    ExampleStruct *entry = ExampleStruct::alloc(allocSize);
+    cache->put((uint64_t)key, entry);
+
+    entry->idx         = key;
+    entry->name_offset = sizeof(ExampleStruct);
+    // c_str() is NULL terminated, so copying one extra byte terminates the stored name as well
+    memcpy(entry->name(), name.c_str(), name.size() + 1);
+  }
+  printf("Loading complete! Cache now has %ld items.\n\n", cache->count());
+}
+
+// Check every key in [start, end): an item that is present must have idx equal to its key.
+// Missing items are skipped. Returns 0 when all present items are consistent and 1 on the first
+// mismatch.
+int
+verifyCache(RefCountCache<ExampleStruct> *cache, int start, int end)
+{
+  int key = start;
+  while (key < end) {
+    Ptr<ExampleStruct> held = cache->get(key);
+    ExampleStruct *item     = held.get();
+
+    if (item != NULL && item->idx != key) {
+      printf("IDX of ExampleStruct%d incorrect! (%d)\n", key, item->idx);
+      return 1; // TODO: spin over all?
+    }
+    // TODO: also check that the name is correct
+    key++;
+  }
+  return 0;
+}
+
+// Exercise the reference counting done by put/get/erase.
+// Returns 0 when every expectation held and non-zero otherwise. The printf calls trace the running
+// result and the refcount of the item under test after each step.
+// TODO: check that the memory was actually free-d better
+int
+testRefcounting()
+{
+  int ret = 0;
+
+  RefCountCache<ExampleStruct> *cache = new RefCountCache<ExampleStruct>(4);
+
+  // Create and then immediately delete an item
+  // Expect refcount 0 -> 1 after put -> 0 after erase, and ExampleStruct::free() to have set idx to -1
+  ExampleStruct *to_delete = ExampleStruct::alloc();
+  ret |= to_delete->refcount() != 0;
+  cache->put(1, to_delete);
+  ret |= to_delete->refcount() != 1;
+  cache->erase(1);
+  ret |= to_delete->refcount() != 0;
+  ret |= to_delete->idx != -1;
+
+  // Set an item in the cache
+  ExampleStruct *tmp = ExampleStruct::alloc();
+  ret |= tmp->refcount() != 0;
+  printf("ret=%d ref=%d\n", ret, tmp->refcount());
+  cache->put((uint64_t)1, tmp);
+  ret |= tmp->refcount() != 1;
+  printf("ret=%d ref=%d\n", ret, tmp->refcount());
+  tmp->idx = 1;
+
+  // Grab a pointer to item 1
+  // Each Ptr<> handed out by get() is expected to add one reference
+  Ptr<ExampleStruct> ccitem = cache->get((uint64_t)1);
+  ret |= tmp->refcount() != 2;
+  printf("ret=%d ref=%d\n", ret, tmp->refcount());
+
+  Ptr<ExampleStruct> tmpAfter = cache->get((uint64_t)1);
+  ret |= tmp->refcount() != 3;
+  printf("ret=%d ref=%d\n", ret, tmp->refcount());
+
+  // Delete a single item
+  // Only the cache's own reference goes away; the two Ptr<>s above keep the item alive
+  cache->erase(1);
+  ret |= tmp->refcount() != 2;
+  printf("ret=%d ref=%d\n", ret, tmp->refcount());
+  // verify that it is no longer in the cache
+  ret |= cache->get(1).get() != NULL;
+  printf("ret=%d ref=%d\n", ret, tmp->refcount());
+  // the item must not have been freed, since we still hold references to it
+  ret |= tmpAfter.get()->idx != 1;
+  printf("ret=%d ref=%d\n", ret, tmp->refcount());
+
+  return ret;
+}
+
+// Check that clear() drops the cache's reference to a stored item.
+// Returns 0 on success and non-zero if any expectation failed.
+int
+testclear()
+{
+  int failed = 0;
+
+  RefCountCache<ExampleStruct> *cache = new RefCountCache<ExampleStruct>(4);
+
+  // A fresh item starts without references; storing it gives the cache exactly one
+  ExampleStruct *stored = ExampleStruct::alloc();
+  failed |= stored->refcount() != 0;
+  cache->put(1, stored);
+  failed |= stored->refcount() != 1;
+
+  // Clearing the cache must release that reference and "free" the item (idx becomes -1)
+  cache->clear();
+  failed |= stored->refcount() != 0;
+  failed |= stored->idx != -1;
+
+  return failed;
+}
+
+// Test driver: runs the refcount test, then loads, clears, fills and verifies a cache.
+// The process exit status is 0 when every check passed and non-zero otherwise.
+int
+main()
+{
+  // Bring up the pieces of the runtime the cache depends on (layout, diags, records, event system)
+  RecModeT mode_type = RECM_STAND_ALONE;
+  Layout::create();
+  init_diags("", NULL);
+  RecProcessInit(mode_type);
+  ink_event_system_init(EVENT_SYSTEM_MODULE_VERSION);
+
+  int ret = 0;
+
+  printf("Starting tests\n");
+
+  printf("Testing refcounts\n");
+  ret |= testRefcounting();
+  printf("refcount ret %d\n", ret);
+
+  // Initialize our cache
+  int cachePartitions                 = 4;
+  RefCountCache<ExampleStruct> *cache = new RefCountCache<ExampleStruct>(cachePartitions);
+  printf("Created...\n");
+
+  // Try to load whatever an earlier run left behind; the return value is deliberately ignored
+  // since the file may simply not exist
+  LoadRefCountCacheFromPath<ExampleStruct>(*cache, "/tmp", "/tmp/hostdb_cache", ExampleStruct::unmarshall);
+  printf("Cache started...\n");
+  int numTestEntries = 10000;
+
+  // See if anything persisted across the restart
+  ret |= verifyCache(cache, 0, numTestEntries);
+  printf("done verifying startup\n");
+
+  // Clear the cache
+  cache->clear();
+  ret |= cache->count() != 0;
+  printf("clear %d\n", ret);
+
+  // fill it
+  printf("filling...\n");
+  fillCache(cache, 0, numTestEntries);
+  printf("filled...\n");
+
+  // Verify that it has items
+  printf("verifying...\n");
+  ret |= verifyCache(cache, 0, numTestEntries);
+  printf("verified %d\n", ret);
+
+  // Verify that we can alloc() with no extra space
+  // This replaces the entry stored for key 1 by fillCache
+  printf("Alloc item idx 1\n");
+  ExampleStruct *tmp = ExampleStruct::alloc();
+  cache->put((uint64_t)1, tmp);
+  tmp->idx = 1;
+
+  Ptr<ExampleStruct> tmpAfter = cache->get((uint64_t)1);
+  printf("Item after (ret=%d) %d %d\n", ret, 1, tmpAfter->idx);
+  // Verify every item in the cache
+  ret |= verifyCache(cache, 0, numTestEntries);
+  printf("verified entire cache ret=%d\n", ret);
+
+  // Grab a pointer to item 1
+  Ptr<ExampleStruct> ccitem = cache->get((uint64_t)1);
+  ccitem->idx               = 1;
+  // Delete a single item
+  cache->erase(1);
+  // verify that it is no longer in the cache
+  ret |= cache->get(1).get() != NULL;
+  // the item itself must survive the erase, since we still hold a Ptr<> to it
+  ret |= ccitem.get()->idx != 1;
+  printf("ret=%d\n", ret);
+
+  // Verify every item in the cache
+  ret |= verifyCache(cache, 0, numTestEntries);
+
+  // TODO: figure out how to test syncing/loading
+  // write out the whole thing
+  // printf("Sync return: %d\n", cache->sync_all());
+
+  printf("TestRun: %d\n", ret);
+  // exit() ends the process here; the return below is never reached
+  exit(ret);
+
+  return ret;
+}
diff --git a/lib/ts/I_Version.h b/lib/ts/I_Version.h
index 16d1897..c28a1b1 100644
--- a/lib/ts/I_Version.h
+++ b/lib/ts/I_Version.h
@@ -35,7 +35,7 @@ struct VersionNumber {
   short int ink_major; // incompatible change
   short int ink_minor; // minor change, not incompatible
 
-  VersionNumber() {}
+  VersionNumber() : ink_major(0), ink_minor(0) {}
   VersionNumber(short int major, short int minor) : ink_major(major), ink_minor(minor) {}
 };
 
@@ -45,6 +45,12 @@ operator<(VersionNumber const &lhs, VersionNumber const &rhs)
   return lhs.ink_major < rhs.ink_major || (lhs.ink_major == rhs.ink_major && lhs.ink_minor < rhs.ink_minor);
 }
 
+inline bool // Equality for VersionNumber: true only when both components match.
+operator==(VersionNumber const &lhs, VersionNumber const &rhs)
+{
+  return lhs.ink_major == rhs.ink_major && lhs.ink_minor == rhs.ink_minor; // compare major first, then minor
+}
+
 struct Version {
   VersionNumber cacheDB;
   VersionNumber cacheDir;
diff --git a/lib/ts/Map.h b/lib/ts/Map.h
index 8a6d654..fb8b5d0 100644
--- a/lib/ts/Map.h
+++ b/lib/ts/Map.h
@@ -1374,6 +1374,8 @@ public:
 
       This method assumes a @a location is consistent. Be very careful if you modify a @c Location.
 
+      @note This does @b not clean up the removed elements. Use carefully to avoid leaks.
+
       @return @c true if the value was removed, @c false otherwise.
   */
   bool remove(Location const &location);
diff --git a/lib/ts/Ptr.h b/lib/ts/Ptr.h
index a2c4413..ad8f9fb 100644
--- a/lib/ts/Ptr.h
+++ b/lib/ts/Ptr.h
@@ -244,8 +244,8 @@ inline void
 Ptr<T>::clear()
 {
   if (m_ptr) {
-    if (!((RefCountObj *)m_ptr)->refcount_dec())
-      ((RefCountObj *)m_ptr)->free();
+    if (!m_ptr->refcount_dec())
+      m_ptr->free();
     m_ptr = NULL;
   }
 }
diff --git a/mgmt/RecordsConfig.cc b/mgmt/RecordsConfig.cc
index 26ff7ee..7242c45 100644
--- a/mgmt/RecordsConfig.cc
+++ b/mgmt/RecordsConfig.cc
@@ -1048,14 +1048,16 @@ static const RecordElement RecordsConfig[] =
   {RECT_CONFIG, "proxy.config.hostdb", RECD_INT, "1", RECU_DYNAMIC, RR_NULL, RECC_NULL, "[0-1]", RECA_NULL}
   ,
   //       # up to 511 characters, may not be changed while running
-  {RECT_CONFIG, "proxy.config.hostdb.filename", RECD_STRING, "host.db", RECU_DYNAMIC, RR_NULL, RECC_NULL, NULL, RECA_NULL}
+  {RECT_CONFIG, "proxy.config.hostdb.filename", RECD_STRING, "host.db", RECU_RESTART_TS, RR_NULL, RECC_NULL, NULL, RECA_NULL}
   ,
   //       # in entries, may not be changed while running
-  {RECT_CONFIG, "proxy.config.hostdb.size", RECD_INT, "120000", RECU_DYNAMIC, RR_NULL, RECC_NULL, NULL, RECA_NULL}
+  {RECT_CONFIG, "proxy.config.hostdb.max_count", RECD_INT, "-1", RECU_RESTART_TS, RR_NULL, RECC_NULL, NULL, RECA_NULL}
   ,
-  {RECT_CONFIG, "proxy.config.hostdb.storage_path", RECD_STRING, TS_BUILD_CACHEDIR, RECU_DYNAMIC, RR_NULL, RECC_NULL, NULL, RECA_NULL}
+  {RECT_CONFIG, "proxy.config.hostdb.storage_path", RECD_STRING, TS_BUILD_CACHEDIR, RECU_RESTART_TS, RR_NULL, RECC_NULL, NULL, RECA_NULL}
   ,
-  {RECT_CONFIG, "proxy.config.hostdb.storage_size", RECD_INT, "45375488", RECU_DYNAMIC, RR_NULL, RECC_NULL, NULL, RECA_NULL}
+  {RECT_CONFIG, "proxy.config.hostdb.max_size", RECD_INT, "10M", RECU_RESTART_TS, RR_NULL, RECC_NULL, NULL, RECA_NULL}
+  ,
+  {RECT_CONFIG, "proxy.config.hostdb.partitions", RECD_INT, "64", RECU_RESTART_TS, RR_NULL, RECC_NULL, NULL, RECA_NULL}
   ,
   //       # in minutes (all three)
   //       #  0 = obey, 1 = ignore, 2 = min(X,ttl), 3 = max(X,ttl)
@@ -1089,7 +1091,7 @@ static const RecordElement RecordsConfig[] =
   {RECT_CONFIG, "proxy.config.hostdb.timed_round_robin", RECD_INT, "0", RECU_DYNAMIC, RR_NULL, RECC_NULL, NULL, RECA_NULL}
   ,
   //       # how often should the hostdb be synced (seconds)
-  {RECT_CONFIG, "proxy.config.cache.hostdb.sync_frequency", RECD_INT, "120", RECU_DYNAMIC, RR_NULL, RECC_NULL, NULL, RECA_NULL}
+  {RECT_CONFIG, "proxy.config.cache.hostdb.sync_frequency", RECD_INT, "120", RECU_RESTART_TS, RR_NULL, RECC_NULL, NULL, RECA_NULL}
   ,
   {RECT_CONFIG, "proxy.config.hostdb.host_file.path", RECD_STRING, NULL, RECU_DYNAMIC, RR_NULL, RECC_NULL, NULL, RECA_NULL}
   ,
diff --git a/proxy/EventName.cc b/proxy/EventName.cc
index 784ff48..c79232f 100644
--- a/proxy/EventName.cc
+++ b/proxy/EventName.cc
@@ -32,7 +32,7 @@
 //#include "P_Cluster.h"
 #include "I_HostDB.h"
 #include "BaseManager.h"
-#include "P_MultiCache.h"
+#include "P_RefCountCache.h"
 
 /*-------------------------------------------------------------------------
   event_int_to_string
@@ -95,8 +95,8 @@ event_int_to_string(int event, int blen, char *buffer)
   case DNS_EVENT_EVENTS_START:
     return "DNS_EVENT_EVENTS_START";
 
-  case MULTI_CACHE_EVENT_SYNC:
-    return "MULTI_CACHE_EVENT_SYNC";
+  case REFCOUNT_CACHE_EVENT_SYNC:
+    return "REFCOUNT_CACHE_EVENT_SYNC";
 
   case CACHE_EVENT_LOOKUP:
     return "CACHE_EVENT_LOOKUP";
diff --git a/proxy/Main.cc b/proxy/Main.cc
index 5ce9da2..6506717 100644
--- a/proxy/Main.cc
+++ b/proxy/Main.cc
@@ -695,7 +695,7 @@ cmd_clear(char *cmd)
       Note("unable to open Host Database, CLEAR failed");
       return CMD_FAILED;
     }
-    hostDBProcessor.cache()->reset();
+    hostDBProcessor.cache()->refcountcache->clear();
     if (c_hdb)
       return CMD_OK;
   }
@@ -1918,7 +1918,6 @@ main(int /* argc ATS_UNUSED */, const char **argv)
 
 #if TS_HAS_TESTS
     TransformTest::run();
-    run_HostDBTest();
     //  run_SimpleHttp();
     run_RegressionTest();
 #endif
diff --git a/proxy/http/HttpSM.cc b/proxy/http/HttpSM.cc
index 694c60c..3cfb3ca 100644
--- a/proxy/http/HttpSM.cc
+++ b/proxy/http/HttpSM.cc
@@ -2061,6 +2061,7 @@ void
 HttpSM::process_srv_info(HostDBInfo *r)
 {
   DebugSM("dns_srv", "beginning process_srv_info");
+  t_state.hostdb_entry = Ptr<HostDBInfo>(r);
 
   /* we didn't get any SRV records, continue w normal lookup */
   if (!r || !r->is_srv || !r->round_robin) {
@@ -2081,7 +2082,6 @@ HttpSM::process_srv_info(HostDBInfo *r)
       t_state.srv_lookup                  = false;
       DebugSM("dns_srv", "SRV records empty for %s", t_state.dns_info.lookup_name);
     } else {
-      ink_assert(r->md5_high == srv->md5_high && r->md5_low == srv->md5_low && r->md5_low_low == srv->md5_low_low);
       t_state.dns_info.srv_lookup_success = true;
       t_state.dns_info.srv_port           = srv->data.srv.srv_port;
       t_state.dns_info.srv_app            = srv->app;
@@ -2096,6 +2096,9 @@ HttpSM::process_srv_info(HostDBInfo *r)
 void
 HttpSM::process_hostdb_info(HostDBInfo *r)
 {
+  // Increment the refcount to our item, since we are pointing at it
+  t_state.hostdb_entry = Ptr<HostDBInfo>(r);
+
   sockaddr const *client_addr = NULL;
   bool use_client_addr        = t_state.http_config_param->use_client_target_addr == 1 && t_state.client_info.is_transparent &&
                          t_state.dns_info.os_addr_style == HttpTransact::DNSLookupInfo::OS_ADDR_TRY_DEFAULT;
@@ -2110,7 +2113,7 @@ HttpSM::process_hostdb_info(HostDBInfo *r)
     // Leave ret unassigned, so we don't overwrite the host_db_info
   }
 
-  if (r && !r->failed()) {
+  if (r && !r->is_failed()) {
     ink_time_t now                    = ink_cluster_time();
     HostDBInfo *ret                   = NULL;
     t_state.dns_info.lookup_success   = true;
@@ -2131,8 +2134,8 @@ HttpSM::process_hostdb_info(HostDBInfo *r)
         if (t_state.dns_info.srv_lookup_success) {
           uint32_t last_failure = 0xFFFFFFFF;
           for (int i = 0; i < rr->rrcount && last_failure != 0; ++i) {
-            if (last_failure > rr->info[i].app.http_data.last_failure)
-              last_failure = rr->info[i].app.http_data.last_failure;
+            if (last_failure > rr->info(i).app.http_data.last_failure)
+              last_failure = rr->info(i).app.http_data.last_failure;
           }
 
           if (last_failure != 0 && (uint32_t)(now - t_state.txn_conf->down_server_timeout) < last_failure) {
diff --git a/proxy/http/HttpTransact.h b/proxy/http/HttpTransact.h
index 858426b..b9dd5de 100644
--- a/proxy/http/HttpTransact.h
+++ b/proxy/http/HttpTransact.h
@@ -905,8 +905,9 @@ public:
     int next_hop_scheme; // out
     int orig_scheme;     // pre-mapped scheme
     int method;
-    int cause_of_death_errno; // in
-    HostDBInfo host_db_info;  // in
+    int cause_of_death_errno;     // in
+    Ptr<HostDBInfo> hostdb_entry; // Pointer to the entry we are referencing in hostdb-- to keep our ref
+    HostDBInfo host_db_info;      // in
 
     ink_time_t client_request_time;    // internal
     ink_time_t request_sent_time;      // internal
@@ -1120,7 +1121,7 @@ public:
       via_string[MAX_VIA_INDICES]              = '\0';
 
       memset(user_args, 0, sizeof(user_args));
-      memset(&host_db_info, 0, sizeof(host_db_info));
+      memset((void *)&host_db_info, 0, sizeof(host_db_info));
     }
 
     void

-- 
To stop receiving notification emails like this one, please contact
"commits@trafficserver.apache.org" <co...@trafficserver.apache.org>.