You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@flume.apache.org by Steve Katz <st...@nexage.com> on 2014/03/01 04:21:09 UTC

Flume RpcClient multithreading

I'm trying to understand the correct way to use the Flume RpcClient in a multithreaded application. Information I have found so far indicates that the components are thread safe, but the example in the Flume documentation clouds the issue when it comes to error handling. This code:
public void sendDataToFlume(String data) {
    // Create a Flume Event object that encapsulates the sample data
    Event event = EventBuilder.withBody(data, Charset.forName("UTF-8"));

    // Send the event
    try {
      client.append(event);
    } catch (EventDeliveryException e) {
      // clean up and recreate the client
      client.close();
      client = null;
      client = RpcClientFactory.getDefaultInstance(hostname, port);
      // Use the following method to create a thrift client (instead of the above line):
      // this.client = RpcClientFactory.getThriftInstance(hostname, port);
    }
  }
If more than one thread calls this method, and the exception is thrown, then there will be a problem as multiple threads try to recreate the client in the exception handler.
Is the intent of the SDK that it should only be used by a single thread? Should this method be synchronized, as it appears to be in the log4jappender that is part of the Flume source? Should I put this code in its own worker and pass it events via a queue?
Does anyone have an example of RpcClient being used by more then one thread (included the error condition)?
Would I be better off using the "embedded agent"? Is that multithread friendly?
TIA,
--skatz


Re: Flume RpcClient multithreading

Posted by "Bhaskar V. Karambelkar" <bh...@gmail.com>.
Hi Steve,
We use flume SDK's RCP Client in one of our App. The App is built in scala
and uses the Akka Actor System to deal with concurrent access to the Rpc
Client.
Akka Actor System API is available in Java as well and I encourage you to
take a look at it for concurrency, as opposed to managing thread safety on
your own.
Your example is a perfect use case for hiding the failure handling behind
an Akka Actor.

What we've done is
-- A base Actor which recieves messages, and simply passes these messages
to a RoundRobinRouter.
-- The RoundRobinRouter merely forwards the message to one of the Flume RPC
Client Actor in a round robin format
-- Each flume Client RPC actor is responsible for managing it's own state
w/o having to worry about thread safety and upon message receiption, passes
the message to the corresponding flume source. (So 1 FlumeRPCClientActor
per flume source)

The Actor system ensure that only one message gets passed to the
FlumeRPCClientActor, and so client.close followed by client.open can be
safely called in the Actor's receive method.

thanks
Bhaskar


On Fri, Feb 28, 2014 at 10:21 PM, Steve Katz <st...@nexage.com> wrote:

> I'm trying to understand the correct way to use the Flume RpcClient in a
> multithreaded application. Information I have found so far indicates that
> the components are thread safe, but the example in the Flume documentation
> clouds the issue when it comes to error handling. This code:
>
> public void sendDataToFlume(String data) {
>
>     // Create a Flume Event object that encapsulates the sample data
>
>     Event event = EventBuilder.withBody(data, Charset.forName("UTF-8"));
>
>
>
>     // Send the event
>
>     try {
>
>       client.append(event);
>
>     } catch (EventDeliveryException e) {
>
>       // clean up and recreate the client
>
>       client.close();
>
>       client = null;
>
>       client = RpcClientFactory.getDefaultInstance(hostname, port);
>
>       // Use the following method to create a thrift client (instead of
> the above line):
>
>       // this.client = RpcClientFactory.getThriftInstance(hostname, port);
>
>     }
>
>   }
>
> If more than one thread calls this method, and the exception is thrown,
> then there will be a problem as multiple threads try to recreate the client
> in the exception handler.
>
> Is the intent of the SDK that it should only be used by a single thread?
> Should this method be synchronized, as it appears to be in the
> log4jappender that is part of the Flume source? Should I put this code in
> its own worker and pass it events via a queue?
>
> Does anyone have an example of RpcClient being used by more then one
> thread (included the error condition)?
>
> Would I be better off using the "embedded agent"? Is that multithread
> friendly?
>
> TIA,
>
> --skatz
>
>
>