You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@trafficserver.apache.org by pa...@apache.org on 2018/04/04 20:54:06 UTC
[trafficserver] branch master updated: Refactor: Move stripe and
span structure to CacheDefs
This is an automated email from the ASF dual-hosted git repository.
paziz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/trafficserver.git
The following commit(s) were added to refs/heads/master by this push:
new 5be7b7f Refactor: Move stripe and span structure to CacheDefs
5be7b7f is described below
commit 5be7b7f449372f711139d9ba88beab9036b6ea9b
Author: Persia Aziz <pe...@yahoo-inc.com>
AuthorDate: Wed Apr 4 15:10:27 2018 -0500
Refactor: Move stripe and span structure to CacheDefs
---
cmd/traffic_cache_tool/CacheDefs.cc | 916 ++++++++++++++++++++++++++++++
cmd/traffic_cache_tool/CacheDefs.h | 164 ++++++
cmd/traffic_cache_tool/CacheTool.cc | 1067 +----------------------------------
3 files changed, 1086 insertions(+), 1061 deletions(-)
diff --git a/cmd/traffic_cache_tool/CacheDefs.cc b/cmd/traffic_cache_tool/CacheDefs.cc
index 579600f..38eebea 100644
--- a/cmd/traffic_cache_tool/CacheDefs.cc
+++ b/cmd/traffic_cache_tool/CacheDefs.cc
@@ -162,4 +162,920 @@ Doc::data()
{
return this->hdr() + hlen;
}
+} // end namespace ts
+
+// Average object size assumed when sizing a stripe directory; it is the divisor used for the
+// bucket count in Stripe::vol_init_data_internal().
+int cache_config_min_average_object_size = ESTIMATED_OBJECT_SIZE;
+CacheStoreBlocks Vol_hash_alloc_size(1024);
+// Default this to read only, only enable write if specifically required.
+// Stripe::updateHeaderFooter() tests `!OPEN_RW_FLAG` and refuses to write while that is true.
+int OPEN_RW_FLAG = O_RDONLY;
+
+namespace ct
+{
+/** Check whether either metadata copy has matching header / footer sync serials.
+ *
+ * The rule is: copy A is usable when its serials match and it is at least as new as copy B
+ * (or B is inconsistent); otherwise copy B is usable when its serials match. Whenever B is
+ * consistent the answer is already @c true, and whenever it is not the "B inconsistent" clause
+ * holds, so the rule reduces to "A is consistent or B is consistent".
+ *
+ * @return @c true if at least one copy is usable under that rule.
+ */
+bool
+Stripe::validate_sync_serial()
+{
+  bool const a_consistent = _meta[0][0].sync_serial == _meta[0][1].sync_serial;
+  bool const b_consistent = _meta[1][0].sync_serial == _meta[1][1].sync_serial;
+  return a_consistent || b_consistent;
+}
+
+/** Overwrite the header and footer of both metadata copies with zeros on disk.
+ *
+ * One store block of zeros is written at each of the four recorded metadata positions.
+ *
+ * @return An @c Errata holding one message per position that could not be fully written;
+ *   empty when every write succeeded.
+ */
+Errata
+Stripe::clear()
+{
+  Errata zret;
+  alignas(512) static char zero[CacheStoreBlocks::SCALE]; // should be all zero, it's static.
+  for (auto i : {A, B}) {
+    for (auto j : {HEAD, FOOT}) {
+      ssize_t n = pwrite(_span->_fd, zero, CacheStoreBlocks::SCALE, this->_meta_pos[i][j]);
+      if (n < CacheStoreBlocks::SCALE) {
+        // errno is only meaningful when pwrite itself failed; a short write gets a generic code.
+        int const code = (n < 0) ? errno : 1;
+        std::cout << "Failed to clear stripe header" << std::endl;
+        // Hand the failure back to the caller instead of only printing it.
+        zret.push(0, code, "Failed to clear stripe header");
+      }
+    }
+  }
+
+  return zret;
+}
+/// Destructor: releases every buffer still attached to the chunk.
+Stripe::Chunk::~Chunk()
+{
+  clear();
+}
+/// Add the memory span @a m to the end of the chain. Chunk::clear() later passes its data pointer to free().
+void
+Stripe::Chunk::append(MemSpan m)
+{
+  this->_chain.push_back(m);
+}
+/// Free the memory behind every span in the chain, then empty the chain.
+void
+Stripe::Chunk::clear()
+{
+  for (auto &block : _chain) {
+    void *raw = const_cast<void *>(block.data());
+    free(raw);
+  }
+  _chain.clear();
+}
+
+/** Construct a stripe that lives in @a span, begins at byte offset @a start and is @a len store blocks long.
+ *
+ * The text used for the stripe hash ("<span path> <start>:<len>") is generated and printed here.
+ */
+Stripe::Stripe(Span *span, Bytes start, CacheStoreBlocks len) : _span(span), _start(start), _len(len)
+{
+  ts::bwprint(hashText, "{} {}:{}", span->_path.path(), _start.count(), _len.count());
+  int const text_len = static_cast<int>(hashText.size());
+  printf("hash id of stripe is hash of %.*s\n", text_len, hashText.data());
+}
+
+/// @return @c true when the stripe's volume index is zero.
+bool
+Stripe::isFree() const
+{
+  return _vol_idx == 0;
+}
+
+// TODO: Implement the whole logic
+/** Reset all four in-memory metadata records and build an empty directory.
+ *
+ * Header and footer of both copies get the magic value, the current cache DB version, write
+ * positions at the start of the content area, zeroed counters and a common creation time.
+ * The freelist and the directory buffer are allocated if they do not exist yet, and every
+ * segment's freelist is rebuilt via init_dir().
+ *
+ * @return An (always empty) @c Errata.
+ */
+Errata
+Stripe::InitializeMeta()
+{
+  Errata zret;
+  // Take the timestamp once so all four records carry exactly the same creation time.
+  time_t const now = time(nullptr);
+  for (int i = 0; i < 2; i++) {
+    for (int j = 0; j < 2; j++) {
+      _meta[i][j].magic             = StripeMeta::MAGIC;
+      _meta[i][j].version.ink_major = ts::CACHE_DB_MAJOR_VERSION;
+      _meta[i][j].version.ink_minor = ts::CACHE_DB_MINOR_VERSION;
+      _meta[i][j].agg_pos = _meta[i][j].last_write_pos = _meta[i][j].write_pos = this->_content;
+      _meta[i][j].phase = _meta[i][j].cycle = _meta[i][j].sync_serial = _meta[i][j].write_serial = _meta[i][j].dirty = 0;
+      _meta[i][j].create_time                                                                                      = now;
+      _meta[i][j].sector_size                                                                                      = DEFAULT_HW_SECTOR_SIZE;
+    }
+  }
+  if (!freelist) // freelist is not allocated yet
+  {
+    freelist = (uint16_t *)malloc(_segments * sizeof(uint16_t)); // segments has already been calculated
+  }
+  if (!dir) // for new spans, this will likely be nullptr as we don't need to read the stripe meta from disk
+  {
+    size_t const dir_bytes = this->vol_dirlen();
+    char *raw_dir          = (char *)ats_memalign(ats_pagesize(), dir_bytes);
+    // init_dir() only links the freelist entries; zero the buffer first so that every other
+    // field of every entry starts out empty instead of holding uninitialized heap contents
+    // that would later be written to disk.
+    memset(raw_dir, 0, dir_bytes);
+    dir = (CacheDirEntry *)(raw_dir + this->vol_headerlen());
+  }
+  init_dir();
+  return zret;
+}
+
+// Need to be bit more robust at some point.
+/** Plausibility check for a candidate stripe metadata record.
+ *
+ * @param meta Record to examine.
+ * @return @c true when the magic value matches, the major version is not newer than the
+ *   supported one, and the minor version is at most 2.
+ */
+bool
+Stripe::validateMeta(StripeMeta const *meta)
+{
+  if (StripeMeta::MAGIC != meta->magic) {
+    return false;
+  }
+  if (meta->version.ink_major > ts::CACHE_DB_MAJOR_VERSION) {
+    return false;
+  }
+  // This may have always been zero, actually.
+  return meta->version.ink_minor <= 2;
+}
+
+/** Scan @a mem, one store block at a time, for a valid stripe metadata record.
+ *
+ * @param mem Memory to search. It is advanced as blocks are rejected; on success it is left
+ *   positioned at the record that was found.
+ * @param base_meta Optional reference record; when not @c nullptr a candidate must also have
+ *   the same version as this record.
+ * @return @c true if a matching record was found, @c false once less than one record remains.
+ */
+bool
+Stripe::probeMeta(MemSpan &mem, StripeMeta const *base_meta)
+{
+  // The meta data is stored aligned on a stripe block boundary, so only need to check there.
+  for (; mem.usize() >= sizeof(StripeMeta); mem += CacheStoreBlocks::SCALE) {
+    StripeMeta const *candidate = mem.ptr<StripeMeta>(0);
+    if (!this->validateMeta(candidate)) {
+      continue;
+    }
+    // No base version to check against, or the versions agree. Need more checks here I think.
+    if (base_meta == nullptr || candidate->version == base_meta->version) {
+      return true;
+    }
+  }
+  return false;
+}
+
+/* INK_ALIGN() is only to be used to align on a power of 2 boundary */
+// Rounds `size` up to the next multiple of `boundary`.
+#define INK_ALIGN(size, boundary) (((size) + ((boundary)-1)) & ~((boundary)-1))
+
+// Rounds a byte count up to the next multiple of 8192 bytes.
+#define ROUND_TO_STORE_BLOCK(_x) INK_ALIGN((_x), 8192)
+
+/** Recompute the stripe layout, rebuild the metadata and write both metadata copies to disk.
+ *
+ * Each copy is written as three pieces: header (metadata record plus segment freelist),
+ * directory entries, and footer. The metadata positions are recomputed and printed first.
+ * Nothing is written while @c OPEN_RW_FLAG is zero.
+ *
+ * @return An @c Errata carrying the "writing not enabled" notice or the first write failure;
+ *   empty on success.
+ */
+Errata
+Stripe::updateHeaderFooter()
+{
+  Errata zret;
+  this->vol_init_data();
+
+  int64_t const hdr_size    = this->vol_headerlen();
+  int64_t const dir_size    = this->vol_dirlen();
+  int64_t const footer_size = ROUND_TO_STORE_BLOCK(sizeof(StripeMeta));
+  // Bytes occupied by the directory entries alone. This is computed once, outside the copy
+  // loop, so that copy B is written with the same length as copy A.
+  int64_t const entries_size = dir_size - hdr_size - footer_size;
+  Bytes footer_offset        = Bytes(dir_size - footer_size);
+  _meta_pos[A][HEAD]         = round_down(_start);
+  _meta_pos[A][FOOT]         = round_down(_start + footer_offset);
+  _meta_pos[B][HEAD]         = round_down(this->_start + Bytes(dir_size));
+  _meta_pos[B][FOOT]         = round_down(this->_start + Bytes(dir_size) + footer_offset);
+  std::cout << "updating header " << _meta_pos[0][0] << std::endl;
+  std::cout << "updating header " << _meta_pos[0][1] << std::endl;
+  std::cout << "updating header " << _meta_pos[1][0] << std::endl;
+  std::cout << "updating header " << _meta_pos[1][1] << std::endl;
+  InitializeMeta();
+
+  if (!OPEN_RW_FLAG) {
+    zret.push(0, 1, "Writing Not Enabled.. Please use --write to enable writing to disk");
+    return zret;
+  }
+
+  // Scratch buffer, reused for each piece; dir_size is large enough for any of them.
+  char *meta_t = (char *)ats_memalign(ats_pagesize(), dir_size);
+  for (auto i : {A, B}) {
+    // Header: metadata record followed by the freelist. The freelist overlays the last
+    // uint16_t of the record, hence the offset. Zero first so the padding up to hdr_size is
+    // well defined on disk.
+    memset(meta_t, 0, hdr_size);
+    memcpy(meta_t, &_meta[i][HEAD], sizeof(StripeMeta));
+    memcpy(meta_t + sizeof(StripeMeta) - sizeof(uint16_t), this->freelist, this->_segments * sizeof(uint16_t));
+
+    ssize_t n = pwrite(_span->_fd, meta_t, hdr_size, _meta_pos[i][HEAD]);
+    if (n < hdr_size) {
+      std::cout << "problem writing header to disk: " << strerror(errno) << ":"
+                << " " << n << "<" << hdr_size << std::endl;
+      zret = Errata::Message(0, errno, "Failed to write stripe header ");
+      ats_memalign_free(meta_t);
+      return zret;
+    }
+
+    // Directory entries.
+    memcpy(meta_t, (char *)dir, entries_size);
+    n = pwrite(_span->_fd, meta_t, entries_size, _meta_pos[i][HEAD] + hdr_size);
+    if (n < entries_size) {
+      std::cout << "problem writing dir to disk: " << strerror(errno) << ":"
+                << " " << n << "<" << entries_size << std::endl;
+      zret = Errata::Message(0, errno, "Failed to write stripe directory ");
+      ats_memalign_free(meta_t);
+      return zret;
+    }
+
+    // Footer: one metadata record padded with zeros to a full store block.
+    memset(meta_t, 0, footer_size);
+    memcpy(meta_t, &_meta[i][FOOT], sizeof(StripeMeta));
+    n = pwrite(_span->_fd, meta_t, footer_size, _meta_pos[i][FOOT]);
+    if (n < footer_size) {
+      std::cout << "problem writing footer to disk: " << strerror(errno) << ":"
+                << " " << n << "<" << footer_size << std::endl;
+      zret = Errata::Message(0, errno, "Failed to write stripe footer ");
+      ats_memalign_free(meta_t);
+      return zret;
+    }
+  }
+  ats_memalign_free(meta_t);
+  return zret;
+}
+
+/// @return Length of the stripe header: one StripeMeta plus one uint16_t per segment beyond the
+/// first, rounded up to a store block multiple.
+TS_INLINE int
+Stripe::vol_headerlen()
+{
+  auto const unrounded = sizeof(StripeMeta) + sizeof(uint16_t) * (this->_segments - 1);
+  return ROUND_TO_STORE_BLOCK(unrounded);
+}
+
+/// @return Length of one complete metadata copy: header, directory entries and footer, each
+/// rounded up to a store block multiple.
+size_t
+Stripe::vol_dirlen()
+{
+  auto const header_bytes  = vol_headerlen();
+  auto const entries_bytes = ROUND_TO_STORE_BLOCK(((size_t)this->_buckets) * DIR_DEPTH * this->_segments * SIZEOF_DIR);
+  auto const footer_bytes  = ROUND_TO_STORE_BLOCK(sizeof(StripeMeta));
+  return header_bytes + entries_bytes + footer_bytes;
+}
+
+// One refinement step of the stripe layout computation.
+// The bucket count is derived from the space left after the current content offset, the
+// segment and per-segment bucket counts follow from it, and the content offset is then moved
+// to just past two copies of the directory. Because the first statement reads _content and
+// the last one rewrites it, the statement order matters and repeated calls refine the result.
+void
+Stripe::vol_init_data_internal()
+{
+ // Total buckets: usable bytes / average object size / entries per bucket.
+ this->_buckets =
+ ((this->_len.count() * 8192 - (this->_content - this->_start)) / cache_config_min_average_object_size) / DIR_DEPTH;
+ // Segments: enough that a segment holds at most (1 << 16) / DIR_DEPTH buckets (rounding up).
+ this->_segments = (this->_buckets + (((1 << 16) - 1) / DIR_DEPTH)) / ((1 << 16) / DIR_DEPTH);
+ // From here on _buckets is the bucket count per segment (rounded up).
+ this->_buckets = (this->_buckets + this->_segments - 1) / this->_segments;
+ // Content starts after both metadata copies.
+ this->_content = this->_start + Bytes(2 * vol_dirlen());
+}
+
+/// Compute segment count, buckets per segment and content offset by running the layout
+/// refinement step three times.
+void
+Stripe::vol_init_data()
+{
+  // iteratively calculate start + buckets
+  for (int pass = 0; pass < 3; ++pass) {
+    this->vol_init_data_internal();
+  }
+}
+
+// Reset the live layout data: the content offset is set to the stripe start and the directory
+// skip is set to zero store blocks. The parameter `c` is only referenced by the commented-out
+// code below, so in the current code both copies are handled identically.
+void
+Stripe::updateLiveData(enum Copy c)
+{
+ // CacheStoreBlocks delta{_meta_pos[c][FOOT] - _meta_pos[c][HEAD]};
+ CacheStoreBlocks header_len(0);
+ // int64_t n_buckets;
+ // int64_t n_segments;
+
+ _content = _start;
+ /*
+ * COMMENTING THIS SECTION FOR NOW TO USE THE EXACT LOGIC USED IN ATS TO CALCULATE THE NUMBER OF SEGMENTS AND BUCKETS
+ // Past the header is the segment free list heads which if sufficiently long (> ~4K) can take
+ // more than 1 store block. Start with a guess of 1 and adjust upwards as needed. A 2TB stripe
+ // with an AOS of 8000 has roughly 3700 segments meaning that for even 10TB drives this loop
+ // should only be a few iterations.
+ do {
+ ++header_len;
+ n_buckets = Bytes(delta - header_len) / (sizeof(CacheDirEntry) * ts::ENTRIES_PER_BUCKET);
+ n_segments = n_buckets / ts::MAX_BUCKETS_PER_SEGMENT;
+ // This should never be more than one loop, usually none.
+ while ((n_buckets / n_segments) > ts::MAX_BUCKETS_PER_SEGMENT)
+ ++n_segments;
+ } while ((sizeof(StripeMeta) + sizeof(uint16_t) * n_segments) > static_cast<size_t>(header_len));
+
+ _buckets = n_buckets / n_segments;
+ _segments = n_segments;
+ */
+ // header_len is still zero here because the loop that grew it is commented out.
+ _directory._skip = header_len;
+}
+
+// Accessors for the packed directory entry, which is addressed as an array `w` of words.
+// dir_big: bits 8-9 of w[1]; selects the block size class used by DIR_BLOCK_SIZE().
+#define dir_big(_e) ((uint32_t)((((_e)->w[1]) >> 8) & 0x3))
+// dir_bit: bit _b of word _w.
+#define dir_bit(_e, _w, _b) ((uint32_t)(((_e)->w[_w] >> (_b)) & 1))
+// dir_size: the bits of w[1] from bit 10 upwards.
+#define dir_size(_e) ((uint32_t)(((_e)->w[1]) >> 10))
+// dir_approx_size: (size + 1) blocks of the entry's block size class, in bytes.
+#define dir_approx_size(_e) ((dir_size(_e) + 1) * DIR_BLOCK_SIZE(dir_big(_e)))
+// dir_head: bit 13 of w[2].
+#define dir_head(_e) dir_bit(_e, 2, 13)
+// DIR_MASK_TAG / dir_tag: keep the low DIR_TAG_WIDTH bits (of a value / of w[2]).
+#define DIR_MASK_TAG(_t) ((_t) & ((1 << DIR_TAG_WIDTH) - 1))
+#define dir_tag(_e) ((uint32_t)((_e)->w[2] & ((1 << DIR_TAG_WIDTH) - 1)))
+// dir_offset: offset assembled from w[0] (bits 0-15), the low byte of w[1] (bits 16-23) and
+// w[4] (bits 24 and up).
+#define dir_offset(_e) \
+ ((int64_t)(((uint64_t)(_e)->w[0]) | (((uint64_t)((_e)->w[1] & 0xFF)) << 16) | (((uint64_t)(_e)->w[4]) << 24)))
+// dir_set_offset: inverse of dir_offset; the high byte of w[1] is preserved.
+// NOTE(review): `_o` is not parenthesized in the first assignment, so an expression argument
+// such as `a + b` would have the cast applied to `a` only.
+#define dir_set_offset(_e, _o) \
+ do { \
+ (_e)->w[0] = (uint16_t)_o; \
+ (_e)->w[1] = (uint16_t)((((_o) >> 16) & 0xFF) | ((_e)->w[1] & 0xFF00)); \
+ (_e)->w[4] = (uint16_t)((_o) >> 24); \
+ } while (0)
+//#define dir_segment(_s, _d) vol_dir_segment(_d, _s)
+// dir_in_seg: address of entry number _i counted from _s, with SIZEOF_DIR bytes per entry.
+#define dir_in_seg(_s, _i) ((CacheDirEntry *)(((char *)(_s)) + (SIZEOF_DIR * (_i))))
+// dir_next: link to the next entry, stored in w[3].
+#define dir_next(_e) (_e)->w[3]
+// dir_phase: bit 12 of w[2].
+#define dir_phase(_e) dir_bit(_e, 2, 12)
+// Block size classes: class _i uses blocks of CACHE_BLOCK_SIZE << (3 * _i) bytes.
+#define DIR_BLOCK_SHIFT(_i) (3 * (_i))
+#define DIR_BLOCK_SIZE(_i) (CACHE_BLOCK_SIZE << DIR_BLOCK_SHIFT(_i))
+// dir_set_prev overwrites all of w[2] (the word the tag / phase / head accessors read);
+// dir_set_next overwrites w[3].
+#define dir_set_prev(_e, _o) (_e)->w[2] = (uint16_t)(_o)
+#define dir_set_next(_e, _o) (_e)->w[3] = (uint16_t)(_o)
+
+/// @return @c true when the tag stored in entry @a e equals the tag derived from the third
+/// 32 bit slice of @a key.
+bool
+dir_compare_tag(const CacheDirEntry *e, const INK_MD5 *key)
+{
+  auto const entry_tag = dir_tag(e);
+  auto const key_tag   = DIR_MASK_TAG(key->slice32(2));
+  return entry_tag == key_tag;
+}
+
+/// @return Pointer to the first directory entry of segment @a s (forwards to vol_dir_segment()).
+TS_INLINE CacheDirEntry *
+Stripe::dir_segment(int s)
+{
+  return this->vol_dir_segment(s);
+}
+
+/// @return Pointer to the first directory entry of segment @a s. Segments are laid out back to
+/// back, each holding _buckets * DIR_DEPTH entries of SIZEOF_DIR bytes.
+TS_INLINE CacheDirEntry *
+Stripe::vol_dir_segment(int s)
+{
+  auto const byte_offset = (s * this->_buckets) * DIR_DEPTH * SIZEOF_DIR;
+  char *dir_base         = (char *)this->dir;
+  return (CacheDirEntry *)(dir_base + byte_offset);
+}
+
+/// @return Pointer to the first entry of bucket @a b within the segment starting at @a seg.
+TS_INLINE CacheDirEntry *
+dir_bucket(int64_t b, CacheDirEntry *seg)
+{
+  int64_t const first_entry = b * DIR_DEPTH;
+  return dir_in_seg(seg, first_entry);
+}
+
+/** Translate a stored entry link into an entry pointer.
+ *
+ * @param i Entry link as stored in a directory entry or freelist head.
+ * @param seg First entry of the segment the link belongs to.
+ * @return In the DIR_DEPTH < 5 variant: @c nullptr for a zero link, otherwise entry @a i of
+ *   the segment. The other variant first remaps @a i.
+ */
+TS_INLINE CacheDirEntry *
+dir_from_offset(int64_t i, CacheDirEntry *seg)
+{
+#if DIR_DEPTH < 5
+  return i ? dir_in_seg(seg, i) : nullptr;
+#else
+  i = i + ((i - 1) / (DIR_DEPTH - 1));
+  return dir_in_seg(seg, i);
+#endif
+}
+
+/// Validity test for an entry whose phase bit matches the stripe's: the entry's offset (minus
+/// one) must lie below the current write position plus aggregation buffer, measured in cache
+/// blocks from the stripe start.
+TS_INLINE int
+vol_in_phase_valid(Stripe *d, CacheDirEntry *e)
+{
+  auto const write_limit = (d->_meta[0][0].write_pos + d->agg_buf_pos - d->_start) / CACHE_BLOCK_SIZE;
+  return (dir_offset(e) - 1 < write_limit);
+}
+
+/// Validity test for an entry whose phase bit differs from the stripe's: the entry's offset
+/// (minus one) must lie at or beyond the aggregation position, measured in cache blocks from
+/// the stripe start.
+TS_INLINE int
+vol_out_of_phase_valid(Stripe *d, CacheDirEntry *e)
+{
+  auto const agg_limit = (d->_meta[0][0].agg_pos - d->_start) / CACHE_BLOCK_SIZE;
+  return (dir_offset(e) - 1 >= agg_limit);
+}
+
+/// @return The entry that @a d links to within segment @a seg, as resolved by dir_from_offset().
+TS_INLINE CacheDirEntry *
+next_dir(CacheDirEntry *d, CacheDirEntry *seg)
+{
+  int const link = dir_next(d);
+  return dir_from_offset(link, seg);
+}
+#define dir_offset(_e) \
+ ((int64_t)(((uint64_t)(_e)->w[0]) | (((uint64_t)((_e)->w[1] & 0xFF)) << 16) | (((uint64_t)(_e)->w[4]) << 24)))
+
+/// @return Entry number @a i counted from the bucket's first entry @a b.
+TS_INLINE CacheDirEntry *
+dir_bucket_row(CacheDirEntry *b, int64_t i)
+{
+  CacheDirEntry *row = dir_in_seg(b, i);
+  return row;
+}
+
+/** Translate an entry pointer into the link value used to refer to it.
+ *
+ * @param d Entry inside the segment.
+ * @param seg First entry of that segment.
+ * @return In the DIR_DEPTH < 5 variant: the entry's index within the segment. The other
+ *   variant additionally compacts the index.
+ */
+TS_INLINE int64_t
+dir_to_offset(const CacheDirEntry *d, const CacheDirEntry *seg)
+{
+  auto const byte_distance = ((char *)d) - ((char *)seg);
+#if DIR_DEPTH < 5
+  return byte_distance / SIZEOF_DIR;
+#else
+  int64_t i = (int64_t)(byte_distance / SIZEOF_DIR);
+  return i - (i / DIR_DEPTH);
+#endif
+}
+
+/// @return Whether entry @a _e points at valid content, using the in-phase test when its phase
+/// bit equals the stripe's (copy A header) phase and the out-of-phase test otherwise.
+bool
+Stripe::dir_valid(CacheDirEntry *_e)
+{
+  if (this->_meta[0][0].phase == dir_phase(_e)) {
+    return vol_in_phase_valid(this, _e);
+  }
+  return vol_out_of_phase_valid(this, _e);
+}
+
+/// @return Byte position of the content referenced by @a e: the content start plus the entry's
+/// offset in cache blocks, where the stored offset is one-based.
+int64_t
+Stripe::stripe_offset(CacheDirEntry *e)
+{
+  int64_t const block_bytes = (int64_t)(dir_offset(e) * CACHE_BLOCK_SIZE);
+  return this->_content + block_bytes - CACHE_BLOCK_SIZE;
+}
+
+/** Look up @a key in the directory and, on a hit, read the document and print its header and data.
+ *
+ * The key selects a segment and a bucket; the bucket chain is then searched for a valid
+ * entry with a matching tag.
+ *
+ * @param key Hash of the object to look for.
+ * @param result Currently unused.
+ * @param last_collision Currently unused.
+ * @return Always 0.
+ */
+int
+Stripe::dir_probe(INK_MD5 *key, CacheDirEntry *result, CacheDirEntry **last_collision)
+{
+  int segment = key->slice32(0) % this->_segments;
+  int bucket  = key->slice32(1) % this->_buckets;
+
+  CacheDirEntry *seg = this->dir_segment(segment);
+  CacheDirEntry *e   = dir_bucket(bucket, seg);
+  CacheDirEntry *hit = nullptr;
+
+  // TODO: collision craft is pending.. look at the main ATS code. Assuming no collision for now
+  if (dir_offset(e)) {
+    for (; e != nullptr; e = next_dir(e, seg)) {
+      // Entries with a matching tag that are not valid are skipped; deleting them is left out for now.
+      if (dir_compare_tag(e, key) && dir_valid(e)) {
+        std::cout << "dir_probe hit: found seg: " << segment << " bucket: " << bucket << " offset: " << dir_offset(e)
+                  << std::endl;
+        hit = e;
+        break;
+      }
+    }
+  }
+
+  // Covers both an empty bucket and a chain that was walked to its end without a match.
+  if (hit == nullptr) {
+    std::cout << "Not found in the Cache" << std::endl;
+    return 0;
+  }
+
+  // Size the buffer from the entry that is actually read, not from the head of the bucket.
+  int64_t offset     = stripe_offset(hit);
+  int64_t size       = dir_approx_size(hit);
+  char *stripe_buff2 = (char *)malloc(size);
+  if (stripe_buff2 == nullptr) {
+    std::cout << "Failed to read content from the Stripe" << std::endl;
+    return 0;
+  }
+
+  ssize_t n = pread(_span->_fd, stripe_buff2, size, offset);
+  if (n < size) {
+    // Do not interpret a buffer that was not completely filled.
+    std::cout << "Failed to read content from the Stripe" << std::endl;
+  } else {
+    Doc *doc = reinterpret_cast<Doc *>(stripe_buff2);
+    std::string hdr(doc->hdr(), doc->hlen);
+    std::cout << "HEADER\n" << hdr << std::endl;
+
+    std::string data_(doc->data(), doc->data_len());
+    std::cout << "DATA\n" << data_ << std::endl;
+  }
+  free(stripe_buff2);
+  return 0; // Why does this have a non-void return?
+}
+
+// Remove entry `e` from its chain in segment `s`.
+// `p` is the entry whose next link is redirected past `e`; pass nullptr when `e` has no
+// predecessor. The stripe (copy A header) is marked dirty in every case.
+// Returns the entry to continue a walk with: the old successor of `e` when `p` was given,
+// `e` itself when its successor was moved into it, or nullptr when `e` had no successor.
+// NOTE(review): dir_clear() and dir_assign() are not defined in the visible part of this
+// file; presumably they zero an entry and copy one entry over another - confirm.
+CacheDirEntry *
+Stripe::dir_delete_entry(CacheDirEntry *e, CacheDirEntry *p, int s)
+{
+ CacheDirEntry *seg = dir_segment(s);
+ // Remember the successor link before the entry is cleared.
+ int no = dir_next(e);
+ this->_meta[0][0].dirty = 1;
+ if (p) {
+ // Unlink `e` and push it onto the front of the segment freelist.
+ unsigned int fo = this->freelist[s];
+ unsigned int eo = dir_to_offset(e, seg);
+ dir_clear(e);
+ dir_set_next(p, no);
+ dir_set_next(e, fo);
+ if (fo) {
+ dir_set_prev(dir_from_offset(fo, seg), eo);
+ }
+ this->freelist[s] = eo;
+ } else {
+ // No predecessor: pull the successor into `e`'s slot and delete the successor instead.
+ CacheDirEntry *n = next_dir(e, seg);
+ if (n) {
+ dir_assign(e, n);
+ dir_delete_entry(n, e, s);
+ return e;
+ } else {
+ dir_clear(e);
+ return nullptr;
+ }
+ }
+ return dir_from_offset(no, seg);
+}
+
+/// Run walk_bucket_chain() on every segment and report each segment for which it returns true.
+void
+Stripe::walk_all_buckets()
+{
+  for (int seg_idx = 0; seg_idx < this->_segments; seg_idx++) {
+    bool const loop_found = walk_bucket_chain(seg_idx);
+    if (loop_found) {
+      std::cout << "Loop present in Segment " << seg_idx << std::endl;
+    }
+  }
+}
+
+// Walk every bucket chain of segment `s`, deleting entries that are invalid or have a zero
+// offset and printing a message when an entry is reached a second time.
+// The only return statement yields false, so callers never see true from this function.
+bool
+Stripe::walk_bucket_chain(int s)
+{
+ CacheDirEntry *seg = this->dir_segment(s);
+ // One bit per possible entry index in the segment; set once the entry has been visited.
+ std::bitset<65536> b_bitset;
+ b_bitset.reset();
+ for (int b = 0; b < this->_buckets; b++) {
+ // `p` trails `e` by one entry so that deletions can relink the chain.
+ CacheDirEntry *p = nullptr;
+ auto *dir_b = dir_bucket(b, seg);
+ CacheDirEntry *e = dir_b;
+ // Counted but only referenced by the commented-out trace output below.
+ int len = 0;
+
+ while (e) {
+ len++;
+ int i = dir_to_offset(e, seg);
+ if (b_bitset.test(i)) {
+ std::cout << "bit already set in "
+ << "seg " << s << " bucket " << b << std::endl;
+ }
+ if (i > 0) // i.e., not the first dir in the segment
+ b_bitset[i] = 1;
+
+#if 1
+ if (!dir_valid(e) || !dir_offset(e)) {
+ // std::cout<<"dir_clean in segment "<<s<<" =>cleaning "<<e<<" tag"<<dir_tag(e)<<" boffset"<< dir_offset(e)<< " bucket:
+ // "<<dir_b<< " bucket len: "<<dir_bucket_length(dir_b, s)<<std::endl;
+ // Continue from whatever dir_delete_entry() hands back; `p` is deliberately not advanced.
+ e = dir_delete_entry(e, p, s);
+ continue;
+ }
+#endif
+ p = e;
+ e = next_dir(e, seg);
+ }
+ // std::cout<<"dir len in this bucket "<<len<<std::endl;
+ }
+ return false;
+}
+
+/// Push entry @a e onto the front of the freelist of segment @a s, linking the previous
+/// freelist head back to it.
+void
+Stripe::dir_free_entry(CacheDirEntry *e, int s)
+{
+  CacheDirEntry *segment_base = this->dir_segment(s);
+  unsigned int old_head       = this->freelist[s];
+  unsigned int new_head       = dir_to_offset(e, segment_base);
+
+  dir_set_next(e, old_head);
+  if (old_head != 0) {
+    dir_set_prev(dir_from_offset(old_head, segment_base), new_head);
+  }
+  this->freelist[s] = new_head;
+}
+
+// adds all the directory entries
+// in a segment to the segment freelist
+/// Zero segment @a s and rebuild its freelist from rows 1 .. DIR_DEPTH - 1 of every bucket
+/// (row 0, the bucket head, is not put on the freelist).
+void
+Stripe::dir_init_segment(int s)
+{
+  CacheDirEntry *seg = this->dir_segment(s);
+  this->freelist[s]  = 0;
+  memset(seg, 0, SIZEOF_DIR * DIR_DEPTH * this->_buckets);
+  for (int row = 1; row < DIR_DEPTH; row++) {
+    for (int b = 0; b < this->_buckets; b++) {
+      CacheDirEntry *bucket_head = dir_bucket(b, seg);
+      this->dir_free_entry(dir_bucket_row(bucket_head, row), s);
+    }
+  }
+}
+
+/// Rebuild the freelist of every segment from rows 1 .. DIR_DEPTH - 1 of every bucket. Unlike
+/// dir_init_segment() the entries themselves are not zeroed here.
+void
+Stripe::init_dir()
+{
+  for (int seg_idx = 0; seg_idx < this->_segments; seg_idx++) {
+    CacheDirEntry *seg      = this->dir_segment(seg_idx);
+    this->freelist[seg_idx] = 0;
+    for (int row = 1; row < DIR_DEPTH; row++) {
+      for (int b = 0; b < this->_buckets; b++) {
+        CacheDirEntry *bucket_head = dir_bucket(b, seg);
+        this->dir_free_entry(dir_bucket_row(bucket_head, row), seg_idx);
+      }
+    }
+  }
+}
+
+/** Read one complete metadata copy (header, directory, footer) from the start of the stripe.
+ *
+ * A buffer of vol_dirlen() bytes is allocated and @c dir is pointed just past the header
+ * portion of it.
+ *
+ * @return An @c Errata carrying a message if the read failed or came up short; empty otherwise.
+ */
+Errata
+Stripe::loadDir()
+{
+  Errata zret;
+  int64_t dirlen = this->vol_dirlen();
+  char *raw_dir  = (char *)ats_memalign(ats_pagesize(), dirlen);
+  dir            = (CacheDirEntry *)(raw_dir + this->vol_headerlen());
+  // read directory
+  ssize_t n = pread(this->_span->_fd, raw_dir, dirlen, this->_start);
+  if (n < dirlen) {
+    // errno is only meaningful when pread itself failed; a short read gets a generic code.
+    int const code = (n < 0) ? errno : 1;
+    std::cout << "Failed to read Dir from stripe @" << this->hashText << std::endl;
+    // Report the failure to the caller as well instead of only printing it.
+    zret.push(0, code, "Failed to read Dir from stripe @", this->hashText);
+  }
+  return zret;
}
+//
+// Cache Directory
+//
+
+// Compiled out by the `#if 0` guard. Walks a chain with two cursors, one advancing a single
+// entry and the other two entries per round; the cursors meeting means the chain loops.
+#if 0
+// return value 1 means no loop
+// zero indicates loop
+int
+dir_bucket_loop_check(CacheDirEntry *start_dir, CacheDirEntry *seg)
+{
+ if (start_dir == nullptr) {
+ return 1;
+ }
+
+ CacheDirEntry *p1 = start_dir;
+ CacheDirEntry *p2 = start_dir;
+
+ while (p2) {
+ // p1 moves by one entry per iteration
+ assert(p1);
+ p1 = next_dir(p1, seg);
+ // p2 moves by two entries per iteration
+ p2 = next_dir(p2, seg);
+ if (p2) {
+ p2 = next_dir(p2, seg);
+ } else {
+ return 1;
+ }
+
+ if (p2 == p1) {
+ return 0; // we have a loop
+ }
+ }
+ return 1;
+}
+#endif
+
+/** Count the entries on the freelist of segment @a s.
+ *
+ * @return The number of entries reachable from the freelist head, or
+ *   (DIR_DEPTH - 1) * buckets when check_loop() reports (and repairs) a loop.
+ */
+int
+Stripe::dir_freelist_length(int s)
+{
+  CacheDirEntry *seg    = this->dir_segment(s);
+  CacheDirEntry *cursor = dir_from_offset(this->freelist[s], seg);
+  if (this->check_loop(s)) {
+    return (DIR_DEPTH - 1) * this->_buckets;
+  }
+
+  int count = 0;
+  for (; cursor; cursor = next_dir(cursor, seg)) {
+    count++;
+  }
+  return count;
+}
+
+/** Look for a loop in the freelist of segment @a s.
+ *
+ * Every next-link met while walking the freelist is recorded; meeting the same link twice
+ * means the list loops, in which case the segment is re-initialized.
+ *
+ * @return 1 if a loop was found (and the segment rewritten), 0 otherwise.
+ */
+int
+Stripe::check_loop(int s)
+{
+  CacheDirEntry *seg = this->dir_segment(s);
+  std::bitset<65536> seen_links; // starts out with every bit clear
+
+  CacheDirEntry *cursor = dir_from_offset(this->freelist[s], seg);
+  while (cursor) {
+    int const link = dir_next(cursor);
+    if (seen_links.test(link)) {
+      // bit was set in a previous round so a loop is present
+      std::cout << "<check_loop> Loop present in Span" << this->_span->_path.path() << " Stripe: " << this->hashText
+                << "Segment: " << s << std::endl;
+      this->dir_init_segment(s);
+      return 1;
+    }
+    seen_links.set(link);
+    cursor = dir_from_offset(link, seg);
+  }
+
+  return 0;
+}
+
+/// qsort() comparator for unsigned short values: negative, zero or positive as @a a is less
+/// than, equal to or greater than @a b.
+int
+compare_ushort(void const *a, void const *b)
+{
+  auto const lhs = *static_cast<unsigned short const *>(a);
+  auto const rhs = *static_cast<unsigned short const *>(b);
+  return lhs - rhs;
+}
+
+/** Load the stripe metadata and directory and print directory statistics to stdout.
+ *
+ * For every segment each bucket chain is walked, counting used, stale and empty entries,
+ * chain lengths, duplicate tags and fragment sizes. A per-segment line, stripe totals, a
+ * chain length histogram and the fragment size demographics are printed.
+ */
+void
+Stripe::dir_check()
+{
+  static int const SEGMENT_HISTOGRAM_WIDTH = 16;
+  int hist[SEGMENT_HISTOGRAM_WIDTH + 1]    = {0};
+  unsigned short chain_tag[MAX_ENTRIES_PER_SEGMENT];
+  int32_t chain_mark[MAX_ENTRIES_PER_SEGMENT];
+
+  this->loadMeta();
+  this->loadDir();
+  // uint64_t total_buckets = _segments * _buckets;
+  // uint64_t total_entries = total_buckets * DIR_DEPTH;
+  // Must start at zero: the counters are incremented below and asserted to be zero on output.
+  int frag_demographics[1 << DIR_SIZE_WIDTH][DIR_BLOCK_SIZES] = {{0}};
+  int stale = 0, in_use = 0, empty = 0;
+  int free = 0, head = 0, buckets_in_use = 0;
+
+  int max_chain_length = 0;
+  int64_t bytes_in_use = 0;
+  std::cout << "Stripe '[" << hashText << "]'" << std::endl;
+  std::cout << " Directory Bytes: " << _segments * _buckets * SIZEOF_DIR << std::endl;
+  std::cout << " Segments: " << _segments << std::endl;
+  std::cout << " Buckets per segment: " << _buckets << std::endl;
+  std::cout << " Entries: " << _segments * _buckets * DIR_DEPTH << std::endl;
+  for (int s = 0; s < _segments; s++) {
+    CacheDirEntry *seg     = this->dir_segment(s);
+    int seg_chain_max      = 0;
+    int seg_empty          = 0;
+    int seg_in_use         = 0;
+    int seg_stale          = 0;
+    // 64 bit: a full segment of large fragments exceeds the range of int.
+    int64_t seg_bytes_in_use = 0;
+    int seg_dups             = 0;
+    int seg_buckets_in_use   = 0;
+
+    ink_zero(chain_tag);
+    memset(chain_mark, -1, sizeof(chain_mark));
+    for (int b = 0; b < _buckets; b++) {
+      CacheDirEntry *root = dir_bucket(b, seg);
+      int h               = 0;
+      int chain_idx       = 0;
+      int mark            = 0;
+      ++seg_buckets_in_use;
+      // Walk the chain. The loop increment is the only place the cursor advances, so every
+      // entry of the chain is examined.
+      for (CacheDirEntry *e = root; e; e = next_dir(e, seg)) {
+        if (!dir_offset(e)) {
+          ++seg_empty;
+          --seg_buckets_in_use;
+          // this should only happen on the first dir in a bucket
+          assert(nullptr == next_dir(e, seg));
+          break;
+        }
+
+        int e_idx = e - seg;
+        if (chain_mark[e_idx] == mark) {
+          // Entry was already visited: stop here, otherwise a cyclic chain would never end
+          // and would overrun chain_tag.
+          printf(" - Cycle of length %d detected for bucket %d\n", h + 1, b);
+          break;
+        } else if (chain_mark[e_idx] >= 0) {
+          printf(" - Entry %d is in chain %d and %d", e_idx, chain_mark[e_idx], mark);
+        } else {
+          chain_mark[e_idx] = mark;
+        }
+        ++h;
+        chain_tag[chain_idx++] = dir_tag(e);
+
+        if (!dir_valid(e)) {
+          ++seg_stale;
+        } else {
+          uint64_t size = dir_approx_size(e);
+          if (dir_head(e)) {
+            ++head;
+          }
+          ++seg_in_use;
+          seg_bytes_in_use += size;
+          ++frag_demographics[dir_size(e)][dir_big(e)];
+        }
+      }
+
+      // Check for duplicates (identical tags in the same bucket).
+      if (h > 1) {
+        unsigned short last;
+        qsort(chain_tag, h, sizeof(chain_tag[0]), &compare_ushort);
+        last = chain_tag[0];
+        for (int k = 1; k < h; ++k) {
+          if (last == chain_tag[k]) {
+            ++seg_dups;
+          }
+          last = chain_tag[k];
+        }
+      }
+      ++hist[std::min(h, SEGMENT_HISTOGRAM_WIDTH)];
+      seg_chain_max = std::max(seg_chain_max, h);
+    }
+    int fl_size = dir_freelist_length(s);
+    in_use += seg_in_use;
+    empty += seg_empty;
+    stale += seg_stale;
+    free += fl_size;
+    buckets_in_use += seg_buckets_in_use;
+    max_chain_length = std::max(max_chain_length, seg_chain_max);
+    bytes_in_use += seg_bytes_in_use;
+
+    printf(" - Segment-%d | Entries: used=%d stale=%d free=%d disk-bytes=%" PRId64
+           " Buckets: used=%d empty=%d max=%d avg=%.2f dups=%d\n",
+           s, seg_in_use, seg_stale, fl_size, seg_bytes_in_use, seg_buckets_in_use, seg_empty, seg_chain_max,
+           seg_buckets_in_use ? static_cast<float>(seg_in_use + seg_stale) / seg_buckets_in_use : 0.0, seg_dups);
+  }
+  //////////////////
+
+  printf(" - Stripe | Entries: in-use=%d stale=%d free=%d Buckets: empty=%d max=%d avg=%.2f\n", in_use, stale, free, empty,
+         max_chain_length, buckets_in_use ? static_cast<float>(in_use + stale) / buckets_in_use : 0);
+
+  printf(" Chain lengths: ");
+  for (int j = 0; j < SEGMENT_HISTOGRAM_WIDTH; ++j) {
+    printf(" %d=%d ", j, hist[j]);
+  }
+  printf(" %d>=%d\n", SEGMENT_HISTOGRAM_WIDTH, hist[SEGMENT_HISTOGRAM_WIDTH]);
+
+  char tt[256] = "";
+  printf(" Total Size: %" PRIu64 "\n", static_cast<uint64_t>(_len.count()));
+  printf(" Bytes in Use: %" PRIu64 " [%0.2f%%]\n", static_cast<uint64_t>(bytes_in_use),
+         100.0 * (static_cast<float>(bytes_in_use) / _len.count()));
+  printf(" Objects: %d\n", head);
+  printf(" Average Size: %" PRIu64 "\n", static_cast<uint64_t>(head ? (bytes_in_use / head) : 0));
+  printf(" Average Frags: %.2f\n", head ? static_cast<float>(in_use) / head : 0);
+  printf(" Write Position: %" PRIu64 "\n", _meta[0][0].write_pos - _content.count());
+  printf(" Wrap Count: %d\n", _meta[0][0].cycle);
+  printf(" Phase: %s\n", _meta[0][0].phase ? "true" : "false");
+  // ctime_r() ends its text with a newline; strip it, but only if a text was produced.
+  if (ctime_r(&_meta[0][0].create_time, tt) != nullptr && tt[0] != 0) {
+    tt[strlen(tt) - 1] = 0;
+  } else {
+    tt[0] = 0;
+  }
+  printf(" Sync Serial: %u\n", _meta[0][0].sync_serial);
+  printf(" Write Serial: %u\n", _meta[0][0].write_serial);
+  printf(" Create Time: %s\n", tt);
+  printf("\n");
+  printf(" Fragment size demographics\n");
+  for (int b = 0; b < DIR_BLOCK_SIZES; ++b) {
+    int block_size = DIR_BLOCK_SIZE(b);
+    int s          = 0;
+    while (s < 1 << DIR_SIZE_WIDTH) {
+      for (int j = 0; j < 8; ++j, ++s) {
+        // The size markings are redundant. Low values (less than DIR_SHIFT_WIDTH) for larger
+        // base block sizes should never be used. Such entries should use the next smaller base block size.
+        if (b > 0 && s < 1 << DIR_BLOCK_SHIFT(1)) {
+          assert(frag_demographics[s][b] == 0);
+          continue;
+        }
+        printf(" %8d[%2d:%1d]:%06d", (s + 1) * block_size, s, b, frag_demographics[s][b]);
+      }
+      printf("\n");
+    }
+  }
+  printf("\n");
+  ////////////////
+}
+
+// Locate and load the stripe metadata (header / footer of copies A and B) from disk.
+// Header A is read from the start of the stripe, footer A is searched for in bulk reads,
+// copy B is derived from the positions found for copy A, and finally the sync serials
+// decide which copy is passed to updateLiveData(). The segment freelist is copied out of the
+// first block read. Returns an Errata with a message when header A is missing, when the
+// sync serials are unusable, or when the I/O alignment is larger than the local buffers.
+Errata
+Stripe::loadMeta()
+{
+ // Read from disk in chunks of this size. This needs to be a multiple of both the
+ // store block size and the directory entry size so neither goes across read boundaries.
+ // Beyond that the value should be in the ~10MB range for what I guess is best performance
+ // vs. blocking production disk I/O on a live system.
+ constexpr static int64_t N = (1 << 8) * CacheStoreBlocks::SCALE * sizeof(CacheDirEntry);
+
+ Errata zret;
+
+ int fd = _span->_fd;
+ Bytes n;
+ bool found;
+ MemSpan data; // The current view of the read buffer.
+ Bytes delta;
+ Bytes pos = _start;
+ // Avoid searching the entire span, because some of it must be content. Assume that AOS is more than 160
+ // which means at most 10/160 (1/16) of the span can be directory/header.
+ Bytes limit = pos + _len / 16;
+ size_t io_align = _span->_geometry.blocksz;
+ StripeMeta const *meta;
+
+ // NOTE(review): the buffers handed to bulk_buff come from ats_memalign(), while
+ // std::unique_ptr<char> releases with `delete` - confirm these are compatible.
+ std::unique_ptr<char> bulk_buff; // Buffer for bulk reads.
+ static const size_t SBSIZE = CacheStoreBlocks::SCALE; // save some typing.
+ alignas(SBSIZE) char stripe_buff[SBSIZE]; // Use when reading a single stripe block.
+ alignas(SBSIZE) char stripe_buff2[SBSIZE]; // use to save the stripe freelist
+ if (io_align > SBSIZE)
+ return Errata::Message(0, 1, "Cannot load stripe ", _idx, " on span ", _span->_path, " because the I/O block alignment ",
+ io_align, " is larger than the buffer alignment ", SBSIZE);
+
+ _directory._start = pos;
+ // Header A must be at the start of the stripe block.
+ // Todo: really need to check pread() for failure.
+ ssize_t headerbyteCount = pread(fd, stripe_buff2, SBSIZE, pos);
+ n.assign(headerbyteCount);
+ data.assign(stripe_buff2, n);
+ meta = data.ptr<StripeMeta>(0);
+ // TODO:: We need to read more data at this point to populate dir
+ if (this->validateMeta(meta)) {
+ delta = Bytes(data.ptr<char>(0) - stripe_buff2);
+ _meta[A][HEAD] = *meta;
+ _meta_pos[A][HEAD] = round_down(pos + Bytes(delta));
+ pos += round_up(SBSIZE);
+ _directory._skip = Bytes(SBSIZE); // first guess, updated in @c updateLiveData when the header length is computed.
+ // Search for Footer A. Nothing for it except to grub through the disk.
+ // The searched data is cached so it's available for directory parsing later if needed.
+ while (pos < limit) {
+ char *buff = static_cast<char *>(ats_memalign(io_align, N));
+ bulk_buff.reset(buff);
+ n.assign(pread(fd, buff, N, pos));
+ data.assign(buff, n);
+ found = this->probeMeta(data, &_meta[A][HEAD]);
+ if (found) {
+ // probeMeta() left `data` positioned at the footer; `diff` is its offset in this read.
+ ptrdiff_t diff = data.ptr<char>(0) - buff;
+ _meta[A][FOOT] = data.template at<StripeMeta>(0);
+ _meta_pos[A][FOOT] = round_down(pos + Bytes(diff));
+ // don't bother attaching block if the footer is at the start
+ if (diff > 0) {
+ _directory._clip = Bytes(N - diff);
+ _directory.append({bulk_buff.release(), N});
+ }
+ data += SBSIZE; // skip footer for checking on B copy.
+ break;
+ } else {
+ // No footer in this chunk: keep the chunk in the directory cache and read the next one.
+ _directory.append({bulk_buff.release(), N});
+ pos += round_up(N);
+ }
+ }
+ } else {
+ zret.push(0, 1, "Header A not found");
+ }
+ pos = _meta_pos[A][FOOT];
+ // Technically if Copy A is valid, Copy B is not needed. But at this point it's cheap to retrieve
+ // (as the exact offset is computable).
+ if (_meta_pos[A][FOOT] > 0) {
+ delta = _meta_pos[A][FOOT] - _meta_pos[A][HEAD];
+ // Header B should be immediately after Footer A. If at the end of the last read,
+ // do another read.
+ // if (data.size() < CacheStoreBlocks::SCALE) {
+ // pos += round_up(N);
+ // n = Bytes(pread(fd, stripe_buff, CacheStoreBlocks::SCALE, pos));
+ // data.assign(stripe_buff, n);
+ // }
+ // NOTE(review): `pos` is recomputed here but nothing is read at that position; `meta` is
+ // taken from whatever `data` still refers to after the footer A search above.
+ pos = this->_start + Bytes(vol_dirlen());
+ meta = data.ptr<StripeMeta>(0);
+ if (this->validateMeta(meta)) {
+ _meta[B][HEAD] = *meta;
+ _meta_pos[B][HEAD] = round_down(pos);
+
+ // Footer B must be at the same relative offset to Header B as Footer A -> Header A.
+ pos += delta;
+ n = Bytes(pread(fd, stripe_buff, ts::CacheStoreBlocks::SCALE, pos));
+ data.assign(stripe_buff, n);
+ meta = data.ptr<StripeMeta>(0);
+ if (this->validateMeta(meta)) {
+ _meta[B][FOOT] = *meta;
+ _meta_pos[B][FOOT] = round_down(pos);
+ }
+ }
+ }
+
+ // Pick the live copy: A when it is consistent and not older than a consistent B, else B.
+ if (_meta_pos[A][FOOT] > 0) {
+ if (_meta[A][HEAD].sync_serial == _meta[A][FOOT].sync_serial &&
+ (0 == _meta_pos[B][FOOT] || _meta[B][HEAD].sync_serial != _meta[B][FOOT].sync_serial ||
+ _meta[A][HEAD].sync_serial >= _meta[B][HEAD].sync_serial)) {
+ this->updateLiveData(A);
+ } else if (_meta_pos[B][FOOT] > 0 && _meta[B][HEAD].sync_serial == _meta[B][FOOT].sync_serial) {
+ this->updateLiveData(B);
+ } else {
+ zret.push(0, 1, "Invalid stripe data - candidates found but sync serial data not valid. ", _meta[A][HEAD].sync_serial, ":",
+ _meta[A][FOOT].sync_serial, ":", _meta[B][HEAD].sync_serial, ":", _meta[B][FOOT].sync_serial);
+ }
+ }
+
+ // Go back to the first block read (header A) to pick up the freelist stored with it.
+ n.assign(headerbyteCount);
+ data.assign(stripe_buff2, n);
+ meta = data.ptr<StripeMeta>(0);
+ // copy freelist
+ // NOTE(review): stripe_buff2 holds at most SBSIZE bytes, so this assumes the _segments
+ // freelist entries all fit in that first block - confirm. Any earlier freelist allocation
+ // is overwritten without being released.
+ freelist = (uint16_t *)malloc(_segments * sizeof(uint16_t));
+ for (int i = 0; i < _segments; i++)
+ freelist[i] = meta->freelist[i];
+
+ // The cached directory chunks are dropped when zret tests false.
+ if (!zret)
+ _directory.clear();
+ return zret;
+}
+
+} // end ct
diff --git a/cmd/traffic_cache_tool/CacheDefs.h b/cmd/traffic_cache_tool/CacheDefs.h
index 6db0e2e..65b6305 100644
--- a/cmd/traffic_cache_tool/CacheDefs.h
+++ b/cmd/traffic_cache_tool/CacheDefs.h
@@ -30,6 +30,11 @@
#include <ts/Regex.h>
#include <tsconfig/Errata.h>
#include <ts/TextView.h>
+#include <ts/ink_file.h>
+#include <list>
+
+#include "Command.h"
+#include "File.h"
#if defined(MAGIC)
#undef MAGIC
@@ -454,3 +459,162 @@ private:
DFA port;
DFA regex;
};
+
+// Pull the cache tool vocabulary types into the global namespace so the declarations
+// below (and files including this header) can use them unqualified.
+using ts::Bytes;
+using ts::Megabytes;
+using ts::CacheStoreBlocks;
+using ts::CacheStripeBlocks;
+using ts::StripeMeta;
+using ts::CacheStripeDescriptor;
+using ts::Errata;
+using ts::FilePath;
+using ts::CacheDirEntry;
+using ts::MemSpan;
+using ts::Doc;
+
+// Initial value of cache_config_min_average_object_size, which is the divisor used
+// when computing the number of directory buckets for a stripe.
+constexpr int ESTIMATED_OBJECT_SIZE = 8000;
+// Value stored in StripeMeta::sector_size when stripe metadata is initialized.
+constexpr int DEFAULT_HW_SECTOR_SIZE = 512;
+// NOTE(review): presumably the size of the volume hash table - no use is visible here, confirm.
+constexpr int VOL_HASH_TABLE_SIZE = 32707;
+// NOTE(review): presumably the marker for an unused volume hash slot - no use is visible here, confirm.
+constexpr unsigned short VOL_HASH_EMPTY = 65535;
+// Number of low bits of directory entry word w[2] that hold the tag (see dir_tag / DIR_MASK_TAG).
+constexpr int DIR_TAG_WIDTH = 12;
+// Number of directory entries in one bucket; bucket b starts at entry b * DIR_DEPTH of its segment.
+constexpr int DIR_DEPTH = 4;
+// Byte stride between consecutive directory entries when indexing into a segment.
+constexpr int SIZEOF_DIR = 10;
+// Sizes the per-segment scratch arrays used by the directory check.
+constexpr int MAX_ENTRIES_PER_SEGMENT = (1 << 16);
+// First dimension of the fragment histogram in the directory check is (1 << DIR_SIZE_WIDTH),
+// indexed by dir_size(), i.e. the upper 6 bits of word w[1].
+constexpr int DIR_SIZE_WIDTH = 6;
+// Second dimension of the fragment histogram in the directory check, indexed by dir_big(),
+// which is a 2 bit value.
+constexpr int DIR_BLOCK_SIZES = 4;
+// log2 of CACHE_BLOCK_SIZE.
+constexpr int CACHE_BLOCK_SHIFT = 9;
+// Directory entry offsets are expressed in units of this many bytes.
+constexpr int CACHE_BLOCK_SIZE = (1 << CACHE_BLOCK_SHIFT); // 512, smallest sector size
+
+namespace ct
+{
+struct Stripe;
+/** In-memory state for a single cache span, identified by its file system path.
+
+    Holds the open descriptor, the geometry and extents of the span, a local copy of the
+    serialized span header and the list of stripes found in (or allocated from) the span.
+ */
+struct Span {
+  /// Construct for the span located at @a path. Nothing is opened or read here.
+  Span(FilePath const &path) : _path(path) {}
+  Errata load();
+  Errata loadDevice();
+  bool isEmpty() const;
+  int header_len = 0;
+
+  /// Replace all existing stripes with a single unallocated stripe covering the span.
+  Errata clear();
+
+  /// This is broken and needs to be cleaned up.
+  void clearPermanently();
+
+  ts::Rv<Stripe *> allocStripe(int vol_idx, CacheStripeBlocks len);
+  Errata updateHeader(); ///< Update serialized header and write to disk.
+
+  FilePath _path;     ///< File system location of span.
+  ats_scoped_fd _fd;  ///< Open file descriptor for span.
+  int _vol_idx = 0;   ///< Forced volume.
+  CacheStoreBlocks _base;   ///< Offset to first usable byte.
+  CacheStoreBlocks _offset; ///< Offset to first content byte.
+  // The space between _base and _offset is where the span information is stored.
+  CacheStoreBlocks _len;         ///< Total length of span.
+  CacheStoreBlocks _free_space;  ///< Total size of free stripes.
+  ink_device_geometry _geometry; ///< Geometry of span.
+  /// Number of usable blocks for stripes, i.e. after subtracting the skip and the disk header.
+  /// Zero initialized because the constructor does not set it; without the initializer the
+  /// value is indeterminate until something assigns it.
+  uint64_t num_usable_blocks = 0;
+  /// Local copy of the serialized header data stored in the span.
+  std::unique_ptr<ts::SpanHeader> _header;
+  /// Live information about stripes.
+  /// Seeded from @a _header and potentially augmented with direct probing.
+  // NOTE(review): these are raw pointers and no destructor is declared in this struct -
+  // confirm who is responsible for deleting the stripes.
+  std::list<Stripe *> _stripes;
+};
+/* --------------------------------------------------------------------------------------- */
+/** In-memory state for one stripe of a span.
+
+    Carries the four copies of the stripe metadata, their on-disk positions, the directory
+    buffer and the per segment free list, along with the operations used to load, check,
+    initialize and clear them.
+ */
+struct Stripe {
+  /// Meta data is stored in 4 copies A/B and Header/Footer.
+  enum Copy { A = 0, B = 1 };
+  enum { HEAD = 0, FOOT = 1 };
+
+  /// Piece wise memory storage for the directory.
+  // NOTE(review): the destructor releases the chained blocks via clear(), but the copy
+  // operations are the implicit ones - copying a Chunk (or a Stripe) looks like it would
+  // free the same blocks twice. Confirm instances are never copied.
+  struct Chunk {
+    Bytes _start; ///< Starting offset relative to physical device of span.
+    Bytes _skip;  ///< # of bytes not valid at the start of the first block.
+    Bytes _clip;  ///< # of bytes not valid at the end of the last block.
+
+    typedef std::vector<MemSpan> Chain;
+    Chain _chain; ///< Chain of blocks.
+
+    ~Chunk();
+
+    void append(MemSpan m);
+    void clear();
+  };
+
+  /// Construct from span header data.
+  Stripe(Span *span, Bytes start, CacheStoreBlocks len);
+
+  /// Is stripe unallocated?
+  bool isFree() const;
+
+  /** Probe a chunk of memory @a mem for stripe metadata.
+
+      @a mem is updated to remove memory that has been probed. If @a
+      meta is not @c nullptr then it is used for additional cross
+      checking.
+
+      @return @c true if @a mem has valid data, @c false otherwise.
+  */
+  bool probeMeta(MemSpan &mem, StripeMeta const *meta = nullptr);
+
+  /// Check a buffer for being valid stripe metadata.
+  /// @return @c true if valid, @c false otherwise.
+  static bool validateMeta(StripeMeta const *meta);
+
+  /// Load metadata for this stripe.
+  Errata loadMeta();
+  /// Read the directory of this stripe from the span into @a dir.
+  Errata loadDir();
+  /// Look for a loop in the free list of segment @a s.
+  int check_loop(int s);
+  /// Load the metadata and directory, then print directory statistics.
+  void dir_check();
+  bool walk_bucket_chain(int s); // returns true if there is a loop
+  /// Run walk_bucket_chain() over every segment.
+  void walk_all_buckets();
+
+  /// Initialize the live data from the loaded serialized data.
+  void updateLiveData(enum Copy c);
+
+  Span *_span;           ///< Hosting span.
+  INK_MD5 hash_id;       ///< hash_id
+  Bytes _start;          ///< Offset of first byte of stripe metadata.
+  Bytes _content;        ///< Start of content.
+  CacheStoreBlocks _len; ///< Length of stripe.
+  uint8_t _vol_idx = 0;  ///< Volume index.
+  uint8_t _type    = 0;  ///< Stripe type.
+  int8_t _idx      = -1; ///< Stripe index in span.
+  int agg_buf_pos  = 0;
+
+  // Both counts are zero initialized because the constructor does not set them; they feed
+  // allocation sizes and loop bounds, so an indeterminate value must never be observable.
+  int64_t _buckets  = 0; ///< Number of buckets per segment.
+  int64_t _segments = 0; ///< Number of segments.
+
+  std::string hashText;
+
+  /// Meta copies, indexed by A/B then HEAD/FOOT.
+  StripeMeta _meta[2][2];
+  /// Locations for the meta data.
+  CacheStoreBlocks _meta_pos[2][2];
+  /// Directory.
+  Chunk _directory;
+  CacheDirEntry const *dir = nullptr; // the big buffer that will hold the whole directory of stripe header.
+  uint16_t *freelist       = nullptr; // using this freelist instead of the one in StripeMeta.
+  // This is because the freelist is not being copied to _meta[2][2] correctly.
+  // need to do something about it .. hmmm :-?
+  /// Number of entries on the free list of segment @a s.
+  int dir_freelist_length(int s);
+  // NOTE(review): TS_INLINE presumably expands to `inline`. These members are only declared
+  // here; if so, their definitions must be visible in every translation unit that calls
+  // them - confirm nothing outside the defining source file uses them.
+  TS_INLINE CacheDirEntry *dir_segment(int s);
+  TS_INLINE CacheDirEntry *vol_dir_segment(int s);
+  int64_t stripe_offset(CacheDirEntry *e); // offset of e w.r.t the stripe
+  /// Length of one copy of the stripe directory: header, directory entries and footer.
+  size_t vol_dirlen();
+  /// Length of the stripe header including the per segment free list heads.
+  TS_INLINE int vol_headerlen();
+  void vol_init_data_internal();
+  /// Compute @a _buckets, @a _segments and @a _content by iterating vol_init_data_internal().
+  void vol_init_data();
+  /// Zero segment @a s and put its directory entries on the segment free list.
+  void dir_init_segment(int s);
+  /// Push entry @a e on the free list of segment @a s.
+  void dir_free_entry(CacheDirEntry *e, int s);
+  CacheDirEntry *dir_delete_entry(CacheDirEntry *e, CacheDirEntry *p, int s);
+  // int dir_bucket_length(CacheDirEntry *b, int s);
+  int dir_probe(INK_MD5 *key, CacheDirEntry *result, CacheDirEntry **last_collision);
+  bool dir_valid(CacheDirEntry *e);
+  bool validate_sync_serial();
+  Errata updateHeaderFooter();
+  Errata InitializeMeta();
+  void init_dir();
+  Errata clear(); // clears striped headers and footers
+};
+} // end ct
diff --git a/cmd/traffic_cache_tool/CacheTool.cc b/cmd/traffic_cache_tool/CacheTool.cc
index c620b64..fff2232 100644
--- a/cmd/traffic_cache_tool/CacheTool.cc
+++ b/cmd/traffic_cache_tool/CacheTool.cc
@@ -59,1073 +59,17 @@ using ts::CacheDirEntry;
using ts::MemSpan;
using ts::Doc;
-constexpr int ESTIMATED_OBJECT_SIZE = 8000;
-constexpr int DEFAULT_HW_SECTOR_SIZE = 512;
-constexpr int VOL_HASH_TABLE_SIZE = 32707;
-int cache_config_min_average_object_size = ESTIMATED_OBJECT_SIZE;
-CacheStoreBlocks Vol_hash_alloc_size(1024);
-constexpr unsigned short VOL_HASH_EMPTY = 65535;
-constexpr int DIR_TAG_WIDTH = 12;
-constexpr int DIR_DEPTH = 4;
-constexpr int SIZEOF_DIR = 10;
-constexpr int MAX_ENTRIES_PER_SEGMENT = (1 << 16);
-constexpr int DIR_SIZE_WIDTH = 6;
-constexpr int DIR_BLOCK_SIZES = 4;
-constexpr int CACHE_BLOCK_SHIFT = 9;
-constexpr int CACHE_BLOCK_SIZE = (1 << CACHE_BLOCK_SHIFT); // 512, smallest sector size
-const Bytes ts::CacheSpan::OFFSET{CacheStoreBlocks{1}};
-
enum { SILENT = 0, NORMAL, VERBOSE } Verbosity = NORMAL;
-
-namespace
-{
+// Globals shared with CacheDefs.cc, which holds their definitions.
+// OPEN_RW_FLAG is defined there as O_RDONLY; writing is only enabled when specifically required.
+extern int cache_config_min_average_object_size;
+extern CacheStoreBlocks Vol_hash_alloc_size;
+extern int OPEN_RW_FLAG;
+const Bytes ts::CacheSpan::OFFSET{CacheStoreBlocks{1}};
FilePath SpanFile;
FilePath VolumeFile;
-
ts::CommandTable Commands;
-// Default this to read only, only enable write if specifically required.
-int OPEN_RW_FLAG = O_RDONLY;
-
-struct Stripe;
-
-struct Span {
- Span(FilePath const &path) : _path(path) {}
- Errata load();
- Errata loadDevice();
- bool isEmpty() const;
- int header_len = 0;
-
- /// Replace all existing stripes with a single unallocated stripe covering the span.
- Errata clear();
-
- /// This is broken and needs to be cleaned up.
- void clearPermanently();
-
- ts::Rv<Stripe *> allocStripe(int vol_idx, CacheStripeBlocks len);
- Errata updateHeader(); ///< Update serialized header and write to disk.
-
- FilePath _path; ///< File system location of span.
- ats_scoped_fd _fd; ///< Open file descriptor for span.
- int _vol_idx = 0; ///< Forced volume.
- CacheStoreBlocks _base; ///< Offset to first usable byte.
- CacheStoreBlocks _offset; ///< Offset to first content byte.
- // The space between _base and _offset is where the span information is stored.
- CacheStoreBlocks _len; ///< Total length of span.
- CacheStoreBlocks _free_space; ///< Total size of free stripes.
- ink_device_geometry _geometry; ///< Geometry of span.
- uint64_t num_usable_blocks; // number of usable blocks for stripes i.e., after subtracting the skip and the disk header.
- /// Local copy of serialized header data stored on in the span.
- std::unique_ptr<ts::SpanHeader> _header;
- /// Live information about stripes.
- /// Seeded from @a _header and potentially agumented with direct probing.
- std::list<Stripe *> _stripes;
-};
-/* --------------------------------------------------------------------------------------- */
-struct Stripe {
- /// Meta data is stored in 4 copies A/B and Header/Footer.
- enum Copy { A = 0, B = 1 };
- enum { HEAD = 0, FOOT = 1 };
-
- /// Piece wise memory storage for the directory.
- struct Chunk {
- Bytes _start; ///< Starting offset relative to physical device of span.
- Bytes _skip; ///< # of bytes not valid at the start of the first block.
- Bytes _clip; ///< # of bytes not valid at the end of the last block.
-
- typedef std::vector<MemSpan> Chain;
- Chain _chain; ///< Chain of blocks.
-
- ~Chunk();
-
- void append(MemSpan m);
- void clear();
- };
-
- /// Construct from span header data.
- Stripe(Span *span, Bytes start, CacheStoreBlocks len);
-
- /// Is stripe unallocated?
- bool isFree() const;
-
- /** Probe a chunk of memory @a mem for stripe metadata.
-
- @a mem is updated to remove memory that has been probed. If @a
- meta is not @c nullptr then it is used for additional cross
- checking.
-
- @return @c true if @a mem has valid data, @c false otherwise.
- */
- bool probeMeta(MemSpan &mem, StripeMeta const *meta = nullptr);
-
- /// Check a buffer for being valid stripe metadata.
- /// @return @c true if valid, @c false otherwise.
- static bool validateMeta(StripeMeta const *meta);
-
- /// Load metadata for this stripe.
- Errata loadMeta();
- Errata loadDir();
- int check_loop(int s);
- void dir_check();
- bool walk_bucket_chain(int s); // returns true if there is a loop
- void walk_all_buckets();
-
- /// Initialize the live data from the loaded serialized data.
- void updateLiveData(enum Copy c);
-
- Span *_span; ///< Hosting span.
- INK_MD5 hash_id; /// hash_id
- Bytes _start; ///< Offset of first byte of stripe metadata.
- Bytes _content; ///< Start of content.
- CacheStoreBlocks _len; ///< Length of stripe.
- uint8_t _vol_idx = 0; ///< Volume index.
- uint8_t _type = 0; ///< Stripe type.
- int8_t _idx = -1; ///< Stripe index in span.
- int agg_buf_pos = 0;
-
- int64_t _buckets; ///< Number of buckets per segment.
- int64_t _segments; ///< Number of segments.
-
- std::string hashText;
-
- /// Meta copies, indexed by A/B then HEAD/FOOT.
- StripeMeta _meta[2][2];
- /// Locations for the meta data.
- CacheStoreBlocks _meta_pos[2][2];
- /// Directory.
- Chunk _directory;
- CacheDirEntry const *dir = nullptr; // the big buffer that will hold the whole directory of stripe header.
- uint16_t *freelist = nullptr; // using this freelist instead of the one in StripeMeta.
- // This is because the freelist is not being copied to _metap[2][2] correctly.
- // need to do something about it .. hmmm :-?
- int dir_freelist_length(int s);
- TS_INLINE CacheDirEntry *dir_segment(int s);
- TS_INLINE CacheDirEntry *vol_dir_segment(int s);
- int64_t stripe_offset(CacheDirEntry *e); // offset of e w.r.t the stripe
- size_t vol_dirlen();
- TS_INLINE int vol_headerlen();
- void vol_init_data_internal();
- void vol_init_data();
- void dir_init_segment(int s);
- void dir_free_entry(CacheDirEntry *e, int s);
- CacheDirEntry *dir_delete_entry(CacheDirEntry *e, CacheDirEntry *p, int s);
- // int dir_bucket_length(CacheDirEntry *b, int s);
- int dir_probe(INK_MD5 *key, CacheDirEntry *result, CacheDirEntry **last_collision);
- bool dir_valid(CacheDirEntry *e);
- bool validate_sync_serial();
- Errata updateHeaderFooter();
- Errata InitializeMeta();
- void init_dir();
- Errata clear(); // clears striped headers and footers
-};
-
-bool
-Stripe::validate_sync_serial()
-{
- // check if A sync_serials match and A is at least as updated as B
- return (_meta[0][0].sync_serial == _meta[0][1].sync_serial &&
- (_meta[0][0].sync_serial >= _meta[1][0].sync_serial ||
- _meta[1][0].sync_serial != _meta[1][1].sync_serial)) || // OR check if B's sync_serials match
- (_meta[1][0].sync_serial == _meta[1][1].sync_serial);
-}
-
-Errata
-Stripe::clear()
-{
- Errata zret;
- alignas(512) static char zero[CacheStoreBlocks::SCALE]; // should be all zero, it's static.
- for (auto i : {A, B}) {
- for (auto j : {HEAD, FOOT}) {
- ssize_t n = pwrite(_span->_fd, zero, CacheStoreBlocks::SCALE, this->_meta_pos[i][j]);
- if (n < CacheStoreBlocks::SCALE)
- std::cout << "Failed to clear stripe header" << std::endl;
- }
- }
-
- return zret;
-}
-Stripe::Chunk::~Chunk()
-{
- this->clear();
-}
-void
-Stripe::Chunk::append(MemSpan m)
-{
- _chain.push_back(m);
-}
-void
-Stripe::Chunk::clear()
-{
- for (auto &m : _chain)
- free(const_cast<void *>(m.data()));
- _chain.clear();
-}
-
-Stripe::Stripe(Span *span, Bytes start, CacheStoreBlocks len) : _span(span), _start(start), _len(len)
-{
- ts::bwprint(hashText, "{} {}:{}", span->_path.path(), _start.count(), _len.count());
- printf("hash id of stripe is hash of %.*s\n", static_cast<int>(hashText.size()), hashText.data());
-}
-
-bool
-Stripe::isFree() const
-{
- return 0 == _vol_idx;
-}
-
-// TODO: Implement the whole logic
-Errata
-Stripe::InitializeMeta()
-{
- Errata zret;
- // memset(this->raw_dir, 0, dir_len);
- for (int i = 0; i < 2; i++) {
- for (int j = 0; j < 2; j++) {
- _meta[i][j].magic = StripeMeta::MAGIC;
- _meta[i][j].version.ink_major = ts::CACHE_DB_MAJOR_VERSION;
- _meta[i][j].version.ink_minor = ts::CACHE_DB_MINOR_VERSION;
- _meta[i][j].agg_pos = _meta[i][j].last_write_pos = _meta[i][j].write_pos = this->_content;
- _meta[i][j].phase = _meta[i][j].cycle = _meta[i][j].sync_serial = _meta[i][j].write_serial = _meta[i][j].dirty = 0;
- _meta[i][j].create_time = time(nullptr);
- _meta[i][j].sector_size = DEFAULT_HW_SECTOR_SIZE;
- }
- }
- if (!freelist) // freelist is not allocated yet
- {
- freelist = (uint16_t *)malloc(_segments * sizeof(uint16_t)); // segments has already been calculated
- }
- if (!dir) // for new spans, this will likely be nullptr as we don't need to read the stripe meta from disk
- {
- char *raw_dir = (char *)ats_memalign(ats_pagesize(), this->vol_dirlen());
- dir = (CacheDirEntry *)(raw_dir + this->vol_headerlen());
- }
- init_dir();
- return zret;
-}
-
-// Need to be bit more robust at some point.
-bool
-Stripe::validateMeta(StripeMeta const *meta)
-{
- // Need to be bit more robust at some point.
- return StripeMeta::MAGIC == meta->magic && meta->version.ink_major <= ts::CACHE_DB_MAJOR_VERSION &&
- meta->version.ink_minor <= 2 // This may have always been zero, actually.
- ;
-}
-
-bool
-Stripe::probeMeta(MemSpan &mem, StripeMeta const *base_meta)
-{
- while (mem.usize() >= sizeof(StripeMeta)) {
- StripeMeta const *meta = mem.ptr<StripeMeta>(0);
- if (this->validateMeta(meta) && (base_meta == nullptr || // no base version to check against.
- (meta->version == base_meta->version) // need more checks here I think.
- )) {
- return true;
- }
- // The meta data is stored aligned on a stripe block boundary, so only need to check there.
- mem += CacheStoreBlocks::SCALE;
- }
- return false;
-}
-
-/* INK_ALIGN() is only to be used to align on a power of 2 boundary */
-#define INK_ALIGN(size, boundary) (((size) + ((boundary)-1)) & ~((boundary)-1))
-
-#define ROUND_TO_STORE_BLOCK(_x) INK_ALIGN((_x), 8192)
-
-Errata
-Stripe::updateHeaderFooter()
-{
- Errata zret;
- this->vol_init_data();
-
- int64_t hdr_size = this->vol_headerlen();
- int64_t dir_size = this->vol_dirlen();
- Bytes footer_offset = Bytes(dir_size - ROUND_TO_STORE_BLOCK(sizeof(StripeMeta)));
- _meta_pos[A][HEAD] = round_down(_start);
- _meta_pos[A][FOOT] = round_down(_start + footer_offset);
- _meta_pos[B][HEAD] = round_down(this->_start + Bytes(dir_size));
- _meta_pos[B][FOOT] = round_down(this->_start + Bytes(dir_size) + footer_offset);
- std::cout << "updating header " << _meta_pos[0][0] << std::endl;
- std::cout << "updating header " << _meta_pos[0][1] << std::endl;
- std::cout << "updating header " << _meta_pos[1][0] << std::endl;
- std::cout << "updating header " << _meta_pos[1][1] << std::endl;
- InitializeMeta();
-
- if (!OPEN_RW_FLAG) {
- zret.push(0, 1, "Writing Not Enabled.. Please use --write to enable writing to disk");
- return zret;
- }
-
- char *meta_t = (char *)ats_memalign(ats_pagesize(), dir_size);
- // copy headers
- for (auto i : {A, B}) {
- // copy header
- memcpy(meta_t, &_meta[i][HEAD], sizeof(StripeMeta));
- // copy freelist
- memcpy(meta_t + sizeof(StripeMeta) - sizeof(uint16_t), this->freelist, this->_segments * sizeof(uint16_t));
-
- ssize_t n = pwrite(_span->_fd, meta_t, hdr_size, _meta_pos[i][HEAD]);
- if (n < hdr_size) {
- std::cout << "problem writing header to disk: " << strerror(errno) << ":"
- << " " << n << "<" << hdr_size << std::endl;
- zret = Errata::Message(0, errno, "Failed to write stripe header ");
- return zret;
- }
- // copy dir entries
- dir_size = dir_size - hdr_size - ROUND_TO_STORE_BLOCK(sizeof(StripeMeta));
- memcpy(meta_t, (char *)dir, dir_size);
- n = pwrite(_span->_fd, meta_t, dir_size, _meta_pos[i][HEAD] + hdr_size); //
- if (n < dir_size) {
- std::cout << "problem writing dir to disk: " << strerror(errno) << ":"
- << " " << n << "<" << dir_size << std::endl;
- zret = Errata::Message(0, errno, "Failed to write stripe header ");
- return zret;
- }
-
- // copy footer,
- memcpy(meta_t, &_meta[i][FOOT], sizeof(StripeMeta));
-
- int64_t footer_size = ROUND_TO_STORE_BLOCK(sizeof(StripeMeta));
- n = pwrite(_span->_fd, meta_t, footer_size, _meta_pos[i][FOOT]);
- if (n < footer_size) {
- std::cout << "problem writing footer to disk: " << strerror(errno) << ":"
- << " " << n << "<" << footer_size << std::endl;
- zret = Errata::Message(0, errno, "Failed to write stripe header ");
- return zret;
- }
- }
- ats_memalign_free(meta_t);
- return zret;
-}
-
-TS_INLINE int
-Stripe::vol_headerlen()
-{
- return ROUND_TO_STORE_BLOCK(sizeof(StripeMeta) + sizeof(uint16_t) * (this->_segments - 1));
-}
-
-size_t
-Stripe::vol_dirlen()
-{
- return vol_headerlen() + ROUND_TO_STORE_BLOCK(((size_t)this->_buckets) * DIR_DEPTH * this->_segments * SIZEOF_DIR) +
- ROUND_TO_STORE_BLOCK(sizeof(StripeMeta));
-}
-
-void
-Stripe::vol_init_data_internal()
-{
- this->_buckets =
- ((this->_len.count() * 8192 - (this->_content - this->_start)) / cache_config_min_average_object_size) / DIR_DEPTH;
- this->_segments = (this->_buckets + (((1 << 16) - 1) / DIR_DEPTH)) / ((1 << 16) / DIR_DEPTH);
- this->_buckets = (this->_buckets + this->_segments - 1) / this->_segments;
- this->_content = this->_start + Bytes(2 * vol_dirlen());
-}
-
-void
-Stripe::vol_init_data()
-{
- // iteratively calculate start + buckets
- this->vol_init_data_internal();
- this->vol_init_data_internal();
- this->vol_init_data_internal();
-}
-
-void
-Stripe::updateLiveData(enum Copy c)
-{
- // CacheStoreBlocks delta{_meta_pos[c][FOOT] - _meta_pos[c][HEAD]};
- CacheStoreBlocks header_len(0);
- // int64_t n_buckets;
- // int64_t n_segments;
-
- _content = _start;
- /*
- * COMMENTING THIS SECTION FOR NOW TO USE THE EXACT LOGIN USED IN ATS TO CALCULATE THE NUMBER OF SEGMENTS AND BUCKETS
- // Past the header is the segment free list heads which if sufficiently long (> ~4K) can take
- // more than 1 store block. Start with a guess of 1 and adjust upwards as needed. A 2TB stripe
- // with an AOS of 8000 has roughly 3700 segments meaning that for even 10TB drives this loop
- // should only be a few iterations.
- do {
- ++header_len;
- n_buckets = Bytes(delta - header_len) / (sizeof(CacheDirEntry) * ts::ENTRIES_PER_BUCKET);
- n_segments = n_buckets / ts::MAX_BUCKETS_PER_SEGMENT;
- // This should never be more than one loop, usually none.
- while ((n_buckets / n_segments) > ts::MAX_BUCKETS_PER_SEGMENT)
- ++n_segments;
- } while ((sizeof(StripeMeta) + sizeof(uint16_t) * n_segments) > static_cast<size_t>(header_len));
-
- _buckets = n_buckets / n_segments;
- _segments = n_segments;
- */
- _directory._skip = header_len;
-}
-
-#define dir_big(_e) ((uint32_t)((((_e)->w[1]) >> 8) & 0x3))
-#define dir_bit(_e, _w, _b) ((uint32_t)(((_e)->w[_w] >> (_b)) & 1))
-#define dir_size(_e) ((uint32_t)(((_e)->w[1]) >> 10))
-#define dir_approx_size(_e) ((dir_size(_e) + 1) * DIR_BLOCK_SIZE(dir_big(_e)))
-#define dir_head(_e) dir_bit(_e, 2, 13)
-#define DIR_MASK_TAG(_t) ((_t) & ((1 << DIR_TAG_WIDTH) - 1))
-#define dir_tag(_e) ((uint32_t)((_e)->w[2] & ((1 << DIR_TAG_WIDTH) - 1)))
-#define dir_offset(_e) \
- ((int64_t)(((uint64_t)(_e)->w[0]) | (((uint64_t)((_e)->w[1] & 0xFF)) << 16) | (((uint64_t)(_e)->w[4]) << 24)))
-#define dir_set_offset(_e, _o) \
- do { \
- (_e)->w[0] = (uint16_t)_o; \
- (_e)->w[1] = (uint16_t)((((_o) >> 16) & 0xFF) | ((_e)->w[1] & 0xFF00)); \
- (_e)->w[4] = (uint16_t)((_o) >> 24); \
- } while (0)
-//#define dir_segment(_s, _d) vol_dir_segment(_d, _s)
-#define dir_in_seg(_s, _i) ((CacheDirEntry *)(((char *)(_s)) + (SIZEOF_DIR * (_i))))
-#define dir_next(_e) (_e)->w[3]
-#define dir_phase(_e) dir_bit(_e, 2, 12)
-#define DIR_BLOCK_SHIFT(_i) (3 * (_i))
-#define DIR_BLOCK_SIZE(_i) (CACHE_BLOCK_SIZE << DIR_BLOCK_SHIFT(_i))
-#define dir_set_prev(_e, _o) (_e)->w[2] = (uint16_t)(_o)
-#define dir_set_next(_e, _o) (_e)->w[3] = (uint16_t)(_o)
-
-bool
-dir_compare_tag(const CacheDirEntry *e, const INK_MD5 *key)
-{
- return (dir_tag(e) == DIR_MASK_TAG(key->slice32(2)));
-}
-
-TS_INLINE CacheDirEntry *
-Stripe::dir_segment(int s)
-{
- return vol_dir_segment(s);
-}
-
-TS_INLINE CacheDirEntry *
-Stripe::vol_dir_segment(int s)
-{
- return (CacheDirEntry *)(((char *)this->dir) + (s * this->_buckets) * DIR_DEPTH * SIZEOF_DIR);
-}
-
-TS_INLINE CacheDirEntry *
-dir_bucket(int64_t b, CacheDirEntry *seg)
-{
- return dir_in_seg(seg, b * DIR_DEPTH);
-}
-
-TS_INLINE CacheDirEntry *
-dir_from_offset(int64_t i, CacheDirEntry *seg)
-{
-#if DIR_DEPTH < 5
- if (!i)
- return 0;
- return dir_in_seg(seg, i);
-#else
- i = i + ((i - 1) / (DIR_DEPTH - 1));
- return dir_in_seg(seg, i);
-#endif
-}
-
-TS_INLINE int
-vol_in_phase_valid(Stripe *d, CacheDirEntry *e)
-{
- return (dir_offset(e) - 1 < ((d->_meta[0][0].write_pos + d->agg_buf_pos - d->_start) / CACHE_BLOCK_SIZE));
-}
-
-TS_INLINE int
-vol_out_of_phase_valid(Stripe *d, CacheDirEntry *e)
-{
- return (dir_offset(e) - 1 >= ((d->_meta[0][0].agg_pos - d->_start) / CACHE_BLOCK_SIZE));
-}
-
-TS_INLINE CacheDirEntry *
-next_dir(CacheDirEntry *d, CacheDirEntry *seg)
-{
- int i = dir_next(d);
- return dir_from_offset(i, seg);
-}
-#define dir_offset(_e) \
- ((int64_t)(((uint64_t)(_e)->w[0]) | (((uint64_t)((_e)->w[1] & 0xFF)) << 16) | (((uint64_t)(_e)->w[4]) << 24)))
-
-TS_INLINE CacheDirEntry *
-dir_bucket_row(CacheDirEntry *b, int64_t i)
-{
- return dir_in_seg(b, i);
-}
-
-TS_INLINE int64_t
-dir_to_offset(const CacheDirEntry *d, const CacheDirEntry *seg)
-{
-#if DIR_DEPTH < 5
- return (((char *)d) - ((char *)seg)) / SIZEOF_DIR;
-#else
- int64_t i = (int64_t)((((char *)d) - ((char *)seg)) / SIZEOF_DIR);
- i = i - (i / DIR_DEPTH);
- return i;
-#endif
-}
-
-bool
-Stripe::dir_valid(CacheDirEntry *_e)
-{
- return (this->_meta[0][0].phase == dir_phase(_e) ? vol_in_phase_valid(this, _e) : vol_out_of_phase_valid(this, _e));
-}
-
-int64_t
-Stripe::stripe_offset(CacheDirEntry *e)
+namespace ct
{
- return this->_content + (int64_t)(dir_offset(e) * CACHE_BLOCK_SIZE) - CACHE_BLOCK_SIZE;
-}
-
-int
-Stripe::dir_probe(INK_MD5 *key, CacheDirEntry *result, CacheDirEntry **last_collision)
-{
- int segment = key->slice32(0) % this->_segments;
- int bucket = key->slice32(1) % this->_buckets;
-
- CacheDirEntry *seg = this->dir_segment(segment);
- CacheDirEntry *e = nullptr;
- e = dir_bucket(bucket, seg);
- char *stripe_buff2 = (char *)malloc(dir_approx_size(e));
- Doc *doc = nullptr;
- // TODO: collision craft is pending.. look at the main ATS code. Assuming no collision for now
- if (dir_offset(e)) {
- do {
- if (dir_compare_tag(e, key)) {
- if (dir_valid(e)) {
- std::cout << "dir_probe hit: found seg: " << segment << " bucket: " << bucket << " offset: " << dir_offset(e)
- << std::endl;
- break;
- } else {
- // let's skip deleting for now
- // e = dir_delete_entry(e, p ,segment);
- // continue;
- }
- }
- e = next_dir(e, seg);
-
- } while (e);
- int fd = _span->_fd;
- int64_t offset = stripe_offset(e);
- int64_t size = dir_approx_size(e);
- ssize_t n = pread(fd, stripe_buff2, size, offset);
- if (n < size)
- std::cout << "Failed to read content from the Stripe" << std::endl;
-
- doc = reinterpret_cast<Doc *>(stripe_buff2);
- std::string hdr(doc->hdr(), doc->hlen);
- std::cout << "HEADER\n" << hdr << std::endl;
-
- std::string data_(doc->data(), doc->data_len());
- std::cout << "DATA\n" << data_ << std::endl;
- } else {
- std::cout << "Not found in the Cache" << std::endl;
- }
- free(stripe_buff2);
- return 0; // Why does this have a non-void return?
-}
-
-CacheDirEntry *
-Stripe::dir_delete_entry(CacheDirEntry *e, CacheDirEntry *p, int s)
-{
- CacheDirEntry *seg = dir_segment(s);
- int no = dir_next(e);
- this->_meta[0][0].dirty = 1;
- if (p) {
- unsigned int fo = this->freelist[s];
- unsigned int eo = dir_to_offset(e, seg);
- dir_clear(e);
- dir_set_next(p, no);
- dir_set_next(e, fo);
- if (fo) {
- dir_set_prev(dir_from_offset(fo, seg), eo);
- }
- this->freelist[s] = eo;
- } else {
- CacheDirEntry *n = next_dir(e, seg);
- if (n) {
- dir_assign(e, n);
- dir_delete_entry(n, e, s);
- return e;
- } else {
- dir_clear(e);
- return nullptr;
- }
- }
- return dir_from_offset(no, seg);
-}
-
-void
-Stripe::walk_all_buckets()
-{
- for (int s = 0; s < this->_segments; s++) {
- if (walk_bucket_chain(s))
- std::cout << "Loop present in Segment " << s << std::endl;
- }
-}
-
-bool
-Stripe::walk_bucket_chain(int s)
-{
- CacheDirEntry *seg = this->dir_segment(s);
- std::bitset<65536> b_bitset;
- b_bitset.reset();
- for (int b = 0; b < this->_buckets; b++) {
- CacheDirEntry *p = nullptr;
- auto *dir_b = dir_bucket(b, seg);
- CacheDirEntry *e = dir_b;
- int len = 0;
-
- while (e) {
- len++;
- int i = dir_to_offset(e, seg);
- if (b_bitset.test(i)) {
- std::cout << "bit already set in "
- << "seg " << s << " bucket " << b << std::endl;
- }
- if (i > 0) // i.e., not the first dir in the segment
- b_bitset[i] = 1;
-
-#if 1
- if (!dir_valid(e) || !dir_offset(e)) {
- // std::cout<<"dir_clean in segment "<<s<<" =>cleaning "<<e<<" tag"<<dir_tag(e)<<" boffset"<< dir_offset(e)<< " bucket:
- // "<<dir_b<< " bucket len: "<<dir_bucket_length(dir_b, s)<<std::endl;
- e = dir_delete_entry(e, p, s);
- continue;
- }
-#endif
- p = e;
- e = next_dir(e, seg);
- }
- // std::cout<<"dir len in this bucket "<<len<<std::endl;
- }
- return false;
-}
-
-void
-Stripe::dir_free_entry(CacheDirEntry *e, int s)
-{
- CacheDirEntry *seg = this->dir_segment(s);
- unsigned int fo = this->freelist[s];
- unsigned int eo = dir_to_offset(e, seg);
- dir_set_next(e, fo);
- if (fo) {
- dir_set_prev(dir_from_offset(fo, seg), eo);
- }
- this->freelist[s] = eo;
-}
-
-// adds all the directory entries
-// in a segment to the segment freelist
-void
-Stripe::dir_init_segment(int s)
-{
- this->freelist[s] = 0;
- CacheDirEntry *seg = this->dir_segment(s);
- int l, b;
- memset(seg, 0, SIZEOF_DIR * DIR_DEPTH * this->_buckets);
- for (l = 1; l < DIR_DEPTH; l++) {
- for (b = 0; b < this->_buckets; b++) {
- CacheDirEntry *bucket = dir_bucket(b, seg);
- this->dir_free_entry(dir_bucket_row(bucket, l), s);
- }
- }
-}
-
-void
-Stripe::init_dir()
-{
- for (int s = 0; s < this->_segments; s++) {
- this->freelist[s] = 0;
- CacheDirEntry *seg = this->dir_segment(s);
- int l, b;
- for (l = 1; l < DIR_DEPTH; l++) {
- for (b = 0; b < this->_buckets; b++) {
- CacheDirEntry *bucket = dir_bucket(b, seg);
- this->dir_free_entry(dir_bucket_row(bucket, l), s);
- // std::cout<<"freelist"<<this->freelist[s]<<std::endl;
- }
- }
- }
-}
-
-Errata
-Stripe::loadDir()
-{
- Errata zret;
- int64_t dirlen = this->vol_dirlen();
- char *raw_dir = (char *)ats_memalign(ats_pagesize(), dirlen);
- dir = (CacheDirEntry *)(raw_dir + this->vol_headerlen());
- // read directory
- ssize_t n = pread(this->_span->_fd, raw_dir, dirlen, this->_start);
- if (n < dirlen)
- std::cout << "Failed to read Dir from stripe @" << this->hashText;
- return zret;
-}
-//
-// Cache Directory
-//
-
-#if 0
-// return value 1 means no loop
-// zero indicates loop
-int
-dir_bucket_loop_check(CacheDirEntry *start_dir, CacheDirEntry *seg)
-{
- if (start_dir == nullptr) {
- return 1;
- }
-
- CacheDirEntry *p1 = start_dir;
- CacheDirEntry *p2 = start_dir;
-
- while (p2) {
- // p1 moves by one entry per iteration
- assert(p1);
- p1 = next_dir(p1, seg);
- // p2 moves by two entries per iteration
- p2 = next_dir(p2, seg);
- if (p2) {
- p2 = next_dir(p2, seg);
- } else {
- return 1;
- }
-
- if (p2 == p1) {
- return 0; // we have a loop
- }
- }
- return 1;
-}
-#endif
-
-int
-Stripe::dir_freelist_length(int s)
-{
- int free = 0;
- CacheDirEntry *seg = this->dir_segment(s);
- CacheDirEntry *e = dir_from_offset(this->freelist[s], seg);
- if (this->check_loop(s)) {
- return (DIR_DEPTH - 1) * this->_buckets;
- }
- while (e) {
- free++;
- e = next_dir(e, seg);
- }
- return free;
-}
-
-int
-Stripe::check_loop(int s)
-{
- // look for loop in the segment
- // rewrite the freelist if loop is present
- CacheDirEntry *seg = this->dir_segment(s);
- CacheDirEntry *e = dir_from_offset(this->freelist[s], seg);
- std::bitset<65536> f_bitset;
- f_bitset.reset();
- while (e) {
- int i = dir_next(e);
- if (f_bitset.test(i)) {
- // bit was set in a previous round so a loop is present
- std::cout << "<check_loop> Loop present in Span" << this->_span->_path.path() << " Stripe: " << this->hashText
- << "Segment: " << s << std::endl;
- this->dir_init_segment(s);
- return 1;
- }
- f_bitset[i] = 1;
- e = dir_from_offset(i, seg);
- }
-
- return 0;
-}
-
-int
-compare_ushort(void const *a, void const *b)
-{
- return *static_cast<unsigned short const *>(a) - *static_cast<unsigned short const *>(b);
-}
-
-void
-Stripe::dir_check()
-{
- static int const SEGMENT_HISTOGRAM_WIDTH = 16;
- int hist[SEGMENT_HISTOGRAM_WIDTH + 1] = {0};
- unsigned short chain_tag[MAX_ENTRIES_PER_SEGMENT];
- int32_t chain_mark[MAX_ENTRIES_PER_SEGMENT];
-
- this->loadMeta();
- this->loadDir();
- // uint64_t total_buckets = _segments * _buckets;
- // uint64_t total_entries = total_buckets * DIR_DEPTH;
- int frag_demographics[1 << DIR_SIZE_WIDTH][DIR_BLOCK_SIZES];
- int j;
- int stale = 0, in_use = 0, empty = 0;
- int free = 0, head = 0, buckets_in_use = 0;
-
- int max_chain_length = 0;
- int64_t bytes_in_use = 0;
- std::cout << "Stripe '[" << hashText << "]'" << std::endl;
- std::cout << " Directory Bytes: " << _segments * _buckets * SIZEOF_DIR << std::endl;
- std::cout << " Segments: " << _segments << std::endl;
- std::cout << " Buckets per segment: " << _buckets << std::endl;
- std::cout << " Entries: " << _segments * _buckets * DIR_DEPTH << std::endl;
- for (int s = 0; s < _segments; s++) {
- CacheDirEntry *seg = this->dir_segment(s);
- int seg_chain_max = 0;
- int seg_empty = 0;
- int seg_in_use = 0;
- int seg_stale = 0;
- int seg_bytes_in_use = 0;
- int seg_dups = 0;
- int seg_buckets_in_use = 0;
-
- ink_zero(chain_tag);
- memset(chain_mark, -1, sizeof(chain_mark));
- for (int b = 0; b < _buckets; b++) {
- CacheDirEntry *root = dir_bucket(b, seg);
- int h = 0;
- int chain_idx = 0;
- int mark = 0;
- ++seg_buckets_in_use;
- // walking through the directories
- for (CacheDirEntry *e = root; e; e = next_dir(e, seg)) {
- if (!dir_offset(e)) {
- ++seg_empty;
- --seg_buckets_in_use;
- // this should only happen on the first dir in a bucket
- assert(nullptr == next_dir(e, seg));
- break;
- } else {
- int e_idx = e - seg;
- ++h;
- chain_tag[chain_idx++] = dir_tag(e);
- if (chain_mark[e_idx] == mark) {
- printf(" - Cycle of length %d detected for bucket %d\n", h, b);
- } else if (chain_mark[e_idx] >= 0) {
- printf(" - Entry %d is in chain %d and %d", e_idx, chain_mark[e_idx], mark);
- } else {
- chain_mark[e_idx] = mark;
- }
-
- if (!dir_valid(e)) {
- ++seg_stale;
- } else {
- uint64_t size = dir_approx_size(e);
- if (dir_head(e)) {
- ++head;
- }
- ++seg_in_use;
- seg_bytes_in_use += size;
- ++frag_demographics[dir_size(e)][dir_big(e)];
- }
- }
- e = next_dir(e, seg);
- if (!e) {
- break;
- }
- }
-
- // Check for duplicates (identical tags in the same bucket).
- if (h > 1) {
- unsigned short last;
- qsort(chain_tag, h, sizeof(chain_tag[0]), &compare_ushort);
- last = chain_tag[0];
- for (int k = 1; k < h; ++k) {
- if (last == chain_tag[k]) {
- ++seg_dups;
- }
- last = chain_tag[k];
- }
- }
- ++hist[std::min(h, SEGMENT_HISTOGRAM_WIDTH)];
- seg_chain_max = std::max(seg_chain_max, h);
- }
- int fl_size = dir_freelist_length(s);
- in_use += seg_in_use;
- empty += seg_empty;
- stale += seg_stale;
- free += fl_size;
- buckets_in_use += seg_buckets_in_use;
- max_chain_length = std::max(max_chain_length, seg_chain_max);
- bytes_in_use += seg_bytes_in_use;
-
- printf(" - Segment-%d | Entries: used=%d stale=%d free=%d disk-bytes=%d Buckets: used=%d empty=%d max=%d avg=%.2f dups=%d\n",
- s, seg_in_use, seg_stale, fl_size, seg_bytes_in_use, seg_buckets_in_use, seg_empty, seg_chain_max,
- seg_buckets_in_use ? static_cast<float>(seg_in_use + seg_stale) / seg_buckets_in_use : 0.0, seg_dups);
- }
- //////////////////
-
- printf(" - Stripe | Entries: in-use=%d stale=%d free=%d Buckets: empty=%d max=%d avg=%.2f\n", in_use, stale, free, empty,
- max_chain_length, buckets_in_use ? static_cast<float>(in_use + stale) / buckets_in_use : 0);
-
- printf(" Chain lengths: ");
- for (j = 0; j < SEGMENT_HISTOGRAM_WIDTH; ++j) {
- printf(" %d=%d ", j, hist[j]);
- }
- printf(" %d>=%d\n", SEGMENT_HISTOGRAM_WIDTH, hist[SEGMENT_HISTOGRAM_WIDTH]);
-
- char tt[256];
- printf(" Total Size: %" PRIu64 "\n", static_cast<uint64_t>(_len.count()));
- printf(" Bytes in Use: %" PRIu64 " [%0.2f%%]\n", bytes_in_use, 100.0 * (static_cast<float>(bytes_in_use) / _len.count()));
- printf(" Objects: %d\n", head);
- printf(" Average Size: %" PRIu64 "\n", head ? (bytes_in_use / head) : 0);
- printf(" Average Frags: %.2f\n", head ? static_cast<float>(in_use) / head : 0);
- printf(" Write Position: %" PRIu64 "\n", _meta[0][0].write_pos - _content.count());
- printf(" Wrap Count: %d\n", _meta[0][0].cycle);
- printf(" Phase: %s\n", _meta[0][0].phase ? "true" : "false");
- ctime_r(&_meta[0][0].create_time, tt);
- tt[strlen(tt) - 1] = 0;
- printf(" Sync Serial: %u\n", _meta[0][0].sync_serial);
- printf(" Write Serial: %u\n", _meta[0][0].write_serial);
- printf(" Create Time: %s\n", tt);
- printf("\n");
- printf(" Fragment size demographics\n");
- for (int b = 0; b < DIR_BLOCK_SIZES; ++b) {
- int block_size = DIR_BLOCK_SIZE(b);
- int s = 0;
- while (s < 1 << DIR_SIZE_WIDTH) {
- for (int j = 0; j < 8; ++j, ++s) {
- // The size markings are redundant. Low values (less than DIR_SHIFT_WIDTH) for larger
- // base block sizes should never be used. Such entries should use the next smaller base block size.
- if (b > 0 && s < 1 << DIR_BLOCK_SHIFT(1)) {
- assert(frag_demographics[s][b] == 0);
- continue;
- }
- printf(" %8d[%2d:%1d]:%06d", (s + 1) * block_size, s, b, frag_demographics[s][b]);
- }
- printf("\n");
- }
- }
- printf("\n");
- ////////////////
-}
-
-Errata
-Stripe::loadMeta()
-{
- // Read from disk in chunks of this size. This needs to be a multiple of both the
- // store block size and the directory entry size so neither goes acrss read boundaries.
- // Beyond that the value should be in the ~10MB range for what I guess is best performance
- // vs. blocking production disk I/O on a live system.
- constexpr static int64_t N = (1 << 8) * CacheStoreBlocks::SCALE * sizeof(CacheDirEntry);
-
- Errata zret;
-
- int fd = _span->_fd;
- Bytes n;
- bool found;
- MemSpan data; // The current view of the read buffer.
- Bytes delta;
- Bytes pos = _start;
- // Avoid searching the entire span, because some of it must be content. Assume that AOS is more than 160
- // which means at most 10/160 (1/16) of the span can be directory/header.
- Bytes limit = pos + _len / 16;
- size_t io_align = _span->_geometry.blocksz;
- StripeMeta const *meta;
-
- std::unique_ptr<char> bulk_buff; // Buffer for bulk reads.
- static const size_t SBSIZE = CacheStoreBlocks::SCALE; // save some typing.
- alignas(SBSIZE) char stripe_buff[SBSIZE]; // Use when reading a single stripe block.
- alignas(SBSIZE) char stripe_buff2[SBSIZE]; // use to save the stripe freelist
- if (io_align > SBSIZE)
- return Errata::Message(0, 1, "Cannot load stripe ", _idx, " on span ", _span->_path, " because the I/O block alignment ",
- io_align, " is larger than the buffer alignment ", SBSIZE);
-
- _directory._start = pos;
- // Header A must be at the start of the stripe block.
- // Todo: really need to check pread() for failure.
- ssize_t headerbyteCount = pread(fd, stripe_buff2, SBSIZE, pos);
- n.assign(headerbyteCount);
- data.assign(stripe_buff2, n);
- meta = data.ptr<StripeMeta>(0);
- // TODO:: We need to read more data at this point to populate dir
- if (this->validateMeta(meta)) {
- delta = Bytes(data.ptr<char>(0) - stripe_buff2);
- _meta[A][HEAD] = *meta;
- _meta_pos[A][HEAD] = round_down(pos + Bytes(delta));
- pos += round_up(SBSIZE);
- _directory._skip = Bytes(SBSIZE); // first guess, updated in @c updateLiveData when the header length is computed.
- // Search for Footer A. Nothing for it except to grub through the disk.
- // The searched data is cached so it's available for directory parsing later if needed.
- while (pos < limit) {
- char *buff = static_cast<char *>(ats_memalign(io_align, N));
- bulk_buff.reset(buff);
- n.assign(pread(fd, buff, N, pos));
- data.assign(buff, n);
- found = this->probeMeta(data, &_meta[A][HEAD]);
- if (found) {
- ptrdiff_t diff = data.ptr<char>(0) - buff;
- _meta[A][FOOT] = data.template at<StripeMeta>(0);
- _meta_pos[A][FOOT] = round_down(pos + Bytes(diff));
- // don't bother attaching block if the footer is at the start
- if (diff > 0) {
- _directory._clip = Bytes(N - diff);
- _directory.append({bulk_buff.release(), N});
- }
- data += SBSIZE; // skip footer for checking on B copy.
- break;
- } else {
- _directory.append({bulk_buff.release(), N});
- pos += round_up(N);
- }
- }
- } else {
- zret.push(0, 1, "Header A not found");
- }
- pos = _meta_pos[A][FOOT];
- // Technically if Copy A is valid, Copy B is not needed. But at this point it's cheap to retrieve
- // (as the exact offset is computable).
- if (_meta_pos[A][FOOT] > 0) {
- delta = _meta_pos[A][FOOT] - _meta_pos[A][HEAD];
- // Header B should be immediately after Footer A. If at the end of the last read,
- // do another read.
- // if (data.size() < CacheStoreBlocks::SCALE) {
- // pos += round_up(N);
- // n = Bytes(pread(fd, stripe_buff, CacheStoreBlocks::SCALE, pos));
- // data.assign(stripe_buff, n);
- // }
- pos = this->_start + Bytes(vol_dirlen());
- meta = data.ptr<StripeMeta>(0);
- if (this->validateMeta(meta)) {
- _meta[B][HEAD] = *meta;
- _meta_pos[B][HEAD] = round_down(pos);
-
- // Footer B must be at the same relative offset to Header B as Footer A -> Header A.
- pos += delta;
- n = Bytes(pread(fd, stripe_buff, ts::CacheStoreBlocks::SCALE, pos));
- data.assign(stripe_buff, n);
- meta = data.ptr<StripeMeta>(0);
- if (this->validateMeta(meta)) {
- _meta[B][FOOT] = *meta;
- _meta_pos[B][FOOT] = round_down(pos);
- }
- }
- }
-
- if (_meta_pos[A][FOOT] > 0) {
- if (_meta[A][HEAD].sync_serial == _meta[A][FOOT].sync_serial &&
- (0 == _meta_pos[B][FOOT] || _meta[B][HEAD].sync_serial != _meta[B][FOOT].sync_serial ||
- _meta[A][HEAD].sync_serial >= _meta[B][HEAD].sync_serial)) {
- this->updateLiveData(A);
- } else if (_meta_pos[B][FOOT] > 0 && _meta[B][HEAD].sync_serial == _meta[B][FOOT].sync_serial) {
- this->updateLiveData(B);
- } else {
- zret.push(0, 1, "Invalid stripe data - candidates found but sync serial data not valid. ", _meta[A][HEAD].sync_serial, ":",
- _meta[A][FOOT].sync_serial, ":", _meta[B][HEAD].sync_serial, ":", _meta[B][FOOT].sync_serial);
- }
- }
-
- n.assign(headerbyteCount);
- data.assign(stripe_buff2, n);
- meta = data.ptr<StripeMeta>(0);
- // copy freelist
- freelist = (uint16_t *)malloc(_segments * sizeof(uint16_t));
- for (int i = 0; i < _segments; i++)
- freelist[i] = meta->freelist[i];
-
- if (!zret)
- _directory.clear();
- return zret;
-}
-
/* --------------------------------------------------------------------------------------- */
/// A live volume.
/// Volume data based on data from loaded spans.
@@ -2138,6 +1082,7 @@ struct option Options[] = {{"help", 0, nullptr, 'h'}, {"spans", 1, nullptr, 's'
{"aos", 1, nullptr, 'o'}, {nullptr, 0, nullptr, 0}};
}
+using namespace ct;
Errata
List_Stripes(Cache::SpanDumpDepth depth)
{
--
To stop receiving notification emails like this one, please contact
paziz@apache.org.