Posted to user@thrift.apache.org by chang liu <li...@gmail.com> on 2011/08/29 06:12:05 UTC

problem on implementing a thread safe client in thrift

Hi,

   By default, a Thrift client cannot be shared between multiple threads, so I
extended the generated code and implemented a thread-safe version. The service
definition:

  service TestRPC {
    binary search(1:binary param)
  }


Here is my extended client code:

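import java.nio.ByteBuffer;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

import org.apache.thrift.TBase;
import org.apache.thrift.TException;
import org.apache.thrift.protocol.TProtocol;

public class ThreadSafeTestRPCClient extends TestRPC.Client {
    AtomicInteger idGenerator = new AtomicInteger(1);
    Connection conn;

    public ThreadSafeTestRPCClient(TProtocol prot) {
        super(prot);
        conn = new Connection(prot);
        conn.start();
    }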

    public ByteBuffer search(ByteBuffer param)
            throws org.apache.thrift.TException {
        int seqid = idGenerator.getAndIncrement();
        String func = "search";
        search_args args = new search_args();
        args.setParam(param);
        search_result result = new search_result();

        Call call = new Call(seqid, func, args, result);

        conn.sendParam(call);
        call.waitComplete();
        return ((search_result)call.getResult()).success;
    }

}

class Connection extends Thread {
    org.apache.thrift.protocol.TProtocol iprot_;
    org.apache.thrift.protocol.TProtocol oprot_;

    protected ReentrantLock sendLock = new ReentrantLock();

    Map<Integer, Call> calls = new ConcurrentHashMap<Integer, Call>();

    public Connection(org.apache.thrift.protocol.TProtocol prot) {
        this(prot, prot);
    }

    public Connection(org.apache.thrift.protocol.TProtocol iprot,
                      org.apache.thrift.protocol.TProtocol oprot) {
        this.iprot_ = iprot;
        this.oprot_ = oprot;
    }

    public void sendParam(Call call) throws TException {
        calls.put(call.getSeqid(), call);
        sendLock.lock();
        try {
            oprot_.writeMessageBegin(new org.apache.thrift.protocol.TMessage(
                    call.getFunc(),
                    org.apache.thrift.protocol.TMessageType.CALL,
                    call.getSeqid()));
            call.getArgs().write(oprot_);
            oprot_.writeMessageEnd();
            oprot_.getTransport().flush();
        } finally {
            sendLock.unlock();
        }
    }

    public void run() {
        try {
            while (true) {
                org.apache.thrift.protocol.TMessage msg = iprot_.readMessageBegin();
                if (msg.type == org.apache.thrift.protocol.TMessageType.EXCEPTION) {
                    org.apache.thrift.TApplicationException x =
                            org.apache.thrift.TApplicationException.read(iprot_);
                    iprot_.readMessageEnd();
                    throw x;
                }
                int seqid = msg.seqid;
                Call call = calls.remove(seqid);
                call.getResult().read(iprot_);
                iprot_.readMessageEnd();
                call.complete();
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

class Call {
    private int seqid;
    private String func;
    private TBase args;
    private TBase result;
    private ReentrantLock lock = new ReentrantLock();
    private boolean complete = false;
    private Condition completeCond = lock.newCondition();

    public Call(int seqid, String func, TBase args, TBase result) {
        this.seqid = seqid;
        this.func = func;
        this.args = args;
        this.result = result;
    }

    public int getSeqid() {
        return seqid;
    }

    public String getFunc() {
        return func;
    }

    public TBase getArgs() {
        return args;
    }

    public TBase getResult() {
        return result;
    }

    void waitComplete() {
        lock.lock();
        try {
            if (!complete) {
                completeCond.await(5000, TimeUnit.MILLISECONDS);
            }
        } catch (InterruptedException e) {

        } finally {
            lock.unlock();
        }
    }

    void complete() {
        lock.lock();
        try {
            complete = true;
            completeCond.signal();
        } finally {
            lock.unlock();
        }
    }
}


But when I use this code from multiple threads, I get a strange error like the one below:

org.apache.thrift.transport.TTransportException
    at org.apache.thrift.transport.TIOStreamTransport.read(TIOStreamTransport.java:132)
    at org.apache.thrift.transport.TTransport.readAll(TTransport.java:84)
    at org.apache.thrift.transport.TFramedTransport.readFrame(TFramedTransport.java:141)
    at org.apache.thrift.transport.TFramedTransport.read(TFramedTransport.java:101)
    at org.apache.thrift.transport.TTransport.readAll(TTransport.java:84)
    at org.apache.thrift.protocol.TCompactProtocol.readByte(TCompactProtocol.java:574)
    at org.apache.thrift.protocol.TCompactProtocol.readMessageBegin(TCompactProtocol.java:443)
    at thrift.Connection.run(TestRPCClient.java:77)


Does anyone know what the problem is, and how to recover from it?

Any advice will be appreciated.

Re: problem on implementing a thread safe client in thrift

Posted by chang liu <li...@gmail.com>.
Actually, I have tried this idea. I built a thread-safe connection pool
myself, but found that the performance (I mean throughput) of this solution is
worse than that of a thread-safe client multiplexing a single connection. My
guess at the reason is that a connection taken from the pool sits idle between
the args being written and the result being returned. I'd appreciate any comments.
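
The multiplexing approach mentioned above can be sketched roughly as follows.
This is only an illustration and not the code from the original post:
PendingCalls and its methods are hypothetical names, it uses
java.util.concurrent.CompletableFuture (Java 8 or later) instead of the Call
class, and it assumes the server echoes each request's seqid in its reply.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical registry that matches replies to waiting callers by seqid. */
final class PendingCalls<R> {
    private final AtomicInteger nextSeqid = new AtomicInteger(1);
    private final Map<Integer, CompletableFuture<R>> pending =
            new ConcurrentHashMap<>();

    /** Caller side: reserve a seqid before the request is written. */
    int register(CompletableFuture<R> reply) {
        int seqid = nextSeqid.getAndIncrement();
        pending.put(seqid, reply);
        return seqid;
    }

    /** Caller side: give up on a call, for example after a timeout. */
    void forget(int seqid) {
        pending.remove(seqid);
    }

    /** Reader side: deliver one reply; false if nobody is waiting for it. */
    boolean complete(int seqid, R value) {
        CompletableFuture<R> reply = pending.remove(seqid);
        return reply != null && reply.complete(value);
    }

    /** Reader side: the connection failed, so wake every waiter with the cause. */
    void failAll(Throwable cause) {
        for (Integer seqid : pending.keySet()) {
            CompletableFuture<R> reply = pending.remove(seqid);
            if (reply != null) {
                reply.completeExceptionally(cause);
            }
        }
    }
}
```

In this sketch a caller would register a future, write its request under the
send lock, and then block on the future with a timeout, calling forget if the
timeout expires. The single reader thread would call complete for each reply
and failAll from its catch block, so that callers see the transport error
instead of waiting out the timeout.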

On Tue, Sep 27, 2011 at 7:22 AM, Adam Fisk <a...@littleshoot.org> wrote:

> I would actually recommend building a thread safe connection pool
> where connections are returned to the pool only after methods are
> written and any return values are returned.
>
> -adam
>

Re: problem on implementing a thread safe client in thrift

Posted by Adam Fisk <a...@littleshoot.org>.
I would actually recommend building a thread safe connection pool
where connections are returned to the pool only after methods are
written and any return values are returned.
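
A minimal sketch of such a pool, assuming a fixed set of clients that were
opened up front, each with its own connection. ClientPool, PooledCall and
withClient are hypothetical names and not part of Thrift, and the lambda in
the usage note needs Java 8 or later.

```java
import java.util.Collection;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

/** Hypothetical callback type; it allows checked exceptions such as TException. */
interface PooledCall<C, R> {
    R apply(C client) throws Exception;
}

/** Hypothetical fixed-size pool: a client is held by at most one thread at a time. */
final class ClientPool<C> {
    private final BlockingQueue<C> idle;

    /** clients must be non-empty; each one should own its own connection. */
    ClientPool(Collection<C> clients) {
        idle = new ArrayBlockingQueue<>(clients.size(), true, clients);
    }

    /**
     * Borrows a client, runs the whole call (request written and reply read),
     * and only then hands the client back to the pool.
     */
    <R> R withClient(PooledCall<C, R> call) throws Exception {
        C client = idle.take(); // blocks while every client is in use
        try {
            return call.apply(client);
        } finally {
            // Never fails: capacity equals the number of clients.
            // A real pool would replace a client whose connection has broken.
            idle.add(client);
        }
    }
}
```

With the TestRPC service from the original post, usage might look like
pool.withClient(client -> client.search(param)), where every pooled object is
a plain generated TestRPC.Client.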

-adam




-- 
Adam Fisk
http://www.littleshoot.org | http://adamfisk.wordpress.com |
http://twitter.com/adamfisk