Posted to dev@beam.apache.org by Vincent Marquez <vi...@gmail.com> on 2022/05/11 21:10:34 UTC

Changing the interface in CassandraIO Mapper

I would like to make some additional performance-related changes to the
CassandraIO module, but they would require changing the Mapper interface
to return ListenableFuture instead of java.util.concurrent.Future.  I'm not
sure why the Mapper interface specifies the latter, as the DataStax driver
itself returns a ListenableFuture for any async queries.

How are changes to user-facing interfaces handled (however minor they would
be) for Beam?  If this is something that can be done, I'm happy to create a
ticket, but supporting full backwards compatibility might be too much work.


*~Vincent*

Re: Changing the interface in CassandraIO Mapper

Posted by Kenneth Knowles <ke...@apache.org>.
Minor but important correction: Beam does *not* "shade" Guava. That tends
to refer to build-time re-namespacing and/or bundling. Beam does neither of
those things. What Beam has done is to create the equivalent of a totally
independent fork. It has no impact on whether various libraries or IOs use
Guava directly.

Regarding CassandraIO: I believe if the user commits to using CassandraIO
or a Cassandra client library then they already commit to having a
compatible version of Guava in their dependency set. So I think it is fine
to expose it on the API surface. It is not part of the core SDK, so it only
impacts CassandraIO users, who already don't have a choice.

Kenn

On Tue, Jun 14, 2022 at 5:48 PM Chamikara Jayalath <ch...@google.com>
wrote:

>
>
> On Tue, Jun 14, 2022 at 5:11 PM Vincent Marquez <vi...@gmail.com>
> wrote:
>
>>
>>
>>
>> On Mon, May 16, 2022 at 11:29 PM Chamikara Jayalath <ch...@google.com>
>> wrote:
>>
>>>
>>>
>>> On Mon, May 16, 2022 at 12:35 PM Ahmet Altay <al...@google.com> wrote:
>>>
>>>> Adding folks who might have an opinion : @Alexey Romanenko
>>>> <ar...@gmail.com> @Chamikara Jayalath <ch...@google.com>
>>>>
>>>> On Wed, May 11, 2022 at 5:47 PM Vincent Marquez <
>>>> vincent.marquez@gmail.com> wrote:
>>>>
>>>>>
>>>>>
>>>>>
>>>>> On Wed, May 11, 2022 at 3:12 PM Daniel Collins <dp...@google.com>
>>>>> wrote:
>>>>>
>>>>>> ListenableFuture has the additional problem that beam shades guava,
>>>>>> so its very unlikely you would be able to put it into the public interface.
>>>>>>
>>>>>>
>>>>> I'm not sure why this would be the case, there are other places that
>>>>> make use of ListenableFuture such as the BigQuery IO, I would just need to
>>>>> use the vendored guava, no?
>>>>>
>>>>
>>> I don't think this is exposed through the public API of BigQueryIO
>>> though.
>>>
>>>
>>>>
>>>>>
>>>>>
>>>>>
>>>>>> Can you describe in more detail the changes you want to make and why
>>>>>> they require ListenableFuture for this interface?
>>>>>>
>>>>>
>>>>> Happy to go into detail:
>>>>>
>>>>> Currently writes to Cassandra are executed asynchronous up to 100 per
>>>>> instance of the DoFn (which I believe on most/all runners would be 1 per
>>>>> core).
>>>>>
>>>>> 1. That number should be configurable, this would entirely depend on
>>>>> the size of the Cassandra/Scylla cluster to determine if 100 async queries
>>>>> per core/node of a beam job is sufficient.
>>>>>
>>>>> 2. Once 100 async queries are queued up, the processElement *blocks*
>>>>> until all 100 queries finish.  This isn't efficient and will prevent more
>>>>> queries from being queued up until the slowest one finishes.  We've found
>>>>> it much better to have a steady rate of async queries in flight (to better
>>>>> saturate the cores on the database).   However, to do so would require some
>>>>> sort of semaphore type system in that we need to know when one query
>>>>> finishes that means we can add another.  Hence the need for a
>>>>> ListenableFuture, some mechanism that can signal an onComplete to release a
>>>>> semaphore (or latch or whatever).
>>>>>
>>>>> Does that make sense?  Thoughts/comments/criticism welcome.  Happy to
>>>>> put this up in a design doc if it seems like something worth doing.
>>>>>
>>>>
>>> Does this have to be more complicated than maintaining threadpool to
>>> manage async requests and adding incoming requests to the pool (which will
>>> be processed when the threads become available) ? I don't understand why
>>> you need to block accepting incoming requests till all 100 queries are
>>> finished.
>>>
>>>
>>
>> Apologies that I missed your reply!  The issue isn't that the threads
>> can't process the requests fast enough, the issue is we don't want to send
>> off the requests to the server until the server has finished processing.
>> We're trying to throttle sending too many queries to that particular
>> partition.
>>
>> Make sense?
>>
>
> Yeah, sounds like a reasonable performance improvement to me (minus the
> vendored Guava issue Daniel pointed out).
> For completeness I believe this is the location where you are requesting
> to change the interface:
> https://github.com/apache/beam/blob/ac20321008e51c401731895ea934642b4648efd3/sdks/java/io/cassandra/src/main/java/org/apache/beam/sdk/io/cassandra/Mapper.java#L65
>
> It might be possible to make this available as an option (a new
> CassandraIO.withMapperFactoryFn with a new Mapper) to preserve backwards
> compatibility.
>
> Thanks,
> Cham
>
>
>>
>>> Thanks,
>>> Cham
>>>
>>>
>>>>>
>>>>>
>>>>>>
>>>>>> On Wed, May 11, 2022 at 5:11 PM Vincent Marquez <
>>>>>> vincent.marquez@gmail.com> wrote:
>>>>>>
>>>>>>> I would like to do some additional performance related changes to
>>>>>>> the CassandraIO module, but it would necessitate changing the Mapper
>>>>>>> interface to return ListenableFuture opposed to
>>>>>>> java.util.concurrent.Future.  I'm not sure why the Mapper interface
>>>>>>> specifies the former, as the datastax driver itself returns a
>>>>>>> ListenableFuture for any async queries.
>>>>>>>
>>>>>>> How are changes to user facing interfaces handled (however minor
>>>>>>> they would be) for Beam?  If this is something that can be done, I'm happy
>>>>>>> to create a ticket, but supporting full backwards compatibility might be
>>>>>>> too much work.
>>>>>>>
>>>>>>>
>>>>>>> *~Vincent*
>>>>>>>
>>>>>>

Re: Changing the interface in CassandraIO Mapper

Posted by Chamikara Jayalath <ch...@google.com>.
On Tue, Jun 14, 2022 at 5:11 PM Vincent Marquez <vi...@gmail.com>
wrote:

>
>
>
> On Mon, May 16, 2022 at 11:29 PM Chamikara Jayalath <ch...@google.com>
> wrote:
>
>>
>>
>> On Mon, May 16, 2022 at 12:35 PM Ahmet Altay <al...@google.com> wrote:
>>
>>> Adding folks who might have an opinion : @Alexey Romanenko
>>> <ar...@gmail.com> @Chamikara Jayalath <ch...@google.com>
>>>
>>> On Wed, May 11, 2022 at 5:47 PM Vincent Marquez <
>>> vincent.marquez@gmail.com> wrote:
>>>
>>>>
>>>>
>>>>
>>>> On Wed, May 11, 2022 at 3:12 PM Daniel Collins <dp...@google.com>
>>>> wrote:
>>>>
>>>>> ListenableFuture has the additional problem that beam shades guava, so
>>>>> its very unlikely you would be able to put it into the public interface.
>>>>>
>>>>>
>>>> I'm not sure why this would be the case, there are other places that
>>>> make use of ListenableFuture such as the BigQuery IO, I would just need to
>>>> use the vendored guava, no?
>>>>
>>>
>> I don't think this is exposed through the public API of BigQueryIO though.
>>
>>
>>>
>>>>
>>>>
>>>>
>>>>> Can you describe in more detail the changes you want to make and why
>>>>> they require ListenableFuture for this interface?
>>>>>
>>>>
>>>> Happy to go into detail:
>>>>
>>>> Currently writes to Cassandra are executed asynchronous up to 100 per
>>>> instance of the DoFn (which I believe on most/all runners would be 1 per
>>>> core).
>>>>
>>>> 1. That number should be configurable, this would entirely depend on
>>>> the size of the Cassandra/Scylla cluster to determine if 100 async queries
>>>> per core/node of a beam job is sufficient.
>>>>
>>>> 2. Once 100 async queries are queued up, the processElement *blocks*
>>>> until all 100 queries finish.  This isn't efficient and will prevent more
>>>> queries from being queued up until the slowest one finishes.  We've found
>>>> it much better to have a steady rate of async queries in flight (to better
>>>> saturate the cores on the database).   However, to do so would require some
>>>> sort of semaphore type system in that we need to know when one query
>>>> finishes that means we can add another.  Hence the need for a
>>>> ListenableFuture, some mechanism that can signal an onComplete to release a
>>>> semaphore (or latch or whatever).
>>>>
>>>> Does that make sense?  Thoughts/comments/criticism welcome.  Happy to
>>>> put this up in a design doc if it seems like something worth doing.
>>>>
>>>
>> Does this have to be more complicated than maintaining threadpool to
>> manage async requests and adding incoming requests to the pool (which will
>> be processed when the threads become available) ? I don't understand why
>> you need to block accepting incoming requests till all 100 queries are
>> finished.
>>
>>
>
> Apologies that I missed your reply!  The issue isn't that the threads
> can't process the requests fast enough, the issue is we don't want to send
> off the requests to the server until the server has finished processing.
> We're trying to throttle sending too many queries to that particular
> partition.
>
> Make sense?
>

Yeah, sounds like a reasonable performance improvement to me (minus the
vendored Guava issue Daniel pointed out).
For completeness I believe this is the location where you are requesting to
change the interface:
https://github.com/apache/beam/blob/ac20321008e51c401731895ea934642b4648efd3/sdks/java/io/cassandra/src/main/java/org/apache/beam/sdk/io/cassandra/Mapper.java#L65

It might be possible to make this available as an option (a new
CassandraIO.withMapperFactoryFn with a new Mapper) to preserve backwards
compatibility.
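
As a rough illustration of the opt-in idea, an adapter could let existing
Future-returning mappers keep working behind a callback-capable contract.
This is only a sketch under assumptions: LegacyMapper mirrors just the
saveAsync shape discussed in this thread, CallbackMapper and adapt are
hypothetical names, and CompletableFuture stands in for Guava's
ListenableFuture so the example runs on the JDK alone.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicInteger;

public class MapperCompat {
  /** Existing-style contract: the result can only be polled or blocked on. */
  public interface LegacyMapper<T> {
    Future<Void> saveAsync(T entity);
  }

  /** Hypothetical opt-in contract: the result supports completion callbacks. */
  public interface CallbackMapper<T> {
    CompletableFuture<Void> saveAsync(T entity);
  }

  /** Wraps a legacy mapper; each pending write occupies one waiter thread. */
  public static <T> CallbackMapper<T> adapt(LegacyMapper<T> legacy, Executor waiter) {
    return entity -> {
      Future<Void> pending = legacy.saveAsync(entity);
      CompletableFuture<Void> result = new CompletableFuture<>();
      waiter.execute(() -> {
        try {
          pending.get(); // block here so that callers do not have to
          result.complete(null);
        } catch (ExecutionException e) {
          result.completeExceptionally(e.getCause());
        } catch (InterruptedException e) {
          Thread.currentThread().interrupt();
          result.completeExceptionally(e);
        } catch (CancellationException e) {
          result.cancel(false);
        }
      });
      return result;
    };
  }

  /** Saves three rows through an adapted legacy mapper; returns callbacks fired. */
  public static int demo() {
    ExecutorService pool = Executors.newFixedThreadPool(2);
    ExecutorService waiter = Executors.newCachedThreadPool();
    AtomicInteger callbacks = new AtomicInteger();
    LegacyMapper<String> legacy = entity -> {
      Callable<Void> task = () -> null; // pretend write that succeeds
      return pool.submit(task);
    };
    CallbackMapper<String> adapted = adapt(legacy, waiter);
    CompletableFuture<?>[] all = new CompletableFuture<?>[3];
    for (int i = 0; i < 3; i++) {
      all[i] = adapted.saveAsync("row-" + i).thenRun(callbacks::incrementAndGet);
    }
    CompletableFuture.allOf(all).join();
    pool.shutdown();
    waiter.shutdown();
    return callbacks.get();
  }

  public static void main(String[] args) {
    System.out.println("callbacks fired: " + demo());
  }
}
```

The trade-off is that adapted legacy mappers tie up a thread per pending
write, so only mappers that natively return a listenable future would see
the full benefit.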

Thanks,
Cham


>
>> Thanks,
>> Cham
>>
>>
>>>>
>>>>
>>>>>
>>>>> On Wed, May 11, 2022 at 5:11 PM Vincent Marquez <
>>>>> vincent.marquez@gmail.com> wrote:
>>>>>
>>>>>> I would like to do some additional performance related changes to the
>>>>>> CassandraIO module, but it would necessitate changing the Mapper interface
>>>>>> to return ListenableFuture opposed to java.util.concurrent.Future.  I'm not
>>>>>> sure why the Mapper interface specifies the former, as the datastax driver
>>>>>> itself returns a ListenableFuture for any async queries.
>>>>>>
>>>>>> How are changes to user facing interfaces handled (however minor they
>>>>>> would be) for Beam?  If this is something that can be done, I'm happy to
>>>>>> create a ticket, but supporting full backwards compatibility might be too
>>>>>> much work.
>>>>>>
>>>>>>
>>>>>> *~Vincent*
>>>>>>
>>>>>

Re: Changing the interface in CassandraIO Mapper

Posted by Vincent Marquez <vi...@gmail.com>.
On Mon, May 16, 2022 at 11:29 PM Chamikara Jayalath <ch...@google.com>
wrote:

>
>
> On Mon, May 16, 2022 at 12:35 PM Ahmet Altay <al...@google.com> wrote:
>
>> Adding folks who might have an opinion : @Alexey Romanenko
>> <ar...@gmail.com> @Chamikara Jayalath <ch...@google.com>
>>
>> On Wed, May 11, 2022 at 5:47 PM Vincent Marquez <
>> vincent.marquez@gmail.com> wrote:
>>
>>>
>>>
>>>
>>> On Wed, May 11, 2022 at 3:12 PM Daniel Collins <dp...@google.com>
>>> wrote:
>>>
>>>> ListenableFuture has the additional problem that beam shades guava, so
>>>> its very unlikely you would be able to put it into the public interface.
>>>>
>>>>
>>> I'm not sure why this would be the case, there are other places that
>>> make use of ListenableFuture such as the BigQuery IO, I would just need to
>>> use the vendored guava, no?
>>>
>>
> I don't think this is exposed through the public API of BigQueryIO though.
>
>
>>
>>>
>>>
>>>
>>>> Can you describe in more detail the changes you want to make and why
>>>> they require ListenableFuture for this interface?
>>>>
>>>
>>> Happy to go into detail:
>>>
>>> Currently writes to Cassandra are executed asynchronous up to 100 per
>>> instance of the DoFn (which I believe on most/all runners would be 1 per
>>> core).
>>>
>>> 1. That number should be configurable, this would entirely depend on the
>>> size of the Cassandra/Scylla cluster to determine if 100 async queries per
>>> core/node of a beam job is sufficient.
>>>
>>> 2. Once 100 async queries are queued up, the processElement *blocks*
>>> until all 100 queries finish.  This isn't efficient and will prevent more
>>> queries from being queued up until the slowest one finishes.  We've found
>>> it much better to have a steady rate of async queries in flight (to better
>>> saturate the cores on the database).   However, to do so would require some
>>> sort of semaphore type system in that we need to know when one query
>>> finishes that means we can add another.  Hence the need for a
>>> ListenableFuture, some mechanism that can signal an onComplete to release a
>>> semaphore (or latch or whatever).
>>>
>>> Does that make sense?  Thoughts/comments/criticism welcome.  Happy to
>>> put this up in a design doc if it seems like something worth doing.
>>>
>>
> Does this have to be more complicated than maintaining threadpool to
> manage async requests and adding incoming requests to the pool (which will
> be processed when the threads become available) ? I don't understand why
> you need to block accepting incoming requests till all 100 queries are
> finished.
>
>

Apologies that I missed your reply!  The issue isn't that the threads can't
process the requests fast enough; the issue is that we don't want to send
more requests to the server until it has finished processing earlier ones.
We're trying to avoid sending too many queries to that particular partition.

Make sense?

> Thanks,
> Cham
>
>
>>>
>>>
>>>>
>>>> On Wed, May 11, 2022 at 5:11 PM Vincent Marquez <
>>>> vincent.marquez@gmail.com> wrote:
>>>>
>>>>> I would like to do some additional performance related changes to the
>>>>> CassandraIO module, but it would necessitate changing the Mapper interface
>>>>> to return ListenableFuture opposed to java.util.concurrent.Future.  I'm not
>>>>> sure why the Mapper interface specifies the former, as the datastax driver
>>>>> itself returns a ListenableFuture for any async queries.
>>>>>
>>>>> How are changes to user facing interfaces handled (however minor they
>>>>> would be) for Beam?  If this is something that can be done, I'm happy to
>>>>> create a ticket, but supporting full backwards compatibility might be too
>>>>> much work.
>>>>>
>>>>>
>>>>> *~Vincent*
>>>>>
>>>>

Re: Changing the interface in CassandraIO Mapper

Posted by Chamikara Jayalath <ch...@google.com>.
On Mon, May 16, 2022 at 12:35 PM Ahmet Altay <al...@google.com> wrote:

> Adding folks who might have an opinion : @Alexey Romanenko
> <ar...@gmail.com> @Chamikara Jayalath <ch...@google.com>
>
> On Wed, May 11, 2022 at 5:47 PM Vincent Marquez <vi...@gmail.com>
> wrote:
>
>>
>>
>>
>> On Wed, May 11, 2022 at 3:12 PM Daniel Collins <dp...@google.com>
>> wrote:
>>
>>> ListenableFuture has the additional problem that beam shades guava, so
>>> its very unlikely you would be able to put it into the public interface.
>>>
>>>
>> I'm not sure why this would be the case, there are other places that make
>> use of ListenableFuture such as the BigQuery IO, I would just need to use
>> the vendored guava, no?
>>
>
I don't think this is exposed through the public API of BigQueryIO though.


>
>>
>>
>>
>>> Can you describe in more detail the changes you want to make and why
>>> they require ListenableFuture for this interface?
>>>
>>
>> Happy to go into detail:
>>
>> Currently writes to Cassandra are executed asynchronous up to 100 per
>> instance of the DoFn (which I believe on most/all runners would be 1 per
>> core).
>>
>> 1. That number should be configurable, this would entirely depend on the
>> size of the Cassandra/Scylla cluster to determine if 100 async queries per
>> core/node of a beam job is sufficient.
>>
>> 2. Once 100 async queries are queued up, the processElement *blocks*
>> until all 100 queries finish.  This isn't efficient and will prevent more
>> queries from being queued up until the slowest one finishes.  We've found
>> it much better to have a steady rate of async queries in flight (to better
>> saturate the cores on the database).   However, to do so would require some
>> sort of semaphore type system in that we need to know when one query
>> finishes that means we can add another.  Hence the need for a
>> ListenableFuture, some mechanism that can signal an onComplete to release a
>> semaphore (or latch or whatever).
>>
>> Does that make sense?  Thoughts/comments/criticism welcome.  Happy to put
>> this up in a design doc if it seems like something worth doing.
>>
>
Does this have to be more complicated than maintaining a thread pool to
manage async requests and adding incoming requests to the pool (which will
be processed when the threads become available)?  I don't understand why you
need to block accepting incoming requests until all 100 queries are
finished.

Thanks,
Cham


>>
>>
>>>
>>> On Wed, May 11, 2022 at 5:11 PM Vincent Marquez <
>>> vincent.marquez@gmail.com> wrote:
>>>
>>>> I would like to do some additional performance related changes to the
>>>> CassandraIO module, but it would necessitate changing the Mapper interface
>>>> to return ListenableFuture opposed to java.util.concurrent.Future.  I'm not
>>>> sure why the Mapper interface specifies the former, as the datastax driver
>>>> itself returns a ListenableFuture for any async queries.
>>>>
>>>> How are changes to user facing interfaces handled (however minor they
>>>> would be) for Beam?  If this is something that can be done, I'm happy to
>>>> create a ticket, but supporting full backwards compatibility might be too
>>>> much work.
>>>>
>>>>
>>>> *~Vincent*
>>>>
>>>

Re: Changing the interface in CassandraIO Mapper

Posted by Ahmet Altay <al...@google.com>.
Adding folks who might have an opinion: @Alexey Romanenko
<ar...@gmail.com> @Chamikara Jayalath <ch...@google.com>

On Wed, May 11, 2022 at 5:47 PM Vincent Marquez <vi...@gmail.com>
wrote:

>
>
>
> On Wed, May 11, 2022 at 3:12 PM Daniel Collins <dp...@google.com>
> wrote:
>
>> ListenableFuture has the additional problem that beam shades guava, so
>> its very unlikely you would be able to put it into the public interface.
>>
>>
> I'm not sure why this would be the case, there are other places that make
> use of ListenableFuture such as the BigQuery IO, I would just need to use
> the vendored guava, no?
>
>
>
>
>> Can you describe in more detail the changes you want to make and why they
>> require ListenableFuture for this interface?
>>
>
> Happy to go into detail:
>
> Currently writes to Cassandra are executed asynchronous up to 100 per
> instance of the DoFn (which I believe on most/all runners would be 1 per
> core).
>
> 1. That number should be configurable, this would entirely depend on the
> size of the Cassandra/Scylla cluster to determine if 100 async queries per
> core/node of a beam job is sufficient.
>
> 2. Once 100 async queries are queued up, the processElement *blocks* until
> all 100 queries finish.  This isn't efficient and will prevent more queries
> from being queued up until the slowest one finishes.  We've found it much
> better to have a steady rate of async queries in flight (to better saturate
> the cores on the database).   However, to do so would require some sort of
> semaphore type system in that we need to know when one query finishes that
> means we can add another.  Hence the need for a ListenableFuture, some
> mechanism that can signal an onComplete to release a semaphore (or latch or
> whatever).
>
> Does that make sense?  Thoughts/comments/criticism welcome.  Happy to put
> this up in a design doc if it seems like something worth doing.
>
>
>
>>
>> On Wed, May 11, 2022 at 5:11 PM Vincent Marquez <
>> vincent.marquez@gmail.com> wrote:
>>
>>> I would like to do some additional performance related changes to the
>>> CassandraIO module, but it would necessitate changing the Mapper interface
>>> to return ListenableFuture opposed to java.util.concurrent.Future.  I'm not
>>> sure why the Mapper interface specifies the former, as the datastax driver
>>> itself returns a ListenableFuture for any async queries.
>>>
>>> How are changes to user facing interfaces handled (however minor they
>>> would be) for Beam?  If this is something that can be done, I'm happy to
>>> create a ticket, but supporting full backwards compatibility might be too
>>> much work.
>>>
>>>
>>> *~Vincent*
>>>
>>

Re: Changing the interface in CassandraIO Mapper

Posted by Vincent Marquez <vi...@gmail.com>.
On Wed, May 11, 2022 at 3:12 PM Daniel Collins <dp...@google.com> wrote:

> ListenableFuture has the additional problem that beam shades guava, so its
> very unlikely you would be able to put it into the public interface.
>
>
I'm not sure why this would be the case.  Other places, such as the
BigQuery IO, already make use of ListenableFuture; I would just need to use
the vendored Guava, no?




> Can you describe in more detail the changes you want to make and why they
> require ListenableFuture for this interface?
>

Happy to go into detail:

Currently writes to Cassandra are executed asynchronously, up to 100 per
instance of the DoFn (which I believe on most/all runners would be 1 per
core).

1. That number should be configurable.  Whether 100 async queries per
core/node of a Beam job is sufficient depends entirely on the size of the
Cassandra/Scylla cluster.

2. Once 100 async queries are queued up, processElement *blocks* until
all 100 queries finish.  This isn't efficient, because no more queries can
be queued up until the slowest one finishes.  We've found it much better to
have a steady rate of async queries in flight (to better saturate the
cores on the database).  However, doing so requires some sort of
semaphore-type system: we need to know when one query finishes so that we
can add another.  Hence the need for a ListenableFuture, or some other
mechanism that can signal an onComplete to release a semaphore (or latch or
whatever).
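
For illustration, the semaphore idea in point 2 could be sketched roughly
as follows.  This is not the CassandraIO code: CompletableFuture stands in
for Guava's ListenableFuture so the example runs on the JDK alone, and
ThrottledWriter, maxInFlight, and the saveAsync function are hypothetical
names.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;

/** Keeps at most maxInFlight async writes outstanding, refilling as each completes. */
public class ThrottledWriter<T> {
  private final int maxInFlight;
  private final Semaphore permits;
  // Hypothetical stand-in for a mapper's async save call.
  private final Function<T, CompletableFuture<Void>> saveAsync;

  public ThrottledWriter(int maxInFlight, Function<T, CompletableFuture<Void>> saveAsync) {
    this.maxInFlight = maxInFlight;
    this.permits = new Semaphore(maxInFlight);
    this.saveAsync = saveAsync;
  }

  /** Blocks only while maxInFlight writes are outstanding, not until a batch drains. */
  public void write(T element) throws InterruptedException {
    permits.acquire();
    CompletableFuture<Void> future;
    try {
      future = saveAsync.apply(element);
    } catch (RuntimeException e) {
      permits.release(); // nothing was sent, so give the permit back
      throw e;
    }
    // The completion callback is the hook a plain java.util.concurrent.Future lacks.
    // Errors are ignored here for brevity; a real writer would record them.
    future.whenComplete((ignored, error) -> permits.release());
  }

  /** Waits for every outstanding write, e.g. at the end of a bundle. */
  public void flush() throws InterruptedException {
    permits.acquire(maxInFlight);
    permits.release(maxInFlight);
  }

  /** Runs simulated writes; returns {peak concurrent writes, completed writes}. */
  public static int[] demo(int maxInFlight, int writes) {
    ExecutorService pool = Executors.newFixedThreadPool(8);
    AtomicInteger inFlight = new AtomicInteger();
    AtomicInteger peak = new AtomicInteger();
    AtomicInteger completed = new AtomicInteger();
    ThrottledWriter<Integer> writer = new ThrottledWriter<Integer>(maxInFlight, element -> {
      peak.accumulateAndGet(inFlight.incrementAndGet(), Math::max);
      return CompletableFuture.runAsync(() -> {
        try {
          Thread.sleep(2); // simulated server latency
        } catch (InterruptedException e) {
          Thread.currentThread().interrupt();
        }
        completed.incrementAndGet();
        inFlight.decrementAndGet();
      }, pool);
    });
    try {
      for (int i = 0; i < writes; i++) {
        writer.write(i);
      }
      writer.flush();
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException(e);
    } finally {
      pool.shutdown();
    }
    return new int[] {peak.get(), completed.get()};
  }

  public static void main(String[] args) {
    int[] result = demo(4, 50);
    System.out.println("peak in flight: " + result[0] + ", completed: " + result[1]);
  }
}
```

With this shape, a slow query holds up only its own permit, so new writes
keep flowing as the faster ones complete.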

Does that make sense?  Thoughts/comments/criticism welcome.  Happy to put
this up in a design doc if it seems like something worth doing.



>
> On Wed, May 11, 2022 at 5:11 PM Vincent Marquez <vi...@gmail.com>
> wrote:
>
>> I would like to do some additional performance related changes to the
>> CassandraIO module, but it would necessitate changing the Mapper interface
>> to return ListenableFuture opposed to java.util.concurrent.Future.  I'm not
>> sure why the Mapper interface specifies the former, as the datastax driver
>> itself returns a ListenableFuture for any async queries.
>>
>> How are changes to user facing interfaces handled (however minor they
>> would be) for Beam?  If this is something that can be done, I'm happy to
>> create a ticket, but supporting full backwards compatibility might be too
>> much work.
>>
>>
>> *~Vincent*
>>
>

Re: Changing the interface in CassandraIO Mapper

Posted by Daniel Collins <dp...@google.com>.
ListenableFuture has the additional problem that Beam shades Guava, so it's
very unlikely you would be able to put it into the public interface.

Can you describe in more detail the changes you want to make and why they
require ListenableFuture for this interface?

On Wed, May 11, 2022 at 5:11 PM Vincent Marquez <vi...@gmail.com>
wrote:

> I would like to do some additional performance related changes to the
> CassandraIO module, but it would necessitate changing the Mapper interface
> to return ListenableFuture opposed to java.util.concurrent.Future.  I'm not
> sure why the Mapper interface specifies the former, as the datastax driver
> itself returns a ListenableFuture for any async queries.
>
> How are changes to user facing interfaces handled (however minor they
> would be) for Beam?  If this is something that can be done, I'm happy to
> create a ticket, but supporting full backwards compatibility might be too
> much work.
>
>
> *~Vincent*
>