Posted to user@kudu.apache.org by José Ribeiro <jo...@hotmail.com> on 2018/04/09 10:10:47 UTC

AsyncKudu

Hello, I'm having difficulties with the AsyncKudu API. Since there aren't any useful examples of the asynchronous Kudu client, can you please help me with this example?


I want to use AsyncKudu to write to the Kudu database. The steps I want to follow are:

  1.  verify that the table exists;
  2.  if it exists, open the table;
  3.  write to the table.


Can someone explain to me how to do this with AsyncKudu? I'm having trouble understanding the Deferred class that is returned by every AsyncKudu method. Thank you.
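A minimal sketch of those three steps as one flat chain, in which each step starts from the future returned by the previous step instead of creating an extra outer future and completing it by hand. To keep it runnable without a Kudu cluster, it uses the JDK's `CompletableFuture` rather than `com.stumbleupon.async.Deferred`; `AsyncTableStore`, `Table`, `upsert`, and `FakeStore` are hypothetical stand-ins for `AsyncKuduClient.tableExists`, `openTable`, and an async session write, not Kudu API. `thenCompose` corresponds only roughly to `Deferred.addCallbackDeferring`.

```java
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;

final class TableChainSketch {
    // Hypothetical async client API (a stand-in for AsyncKuduClient, not its real interface).
    interface AsyncTableStore {
        CompletableFuture<Boolean> tableExists(String name);
        CompletableFuture<Table> openTable(String name);
    }

    // Hypothetical table handle; the returned future completes once the write is acknowledged.
    interface Table {
        CompletableFuture<Void> upsert(String key);
    }

    /** Step 1: check the table; step 2: open it; step 3: write. Each step chains on the previous future. */
    static CompletableFuture<String> writeIfExists(AsyncTableStore store, String tableName, String key) {
        return store.tableExists(tableName)
                .thenCompose(exists -> {
                    if (!exists) {
                        // A failed future skips the remaining steps, much like an errback path.
                        CompletableFuture<Table> failed = new CompletableFuture<>();
                        failed.completeExceptionally(
                                new IllegalStateException("Table doesn't exist: " + tableName));
                        return failed;
                    }
                    return store.openTable(tableName);
                })
                .thenCompose(table -> table.upsert(key))
                // Report success only after the write's own future has completed.
                .thenApply(ignored -> "Written with success.");
    }

    // In-memory fake so the sketch runs without a Kudu cluster.
    static final class FakeStore implements AsyncTableStore {
        final Set<String> tables;
        final Map<String, Set<String>> rows = new ConcurrentHashMap<>();

        FakeStore(Set<String> tables) {
            this.tables = tables;
        }

        @Override
        public CompletableFuture<Boolean> tableExists(String name) {
            return CompletableFuture.supplyAsync(() -> tables.contains(name));
        }

        @Override
        public CompletableFuture<Table> openTable(String name) {
            Table table = key -> CompletableFuture.runAsync(
                    () -> rows.computeIfAbsent(name, n -> ConcurrentHashMap.newKeySet()).add(key));
            return CompletableFuture.supplyAsync(() -> table);
        }
    }
}
```

The design choice worth noting is that success is reported only once the write's own future completes. In the AsyncKudu code posted later in this thread, `session.apply(upst)` returns a `Deferred<OperationResponse>` that is never waited on, so "Written with success." is logged before the write has been acknowledged.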

Re: AsyncKudu

Posted by Dan Burkert <da...@apache.org>.
On Tue, Apr 10, 2018 at 1:26 AM, José Ribeiro <jose.enes.ribeiro@hotmail.com> wrote:

> One thing: if I use the thread-safe queues, should I use the sync or the
> async client? If a parallel thread is listening to the queue, I can use the
> sync client, right?
>

Yep, that's correct.  No need for the async client in that scenario.

- Dan


>
> -José
> ------------------------------
> *From:* Dan Burkert <da...@apache.org>
> *Sent:* 9 April 2018 18:32:43
> *To:* user@kudu.apache.org
> *Subject:* Re: AsyncKudu
>
> Hi José,
>
> I would consider doing this a little bit differently.  The only
> circumstances I'm aware of where it's necessary to use the async session is
> when you are attempting to write to Kudu from another async context where
> you can't block, and you need to handle the results of the write.  That
> doesn't seem to necessarily be the case here, though.  Instead, I'd
> consider doing the following:
>
> - If possible, re-use Session instances and configure
> AUTO_FLUSH_BACKGROUND.  This will make the throughput significantly better,
> since it will internally batch up writes before flushing.  With
> AUTO_FLUSH_SYNC it sends an RPC for every upsert, which is going to be much
> slower.  Note, however, that Session is not a threadsafe class.
>
> - If you absolutely have to prevent blocking on your main thread, then
> create a worker-thread which reads the rows to upsert from a queue and
> writes them to Kudu.  Have the main thread simply add the appropriate rows
> to the queue. There are a number of good thread-safe queues in the Java
> standard library that are appropriate.
>
> Doing the first, or the first and second, should be easier than messing
> with the async client, as well as more performant.
>
> - Dan
>
> On Mon, Apr 9, 2018 at 10:16 AM, José Ribeiro <
> jose.enes.ribeiro@hotmail.com> wrote:
>
> Thank you for the reply. I did get it to write to Kudu, though.
>
> Still, I get this error and I don't know why:
>
>
> "java.lang.AssertionError: Tried to resume the execution of
> Deferred@2021942318(state=DONE, result=true, callback=<none>,
> errback=<none>)) although it's not in state=PAUSED.  This occurred after
> the completion of Deferred@1753159143(state=RUNNING, result=Written with
> success., callback=<none>, errback=<none>) which was originally returned by
> callback=com.foo.kudu.KuduDAOImpl$1@3e69008f@1047068815"
>
> The only way I found to reach Kudu was this:
>
>     public void asyncWrite(String tableName, Foo foo) {
>         Deferred<Deferred<Boolean>> res = new Deferred<>();
>
>         res.addCallback(new Callback<Deferred<Deferred<String>>, Deferred<Boolean>>() {
>             @Override
>             public Deferred<Deferred<String>> call(Deferred<Boolean> booleanDeferred){
>                 return booleanDeferred.addCallbacks(new Callback<Deferred<String>, Boolean>() {
>                     @Override
>                     public Deferred<String> call(Boolean aBoolean) throws Exception {
>                         if (aBoolean) {
>                             Deferred<String> stringDeferred = kuduClient.openTable(tableName)
>                                     .addCallbacks(new Callback<String, KuduTable>() {
>                                         @Override
>                                         public String call(KuduTable kuduTable) throws KuduException {
>                                             AsyncKuduSession session = kuduClient.newSession();
>                                             session.setFlushMode(SessionConfiguration.FlushMode.AUTO_FLUSH_SYNC);
>
>                                             Upsert upst = kuduTable.newUpsert();
>                                             PartialRow row = upst.getRow();
>                                             row.addString("house_id", foo.getId());
>
>                                             session.apply(upst);
>                                             session.close();
>                                             LOGGER.info("Written with success.");
>                                             return "Written with success.";
>                                         }
>                                     }, (Callback<Object, KuduException>) e -> {
>                                         LOGGER.error(e.getMessage());
>                                         return e;
>                                     });
>                             return stringDeferred;
>                         } else {
>                             throw new Exception("Table doesn't exists");
>                         }
>
>                     }
>                 }, (Callback<Object, Exception>) e -> {
>                     System.out.println(e.getMessage());
>                     return e;
>                 });
>             }
>         });
>
>         //executing the callback
>         res.callback(kuduClient.tableExists(tableName));
>     }
>
>
> A little background on my project: the clients read from and write to
> another database, and when they write something, the same information is
> sent to Kudu. I don't want the Kudu part to block the client, because the
> client only needs the other database to work.
>
> José Ribeiro
>
> ------------------------------
> *From:* Dan Burkert <da...@apache.org>
> *Sent:* 9 April 2018 17:21
> *To:* user@kudu.apache.org
> *Subject:* Re: AsyncKudu
>
> Hi José,
>
> The Deferred class is indeed pretty difficult to come to grips with, which
> is why we don't really recommend the async API for most use cases.  I've
> personally found the Deferred
> <https://github.com/OpenTSDB/async/blob/master/src/Deferred.java> class
> docs to be pretty useful when getting up to speed with it.  Using the
> AsyncKuduSession is particularly tricky because it doesn't expose
> backpressure in an intuitive way, and it becomes very easy to accidentally
> have a huge number of writes buffered in the client heap (the sync client
> solves this by simply blocking so more rows can't be produced).
>
> kudu-ts
> <https://github.com/danburkert/kudu-ts/blob/master/core/src/main/java/org/kududb/ts/core/KuduTS.java>
> has a little bit of code that uses the async client to open and scan
> tables, so it may be helpful to look over that.  It doesn't use the async
> session, I think because it was easier just to use the sync client for
> writing.
>
> - Dan
>
> On Mon, Apr 9, 2018 at 3:10 AM, José Ribeiro <
> jose.enes.ribeiro@hotmail.com> wrote:
>
> Hello, I'm having difficulties with the AsyncKudu API. Since there aren't
> any useful examples of the asynchronous Kudu client, can you please help me
> with this example?
>
>
> I want to use AsyncKudu to write to the Kudu database. The steps I want to
> follow are:
>
>    1. verify that the table exists;
>    2. if it exists, open the table;
>    3. write to the table.
>
>
> Can someone explain to me how to do this with AsyncKudu? I'm having trouble
> understanding the Deferred class that is returned by every AsyncKudu
> method. Thank you.
>
>
>
>

Re: AsyncKudu

Posted by José Ribeiro <jo...@hotmail.com>.
Hi Dan, thank you for the explanation. I'll try to do this.

One thing: if I use the thread-safe queues, should I use the sync or the async client? If a parallel thread is listening to the queue, I can use the sync client, right?


-José

Re: AsyncKudu

Posted by Dan Burkert <da...@apache.org>.
Hi José,

I would consider doing this a little bit differently.  The only
circumstances I'm aware of where it's necessary to use the async session is
when you are attempting to write to Kudu from another async context where
you can't block, and you need to handle the results of the write.  That
doesn't seem to necessarily be the case here, though.  Instead, I'd
consider doing the following:

- If possible, re-use Session instances and configure
AUTO_FLUSH_BACKGROUND.  This will make the throughput significantly better,
since it will internally batch up writes before flushing.  With
AUTO_FLUSH_SYNC it sends an RPC for every upsert, which is going to be much
slower.  Note, however, that Session is not a threadsafe class.

- If you absolutely have to prevent blocking on your main thread, then
create a worker-thread which reads the rows to upsert from a queue and
writes them to Kudu.  Have the main thread simply add the appropriate rows
to the queue. There are a number of good thread-safe queues in the Java
standard library that are appropriate.

Doing the first, or the first and second, should be easier than messing with
the async client, as well as more performant.

- Dan
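A minimal sketch of the worker-thread design described above, assuming one long-lived writer. The request thread only calls `submit`, which uses `BlockingQueue.offer` and never blocks (it returns `false` when the buffer is full, and the caller decides whether to drop, log, or retry). A single worker thread owns the sink, so the non-thread-safe session is never shared. `RowSink` is a hypothetical stand-in for a sync `KuduSession` opened once with `AUTO_FLUSH_BACKGROUND`: in real code `write` would build an `Upsert` and call `session.apply`, and `flush` would call `session.flush()` before the session is closed.

```java
import java.util.Optional;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.atomic.AtomicLong;

final class QueueWriterSketch implements AutoCloseable {
    // Hypothetical stand-in for a sync KuduSession (AUTO_FLUSH_BACKGROUND) owned by the worker.
    interface RowSink {
        void write(String row) throws Exception;
        void flush() throws Exception;
    }

    private final RowSink sink;
    // Rows are wrapped in Optional.of(row); an empty Optional is the stop marker.
    private final BlockingQueue<Optional<String>> queue;
    private final AtomicLong failures = new AtomicLong();
    private final Thread worker;

    QueueWriterSketch(RowSink sink, int capacity) {
        this.sink = sink;
        this.queue = new ArrayBlockingQueue<>(capacity);
        this.worker = new Thread(this::drainLoop, "kudu-writer");
        this.worker.start();
    }

    /** Called on the request thread: never blocks, returns false if the buffer is full. */
    boolean submit(String row) {
        return queue.offer(Optional.of(row));
    }

    long failureCount() {
        return failures.get();
    }

    // Runs on the single worker thread, so the sink is never used concurrently.
    private void drainLoop() {
        try {
            while (true) {
                Optional<String> next = queue.take();
                if (!next.isPresent()) {
                    break; // stop marker: every row queued before it has been handed to the sink
                }
                try {
                    sink.write(next.get());
                } catch (Exception e) {
                    failures.incrementAndGet(); // real code would log the row and the error
                }
            }
            sink.flush();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } catch (Exception e) {
            failures.incrementAndGet(); // the final flush failed
        }
    }

    /** Enqueues the stop marker, then waits for the worker to write and flush what was queued. */
    @Override
    public void close() {
        try {
            queue.put(Optional.empty());
            worker.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}
```

Because the stop marker goes through the same FIFO queue, `close()` drains everything submitted before it and then flushes once. With `AUTO_FLUSH_BACKGROUND`, per-row errors are reported asynchronously, so a real worker would also check `session.getPendingErrors()` from time to time.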

Re: AsyncKudu

Posted by José Ribeiro <jo...@hotmail.com>.
Thank you for the reply. I did get it to write to Kudu, though.

Still, I get this error and I don't know why:


"java.lang.AssertionError: Tried to resume the execution of Deferred@2021942318(state=DONE, result=true, callback=<none>, errback=<none>)) although it's not in state=PAUSED.  This occurred after the completion of Deferred@1753159143(state=RUNNING, result=Written with success., callback=<none>, errback=<none>) which was originally returned by callback=com.foo.kudu.KuduDAOImpl$1@3e69008f@1047068815"

The only way I found to reach Kudu was this:

 public void asyncWrite(String tableName, Foo foo) {
        Deferred<Deferred<Boolean>> res = new Deferred<>();

        res.addCallback(new Callback<Deferred<Deferred<String>>, Deferred<Boolean>>() {
            @Override
            public Deferred<Deferred<String>> call(Deferred<Boolean> booleanDeferred){
                return booleanDeferred.addCallbacks(new Callback<Deferred<String>, Boolean>() {
                    @Override
                    public Deferred<String> call(Boolean aBoolean) throws Exception {
                        if (aBoolean) {
                            Deferred<String> stringDeferred = kuduClient.openTable(tableName)
                                    .addCallbacks(new Callback<String, KuduTable>() {
                                        @Override
                                        public String call(KuduTable kuduTable) throws KuduException {
                                            AsyncKuduSession session = kuduClient.newSession();
                                            session.setFlushMode(SessionConfiguration.FlushMode.AUTO_FLUSH_SYNC);

                                            Upsert upst = kuduTable.newUpsert();
                                            PartialRow row = upst.getRow();
                                            row.addString("house_id", foo.getId());

                                            session.apply(upst);
                                            session.close();
                                            LOGGER.info("Written with success.");
                                            return "Written with success.";
                                        }
                                    }, (Callback<Object, KuduException>) e -> {
                                        LOGGER.error(e.getMessage());
                                        return e;
                                    });
                            return stringDeferred;
                        } else {
                            throw new Exception("Table doesn't exists");
                        }

                    }
                }, (Callback<Object, Exception>) e -> {
                    System.out.println(e.getMessage());
                    return e;
                });
            }
        });

        //executing the callback
        res.callback(kuduClient.tableExists(tableName));
    }


A little background on my project: the clients read from and write to another database, and when they write something, the same information is sent to Kudu. I don't want the Kudu part to block the client, because the client only needs the other database to work.

José Ribeiro

Re: AsyncKudu

Posted by Dan Burkert <da...@apache.org>.
Hi José,

The Deferred class is indeed pretty difficult to come to grips with, which
is why we don't really recommend the async API for most use cases.  I've
personally found the Deferred
<https://github.com/OpenTSDB/async/blob/master/src/Deferred.java> class
docs to be pretty useful when getting up to speed with it.  Using the
AsyncKuduSession is particularly tricky because it doesn't expose
backpressure in an intuitive way, and it becomes very easy to accidentally
have a huge number of writes buffered in the client heap (the sync client
solves this by simply blocking so more rows can't be produced).

kudu-ts
<https://github.com/danburkert/kudu-ts/blob/master/core/src/main/java/org/kududb/ts/core/KuduTS.java>
has a little bit of code that uses the async client to open and scan
tables, so it may be helpful to look over that.  It doesn't use the async
session, I think because it was easier just to use the sync client for
writing.

- Dan
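To illustrate the backpressure point, here is a sketch of one common way to add it yourself around an async write: a `Semaphore` caps the number of writes in flight, so a fast producer waits instead of buffering an unbounded number of rows on the heap, which is roughly the effect the sync client gets by blocking. `BoundedAsyncWriter` and its `asyncWrite` function are hypothetical stand-ins for an async session write; nothing here is Kudu API.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Semaphore;
import java.util.function.Function;

final class BoundedAsyncWriter {
    private final Semaphore permits;
    // Hypothetical async write call (a stand-in for an async session apply).
    private final Function<String, CompletableFuture<Void>> asyncWrite;

    BoundedAsyncWriter(int maxInFlight, Function<String, CompletableFuture<Void>> asyncWrite) {
        this.permits = new Semaphore(maxInFlight);
        this.asyncWrite = asyncWrite;
    }

    /** Waits while maxInFlight writes are outstanding, then starts this write. */
    CompletableFuture<Void> write(String row) {
        permits.acquireUninterruptibly();
        CompletableFuture<Void> pending;
        try {
            pending = asyncWrite.apply(row);
        } catch (RuntimeException e) {
            permits.release(); // the write never started, so give the permit back
            throw e;
        }
        // Release the permit when the write finishes, whether it succeeded or failed.
        return pending.whenComplete((ignored, error) -> permits.release());
    }

    int availablePermits() {
        return permits.availablePermits();
    }
}
```

`maxInFlight` is an assumed tuning parameter. `acquireUninterruptibly` keeps the sketch short; production code would more likely use `acquire` and handle interruption.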
