Posted to user@thrift.apache.org by chang liu <li...@gmail.com> on 2011/08/29 06:12:05 UTC
problem on implementing a thread safe client in thrift
Hi,
By default, a Thrift client cannot be shared between multiple threads, so I
extended the generated code and implemented a thread-safe version. The service
definition:
service TestRPC {
    binary search(1:binary param)
}
Here is my extended client code:
public class ThreadSafeTestRPCClient extends TestRPC.Client {
    // Hands out a unique sequence id for each call.
    AtomicInteger idGenerator = new AtomicInteger(1);
    Connection conn;

    public ThreadSafeTestRPCClient(TProtocol prot) {
        super(prot);
        conn = new Connection(prot);
        conn.start();  // start the reader thread
    }

    public ByteBuffer search(ByteBuffer param) throws org.apache.thrift.TException {
        int seqid = idGenerator.getAndIncrement();
        String func = "search";
        search_args args = new search_args();
        args.setParam(param);
        search_result result = new search_result();
        Call call = new Call(seqid, func, args, result);
        conn.sendParam(call);
        call.waitComplete();
        return ((search_result) call.getResult()).success;
    }
}
class Connection extends Thread {
    org.apache.thrift.protocol.TProtocol iprot_;
    org.apache.thrift.protocol.TProtocol oprot_;
    // Serializes writers so that request messages are not interleaved.
    protected ReentrantLock sendLock = new ReentrantLock();
    // In-flight calls, keyed by sequence id.
    Map<Integer, Call> calls = new ConcurrentHashMap<Integer, Call>();

    public Connection(org.apache.thrift.protocol.TProtocol prot) {
        this(prot, prot);
    }

    public Connection(org.apache.thrift.protocol.TProtocol iprot,
                      org.apache.thrift.protocol.TProtocol oprot) {
        this.iprot_ = iprot;
        this.oprot_ = oprot;
    }

    public void sendParam(Call call) throws TException {
        // Register the call before writing so the reader can match the reply.
        calls.put(call.getSeqid(), call);
        sendLock.lock();
        try {
            oprot_.writeMessageBegin(new org.apache.thrift.protocol.TMessage(
                    call.getFunc(),
                    org.apache.thrift.protocol.TMessageType.CALL,
                    call.getSeqid()));
            call.getArgs().write(oprot_);
            oprot_.writeMessageEnd();
            oprot_.getTransport().flush();
        } finally {
            sendLock.unlock();
        }
    }

    // Reader loop: matches each reply to its pending call by sequence id.
    public void run() {
        try {
            while (true) {
                org.apache.thrift.protocol.TMessage msg = iprot_.readMessageBegin();
                if (msg.type == org.apache.thrift.protocol.TMessageType.EXCEPTION) {
                    org.apache.thrift.TApplicationException x =
                            org.apache.thrift.TApplicationException.read(iprot_);
                    iprot_.readMessageEnd();
                    throw x;  // caught below, which ends the reader loop
                }
                int seqid = msg.seqid;
                Call call = calls.remove(seqid);
                call.getResult().read(iprot_);
                iprot_.readMessageEnd();
                call.complete();
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
class Call {
    private int seqid;
    private String func;
    private TBase args;
    private TBase result;
    private ReentrantLock lock = new ReentrantLock();
    private boolean complete = false;
    private Condition completeCond = lock.newCondition();

    public Call(int seqid, String func, TBase args, TBase result) {
        this.seqid = seqid;
        this.func = func;
        this.args = args;
        this.result = result;
    }

    public int getSeqid() {
        return seqid;
    }

    public String getFunc() {
        return func;
    }

    public TBase getArgs() {
        return args;
    }

    public TBase getResult() {
        return result;
    }

    // Waits up to 5 seconds for complete(); also returns normally on
    // timeout or interrupt.
    void waitComplete() {
        lock.lock();
        try {
            if (!complete) {
                completeCond.await(5000, TimeUnit.MILLISECONDS);
            }
        } catch (InterruptedException e) {
        } finally {
            lock.unlock();
        }
    }

    // Called by the reader thread once the result has been read.
    void complete() {
        lock.lock();
        try {
            complete = true;
            completeCond.signal();
        } finally {
            lock.unlock();
        }
    }
}
But when I use this code from multiple threads, I get a strange error:
org.apache.thrift.transport.TTransportException
    at org.apache.thrift.transport.TIOStreamTransport.read(TIOStreamTransport.java:132)
    at org.apache.thrift.transport.TTransport.readAll(TTransport.java:84)
    at org.apache.thrift.transport.TFramedTransport.readFrame(TFramedTransport.java:141)
    at org.apache.thrift.transport.TFramedTransport.read(TFramedTransport.java:101)
    at org.apache.thrift.transport.TTransport.readAll(TTransport.java:84)
    at org.apache.thrift.protocol.TCompactProtocol.readByte(TCompactProtocol.java:574)
    at org.apache.thrift.protocol.TCompactProtocol.readMessageBegin(TCompactProtocol.java:443)
    at thrift.Connection.run(TestRPCClient.java:77)
Does anyone know what the problem is, and how I can recover from it?
Any advice will be appreciated.
Re: problem on implementing a thread safe client in thrift
Posted by chang liu <li...@gmail.com>.
Actually, I have tried this idea. I built a thread-safe connection pool
myself, but found that its performance (I mean throughput) is worse than
that of a thread-safe client multiplexing a single connection. My guess at
the reason is that a pooled connection is not fully utilized: it sits idle
from the time the args are written until the result is returned. I would
appreciate any comments.
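To make the multiplexing side of that comparison concrete, here is a minimal
sketch of the bookkeeping such a client needs: a registry of in-flight calls
keyed by sequence id, so that replies can be matched out of order and every
waiter can be woken at once if the connection fails. This is not the code
from the original post and does not touch any Thrift API. The names
PendingCalls, register, await, complete and failAll are hypothetical, and
CompletableFuture assumes Java 8 or later, which is newer than this thread.

```java
import java.io.IOException;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical registry of in-flight calls on one shared connection. */
class PendingCalls<R> {
    private final AtomicInteger idGenerator = new AtomicInteger(1);
    private final Map<Integer, CompletableFuture<R>> pending = new ConcurrentHashMap<>();

    int nextSeqid() {
        return idGenerator.getAndIncrement();
    }

    /** Caller side: register the call before its request is written. */
    CompletableFuture<R> register(int seqid) {
        CompletableFuture<R> future = new CompletableFuture<>();
        pending.put(seqid, future);
        return future;
    }

    /** Caller side: wait for the reply; the entry is removed even on timeout. */
    R await(int seqid, CompletableFuture<R> future, long timeoutMillis)
            throws InterruptedException, ExecutionException, TimeoutException {
        try {
            return future.get(timeoutMillis, TimeUnit.MILLISECONDS);
        } finally {
            pending.remove(seqid);
        }
    }

    /** Reader side: returns false when no caller is waiting for this seqid. */
    boolean complete(int seqid, R result) {
        CompletableFuture<R> future = pending.remove(seqid);
        if (future == null) {
            return false;
        }
        future.complete(result);
        return true;
    }

    /** Reader side: the connection failed, so wake every waiter with the cause. */
    void failAll(Throwable cause) {
        for (Integer seqid : pending.keySet()) {
            CompletableFuture<R> future = pending.remove(seqid);
            if (future != null) {
                future.completeExceptionally(cause);
            }
        }
    }

    int size() {
        return pending.size();
    }
}

class PendingCallsDemo {
    public static void main(String[] args) throws Exception {
        PendingCalls<String> calls = new PendingCalls<>();
        int first = calls.nextSeqid();
        int second = calls.nextSeqid();
        CompletableFuture<String> f1 = calls.register(first);
        CompletableFuture<String> f2 = calls.register(second);

        // Stand-in for the reader thread: one reply arrives out of order,
        // then the connection drops with the other call still pending.
        Thread reader = new Thread(() -> {
            calls.complete(second, "reply-2");
            calls.failAll(new IOException("connection closed"));
        });
        reader.start();

        System.out.println(calls.await(second, f2, 1000));
        try {
            calls.await(first, f1, 1000);
        } catch (ExecutionException e) {
            System.out.println("failed fast: " + e.getCause().getMessage());
        }
    }
}
```

In a real client the reader thread would call complete() after decoding each
reply and failAll() from its catch block, so callers see the transport error
immediately instead of waiting out their timeout. Whether the server answers
pipelined requests on one connection at all depends on the server type, so
treat this as an illustration of the idea rather than a drop-in fix.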
On Tue, Sep 27, 2011 at 7:22 AM, Adam Fisk <a...@littleshoot.org> wrote:
> I would actually recommend building a thread safe connection pool
> where connections are returned to the pool only after methods are
> written and any return values are returned.
>
> -adam
>
Re: problem on implementing a thread safe client in thrift
Posted by Adam Fisk <a...@littleshoot.org>.
I would actually recommend building a thread safe connection pool
where connections are returned to the pool only after methods are
written and any return values are returned.
-adam
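As an illustration of the pool described above, here is a minimal sketch in
which a client is borrowed for one complete round trip and goes back to the
pool only after the call has returned or thrown. It uses only the JDK. The
names ClientPool, Call and execute are hypothetical, and the type parameter C
stands in for a generated client such as TestRPC.Client, with each instance
assumed to own its own transport.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

/** Hypothetical blocking pool; C stands in for a generated Thrift client. */
class ClientPool<C> {
    /** One complete round trip performed on a borrowed client. */
    interface Call<C, R> {
        R apply(C client) throws Exception;
    }

    private final BlockingQueue<C> idle;

    /** The list must be non-empty; each client is assumed to own its transport. */
    ClientPool(List<C> clients) {
        idle = new ArrayBlockingQueue<>(clients.size(), false, clients);
    }

    /** Borrows a client, runs the whole call, and only then returns the client. */
    <R> R execute(Call<C, R> call, long timeoutMillis) throws Exception {
        C client = idle.poll(timeoutMillis, TimeUnit.MILLISECONDS);
        if (client == null) {
            throw new TimeoutException("no idle client within " + timeoutMillis + " ms");
        }
        try {
            // The request is written and the reply is read while this
            // thread is the only one holding the client.
            return call.apply(client);
        } finally {
            // Capacity equals the number of clients, so offer() cannot fail.
            idle.offer(client);
        }
    }

    int idleCount() {
        return idle.size();
    }
}

class ClientPoolDemo {
    public static void main(String[] args) throws Exception {
        // Strings stand in for real clients so the sketch runs without Thrift.
        ClientPool<String> pool = new ClientPool<>(Arrays.asList("client-1", "client-2"));

        Runnable worker = () -> {
            try {
                String reply = pool.execute(c -> c + " handled search", 1000);
                System.out.println(Thread.currentThread().getName() + ": " + reply);
            } catch (Exception e) {
                e.printStackTrace();
            }
        };

        Thread[] threads = new Thread[4];
        for (int i = 0; i < threads.length; i++) {
            threads[i] = new Thread(worker, "caller-" + i);
            threads[i].start();
        }
        for (Thread t : threads) {
            t.join();
        }
        System.out.println("idle clients: " + pool.idleCount());
    }
}
```

With a real Thrift client the lambda would be something like
client -> client.search(param). One simplification to be aware of: a client
whose call failed with a transport error is put back as-is here, whereas a
production pool would probably close it and open a replacement.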
--
Adam Fisk
http://www.littleshoot.org | http://adamfisk.wordpress.com |
http://twitter.com/adamfisk