Posted to dev@samza.apache.org by David Yu <da...@optimizely.com> on 2016/06/06 21:25:25 UTC

Update all values in RocksDB

We use Samza RocksDB to keep track of our user event sessions. The task
periodically calls window() to update all sessions in the store and purge
all closed sessions.

We do all of this in the same iterator loop.

Here's how we are doing it:


public void window(MessageCollector collector, TaskCoordinator coordinator)
    throws Exception {

  // Scan every session in the store.
  KeyValueIterator<String, Session> it = sessionStore.all();

  try {
    while (it.hasNext()) {
      Entry<String, Session> entry = it.next();
      Session session = entry.getValue();

      update(session);

      if (session.getStatus() == Status.CLOSED) {
        // Purge sessions that update() has closed.
        sessionStore.delete(entry.getKey());
      } else {
        // Write the updated session back.
        sessionStore.put(entry.getKey(), session);
      }
    }
  } finally {
    // Release the underlying RocksDB iterator.
    it.close();
  }
}


The question is: is this the correct/efficient way to do a read+update for
RocksDB?

Thanks,
David

Re: Update all values in RocksDB

Posted by David Yu <da...@optimizely.com>.
Hi, Pan,

I was reading the 0.10.0 documentation on Samza state management. One
particular section, which explains counting the number of page views for
each user, stands out to me, as it also uses a full table scan to output
aggregation results:

> Note that this job effectively pauses at the hour mark to output its
> results. This is totally fine for Samza, as scanning over the contents of
> the key-value store is *quite fast*. The input stream is buffered while
> the job is doing this hourly work.


This doesn't quite match the particular concern you have described. Maybe
I'm missing something, or perhaps there is another way to do a full table
scan besides using iterators?

Also, after some investigation, I'm starting to believe that
deserialization is actually the bottleneck instead of the table traversal
itself.
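
One rough way to check this is to time the walk and the decoding separately.
The sketch below is only an illustration under assumptions: a
java.util.TreeMap of raw bytes stands in for the store, decoding a UTF-8
string stands in for the real Session serde, and the class ScanCostProbe and
its probe() method are hypothetical names. The per-entry System.nanoTime()
calls add overhead of their own, so the numbers are indicative at best.

```java
import java.nio.charset.StandardCharsets;
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical probe that splits scan time into traversal and decoding. */
final class ScanCostProbe {

    /**
     * Walks the whole store once.
     * Returns {traversalNanos, decodeNanos, decodedChars}.
     */
    static long[] probe(TreeMap<String, byte[]> rawStore) {
        long decodeNanos = 0;
        long decodedChars = 0;
        long start = System.nanoTime();

        for (Map.Entry<String, byte[]> entry : rawStore.entrySet()) {
            long t0 = System.nanoTime();
            // Stand-in for the real value deserialization (assumption).
            String decoded = new String(entry.getValue(), StandardCharsets.UTF_8);
            decodeNanos += System.nanoTime() - t0;

            // Use the result so the decoding work cannot be optimized away.
            decodedChars += decoded.length();
        }

        long totalNanos = System.nanoTime() - start;
        // Whatever is not decoding is attributed to walking the entries.
        return new long[] { totalNanos - decodeNanos, decodeNanos, decodedChars };
    }
}
```

If the second number dominates on realistic data, that would support the
deserialization theory; if the first one does, the traversal itself is the
more likely culprit.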

Thanks
David

On Tue, Jun 7, 2016 at 10:15 AM, Yi Pan <ni...@gmail.com> wrote:

> Hi, David,
>
> Generally speaking, an iterator takes a snapshot of the RocksDB key space,
> so it comes with some memory overhead. A more severe performance issue we
> have seen is that if you insert and delete tons of sessions in a short
> period, the iterator's seek can be extremely slow, because it has to
> traverse many tombstones (i.e. deleted records) before hitting the next
> live record. The suggested "queue" mechanism helps avoid the issue, since
> all deletions happen sequentially in one contiguous block and all
> insertions happen sequentially in another contiguous block. This gives the
> compaction thread a better opportunity to come in, clean up the tombstone
> records, and make the iterator fast again.
>
> In your use case, if you can make sure that a newly inserted session's
> sessionId is *always* at the tail of your session table, and that your
> session expiration order is the same as the order determined by the
> sessionId, that should work as well.
>
> -Yi

Re: Update all values in RocksDB

Posted by Yi Pan <ni...@gmail.com>.
Hi, David,

Generally speaking, an iterator takes a snapshot of the RocksDB key space,
so it comes with some memory overhead. A more severe performance issue we
have seen is that if you insert and delete tons of sessions in a short
period, the iterator's seek can be extremely slow, because it has to
traverse many tombstones (i.e. deleted records) before hitting the next
live record. The suggested "queue" mechanism helps avoid the issue, since
all deletions happen sequentially in one contiguous block and all
insertions happen sequentially in another contiguous block. This gives the
compaction thread a better opportunity to come in, clean up the tombstone
records, and make the iterator fast again.

In your use case, if you can make sure that a newly inserted session's
sessionId is *always* at the tail of your session table, and that your
session expiration order is the same as the order determined by the
sessionId, that should work as well.
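
A minimal sketch of that ordering idea, under stated assumptions: sessions
expire a fixed time after creation (so expiration order equals key order),
java.util.TreeMap stands in for the sorted store, and the class
OrderedSessionExpiry, its methods, and the key layout are hypothetical. The
scan starts at the head and stops at the first session that is still live,
so it never walks the whole table.

```java
import java.util.Iterator;
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical store whose key order matches session expiration order. */
final class OrderedSessionExpiry {

    // Key: zero-padded creation time + ":" + sessionId.
    // Value: creation time in millis (stand-in for the real Session object).
    final TreeMap<String, Long> sessions = new TreeMap<>();

    /** Builds a key that sorts by creation time (assumes non-negative millis). */
    static String key(long createdAtMillis, String sessionId) {
        return String.format("%019d:%s", createdAtMillis, sessionId);
    }

    /** New sessions always land at the tail, as long as time moves forward. */
    void open(String sessionId, long createdAtMillis) {
        sessions.put(key(createdAtMillis, sessionId), createdAtMillis);
    }

    /** Deletes expired sessions from the head only; returns how many were removed. */
    int expire(long nowMillis, long ttlMillis) {
        int expired = 0;
        Iterator<Map.Entry<String, Long>> it = sessions.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<String, Long> entry = it.next();
            if (nowMillis - entry.getValue() < ttlMillis) {
                break; // every later key is newer, so nothing else can be expired
            }
            it.remove(); // deletions stay in one contiguous block at the head
            expired++;
        }
        return expired;
    }
}
```

For example, with sessions opened at 1000, 2000 and 3000 ms, calling
expire(3500, 1000) removes the first two and stops at the third. If sessions
expire on inactivity rather than on age, this ordering assumption no longer
holds.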

-Yi

On Mon, Jun 6, 2016 at 3:17 PM, David Yu <da...@optimizely.com> wrote:

> Hi, Yi,
>
> Yes, the sessions are keyed by the sessionId.
>
> In our case, iterating through all OPEN sessions is inevitable, since that
> is precisely where we evaluate (based on timestamp) and close sessions. In
> other words, the closed-session queue you suggested cannot be constructed
> without going through all the sessions periodically.
>
> Can you explain (on a higher level) why iteration through the entries can
> be a slow process?
>
> Thanks,
> David

Re: Update all values in RocksDB

Posted by David Yu <da...@optimizely.com>.
Hi, Yi,

Yes, the sessions are keyed by the sessionId.

In our case, iterating through all OPEN sessions is inevitable, since that
is precisely where we evaluate (based on timestamp) and close sessions. In
other words, the closed-session queue you suggested cannot be constructed
without going through all the sessions periodically.

Can you explain (on a higher level) why iteration through the entries can
be a slow process?

Thanks,
David

On Mon, Jun 6, 2016 at 2:34 PM, Yi Pan <ni...@gmail.com> wrote:

> Hi, David,
>
> I would recommend keeping a separate table of closed sessions as a "queue",
> ordered by the time each session was closed. In your window method, just
> create an iterator over the "queue", only make progress toward its end,
> and do a point deletion in the sessionStore, which I assume is keyed by
> the sessionId.
>
> The reasons are:
> 1) RocksDB is a KV store, and it is super efficient at reads/writes by key,
> not by iterator.
> 2) If you have to use an iterator, making sure that it only moves toward
> the "tail", where all the meaningful work items are, is important for fast
> and efficient operation. Please refer to this wiki page from the RocksDB
> team:
>
> https://github.com/facebook/rocksdb/wiki/Implement-Queue-Service-Using-RocksDB
>
> -Yi

Re: Update all values in RocksDB

Posted by Yi Pan <ni...@gmail.com>.
Hi, David,

I would recommend keeping a separate table of closed sessions as a "queue",
ordered by the time each session was closed. In your window method, just
create an iterator over the "queue", only make progress toward its end,
and do a point deletion in the sessionStore, which I assume is keyed by
the sessionId.

The reasons are:
1) RocksDB is a KV store, and it is super efficient at reads/writes by key,
not by iterator.
2) If you have to use an iterator, making sure that it only moves toward
the "tail", where all the meaningful work items are, is important for fast
and efficient operation. Please refer to this wiki page from the RocksDB
team:
https://github.com/facebook/rocksdb/wiki/Implement-Queue-Service-Using-RocksDB
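
The queue idea can be sketched roughly as follows. This is only an
illustration: it uses java.util.TreeMap as a stand-in for the sorted
RocksDB-backed stores, and the class ClosedSessionQueue, its method names,
and the key layout (zero-padded close time, then sessionId) are hypothetical
rather than anything provided by Samza.

```java
import java.util.Iterator;
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical closed-session "queue" next to the main session store. */
final class ClosedSessionQueue {

    // Stand-in for the main store, keyed by sessionId (String payload is a placeholder).
    final TreeMap<String, String> sessionStore = new TreeMap<>();

    // Stand-in for the queue store. Key: zero-padded close time + ":" + sessionId.
    // Value: the sessionId to delete from the main store.
    final TreeMap<String, String> closedQueue = new TreeMap<>();

    /** Builds a queue key that sorts by close time (assumes non-negative millis). */
    static String queueKey(long closedAtMillis, String sessionId) {
        return String.format("%019d:%s", closedAtMillis, sessionId);
    }

    /** Appends a closed session to the queue. */
    void markClosed(String sessionId, long closedAtMillis) {
        closedQueue.put(queueKey(closedAtMillis, sessionId), sessionId);
    }

    /** Purges up to maxPurge closed sessions, oldest first; returns how many were purged. */
    int purge(int maxPurge) {
        int purged = 0;
        Iterator<Map.Entry<String, String>> it = closedQueue.entrySet().iterator();
        while (it.hasNext() && purged < maxPurge) {
            Map.Entry<String, String> head = it.next();
            sessionStore.remove(head.getValue()); // point deletion by sessionId
            it.remove();                          // pop the head of the queue
            purged++;
        }
        return purged;
    }
}
```

In window(), a task would call something like purge(n) instead of scanning
the whole session store; the iterator only ever advances from the head of
the queue.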

-Yi

On Mon, Jun 6, 2016 at 2:25 PM, David Yu <da...@optimizely.com> wrote:

> We use Samza RocksDB to keep track of our user event sessions. The task
> periodically calls window() to update all sessions in the store and purge
> all closed sessions.
>
> We do all of this in the same iterator loop.
>
> Here's how we are doing it:
>
>
> public void window(MessageCollector collector, TaskCoordinator coordinator)
>     throws Exception {
>
>   // Scan every session in the store.
>   KeyValueIterator<String, Session> it = sessionStore.all();
>
>   try {
>     while (it.hasNext()) {
>       Entry<String, Session> entry = it.next();
>       Session session = entry.getValue();
>
>       update(session);
>
>       if (session.getStatus() == Status.CLOSED) {
>         // Purge sessions that update() has closed.
>         sessionStore.delete(entry.getKey());
>       } else {
>         // Write the updated session back.
>         sessionStore.put(entry.getKey(), session);
>       }
>     }
>   } finally {
>     // Release the underlying RocksDB iterator.
>     it.close();
>   }
> }
>
>
> The question is: is this the correct/efficient way to do a read+update for
> RocksDB?
>
> Thanks,
> David
>