You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@thrift.apache.org by "Michael Eiler (JIRA)" <ji...@apache.org> on 2017/11/23 12:50:00 UTC
[jira] [Comment Edited] (THRIFT-4384) Using multiple services
simultaneously is not thread-safe.
[ https://issues.apache.org/jira/browse/THRIFT-4384?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16264278#comment-16264278 ]
Michael Eiler edited comment on THRIFT-4384 at 11/23/17 12:49 PM:
------------------------------------------------------------------
Currently I solved the problem for us like shown here:
{code:none}
diff --git a/compiler/cpp/src/thrift/generate/t_cpp_generator.cc b/compiler/cpp/src/thrift/generate/t_cpp_generator.cc
index cbe8da2..515e0e2 100644
--- a/compiler/cpp/src/thrift/generate/t_cpp_generator.cc
+++ b/compiler/cpp/src/thrift/generate/t_cpp_generator.cc
@@ -1616,6 +1616,8 @@ void t_cpp_generator::generate_service(t_service* tservice) {
<< "namespace apache { namespace thrift { namespace async {" << endl
<< "class TAsyncChannel;" << endl << "}}}" << endl;
}
+ f_header_ << "#include <boost/make_shared.hpp>" << endl;
+ f_header_ << "#include <boost/shared_ptr.hpp>" << endl;
f_header_ << "#include <thrift/TDispatchProcessor.h>" << endl;
if (gen_cob_style_) {
f_header_ << "#include <thrift/async/TAsyncDispatchProcessor.h>" << endl;
@@ -2169,7 +2171,14 @@ void t_cpp_generator::generate_service_client(t_service* tservice, string style)
indent_up();
if (style != "Cob") {
f_header_ << indent() << service_name_ << style << "Client" << short_suffix << "(" << prot_ptr
- << " prot) ";
+ << " prot";
+ if (style == "Concurrent") {
+ f_header_ << ", boost::shared_ptr<::apache::thrift::async::TConcurrentClientSyncInfo> sync = boost::make_shared<::apache::thrift::async::TConcurrentClientSyncInfo>()"
+ << ") : sync_(sync) ";
+ }
+ else {
+ f_header_ << ") ";
+ }
if (extends.empty()) {
f_header_ << "{" << endl;
@@ -2182,7 +2191,15 @@ void t_cpp_generator::generate_service_client(t_service* tservice, string style)
}
f_header_ << indent() << service_name_ << style << "Client" << short_suffix << "(" << prot_ptr
- << " iprot, " << prot_ptr << " oprot) ";
+ << " iprot, " << prot_ptr << " oprot";
+ if (style == "Concurrent") {
+ f_header_ << ", boost::shared_ptr<::apache::thrift::async::TConcurrentClientSyncInfo> sync = boost::make_shared<::apache::thrift::async::TConcurrentClientSyncInfo>()"
+ << ") : sync_(sync) ";
+ }
+ else {
+ f_header_ << ") ";
+ }
+
if (extends.empty()) {
f_header_ << "{" << endl;
f_header_ << indent() << " setProtocol" << short_suffix << "(iprot,oprot);" << endl
@@ -2328,7 +2345,7 @@ void t_cpp_generator::generate_service_client(t_service* tservice, string style)
if (style == "Concurrent") {
f_header_ <<
- indent() << "::apache::thrift::async::TConcurrentClientSyncInfo sync_;"<<endl;
+ indent() << "boost::shared_ptr<::apache::thrift::async::TConcurrentClientSyncInfo> sync_;"<<endl;
}
indent_down();
}
@@ -2434,7 +2451,7 @@ void t_cpp_generator::generate_service_client(t_service* tservice, string style)
string cseqidVal = "0";
if (style == "Concurrent") {
if (!(*f_iter)->is_oneway()) {
- cseqidVal = "this->sync_.generateSeqId()";
+ cseqidVal = "this->sync_->generateSeqId()";
}
}
// Serialize the request
@@ -2442,7 +2459,7 @@ void t_cpp_generator::generate_service_client(t_service* tservice, string style)
indent() << "int32_t cseqid = " << cseqidVal << ";" << endl;
if(style == "Concurrent") {
out <<
- indent() << "::apache::thrift::async::TConcurrentSendSentry sentry(&this->sync_);" << endl;
+ indent() << "::apache::thrift::async::TConcurrentSendSentry sentry(this->sync_.get());" << endl;
}
if (style == "Cob") {
out <<
@@ -2507,7 +2524,7 @@ void t_cpp_generator::generate_service_client(t_service* tservice, string style)
endl <<
indent() << "// the read mutex gets dropped and reacquired as part of waitForWork()" << endl <<
indent() << "// The destructor of this sentry wakes up other clients" << endl <<
- indent() << "::apache::thrift::async::TConcurrentRecvSentry sentry(&this->sync_, seqid);" << endl;
+ indent() << "::apache::thrift::async::TConcurrentRecvSentry sentry(this->sync_.get(), seqid);" << endl;
}
if (style == "Cob" && !gen_no_client_completion_) {
out << indent() << "bool completed = false;" << endl << endl << indent() << "try {";
@@ -2517,7 +2534,7 @@ void t_cpp_generator::generate_service_client(t_service* tservice, string style)
if (style == "Concurrent") {
out <<
indent() << "while(true) {" << endl <<
- indent() << " if(!this->sync_.getPending(fname, mtype, rseqid)) {" << endl;
+ indent() << " if(!this->sync_->getPending(fname, mtype, rseqid)) {" << endl;
indent_up();
indent_up();
}
@@ -2661,10 +2678,10 @@ void t_cpp_generator::generate_service_client(t_service* tservice, string style)
out <<
indent() << " }" << endl <<
indent() << " // seqid != rseqid" << endl <<
- indent() << " this->sync_.updatePending(fname, mtype, rseqid);" << endl <<
+ indent() << " this->sync_->updatePending(fname, mtype, rseqid);" << endl <<
endl <<
indent() << " // this will temporarily unlock the readMutex, and let other clients get work done" << endl <<
- indent() << " this->sync_.waitForWork(seqid);" << endl <<
+ indent() << " this->sync_->waitForWork(seqid);" << endl <<
indent() << "} // end while(true)" << endl;
}
if (style == "Cob" && !gen_no_client_completion_) {
{code}
( https://github.com/MichaelE1000/thrift/commit/44ec66e550d90cf8e0074cd46769b5be0d29253b )
Do you see any issues with this approach?
Best Regards
was (Author: michaele1000):
Currently I solved the problem for us like shown here:
{code:patch}
diff --git a/compiler/cpp/src/thrift/generate/t_cpp_generator.cc b/compiler/cpp/src/thrift/generate/t_cpp_generator.cc
index cbe8da2..515e0e2 100644
--- a/compiler/cpp/src/thrift/generate/t_cpp_generator.cc
+++ b/compiler/cpp/src/thrift/generate/t_cpp_generator.cc
@@ -1616,6 +1616,8 @@ void t_cpp_generator::generate_service(t_service* tservice) {
<< "namespace apache { namespace thrift { namespace async {" << endl
<< "class TAsyncChannel;" << endl << "}}}" << endl;
}
+ f_header_ << "#include <boost/make_shared.hpp>" << endl;
+ f_header_ << "#include <boost/shared_ptr.hpp>" << endl;
f_header_ << "#include <thrift/TDispatchProcessor.h>" << endl;
if (gen_cob_style_) {
f_header_ << "#include <thrift/async/TAsyncDispatchProcessor.h>" << endl;
@@ -2169,7 +2171,14 @@ void t_cpp_generator::generate_service_client(t_service* tservice, string style)
indent_up();
if (style != "Cob") {
f_header_ << indent() << service_name_ << style << "Client" << short_suffix << "(" << prot_ptr
- << " prot) ";
+ << " prot";
+ if (style == "Concurrent") {
+ f_header_ << ", boost::shared_ptr<::apache::thrift::async::TConcurrentClientSyncInfo> sync = boost::make_shared<::apache::thrift::async::TConcurrentClientSyncInfo>()"
+ << ") : sync_(sync) ";
+ }
+ else {
+ f_header_ << ") ";
+ }
if (extends.empty()) {
f_header_ << "{" << endl;
@@ -2182,7 +2191,15 @@ void t_cpp_generator::generate_service_client(t_service* tservice, string style)
}
f_header_ << indent() << service_name_ << style << "Client" << short_suffix << "(" << prot_ptr
- << " iprot, " << prot_ptr << " oprot) ";
+ << " iprot, " << prot_ptr << " oprot";
+ if (style == "Concurrent") {
+ f_header_ << ", boost::shared_ptr<::apache::thrift::async::TConcurrentClientSyncInfo> sync = boost::make_shared<::apache::thrift::async::TConcurrentClientSyncInfo>()"
+ << ") : sync_(sync) ";
+ }
+ else {
+ f_header_ << ") ";
+ }
+
if (extends.empty()) {
f_header_ << "{" << endl;
f_header_ << indent() << " setProtocol" << short_suffix << "(iprot,oprot);" << endl
@@ -2328,7 +2345,7 @@ void t_cpp_generator::generate_service_client(t_service* tservice, string style)
if (style == "Concurrent") {
f_header_ <<
- indent() << "::apache::thrift::async::TConcurrentClientSyncInfo sync_;"<<endl;
+ indent() << "boost::shared_ptr<::apache::thrift::async::TConcurrentClientSyncInfo> sync_;"<<endl;
}
indent_down();
}
@@ -2434,7 +2451,7 @@ void t_cpp_generator::generate_service_client(t_service* tservice, string style)
string cseqidVal = "0";
if (style == "Concurrent") {
if (!(*f_iter)->is_oneway()) {
- cseqidVal = "this->sync_.generateSeqId()";
+ cseqidVal = "this->sync_->generateSeqId()";
}
}
// Serialize the request
@@ -2442,7 +2459,7 @@ void t_cpp_generator::generate_service_client(t_service* tservice, string style)
indent() << "int32_t cseqid = " << cseqidVal << ";" << endl;
if(style == "Concurrent") {
out <<
- indent() << "::apache::thrift::async::TConcurrentSendSentry sentry(&this->sync_);" << endl;
+ indent() << "::apache::thrift::async::TConcurrentSendSentry sentry(this->sync_.get());" << endl;
}
if (style == "Cob") {
out <<
@@ -2507,7 +2524,7 @@ void t_cpp_generator::generate_service_client(t_service* tservice, string style)
endl <<
indent() << "// the read mutex gets dropped and reacquired as part of waitForWork()" << endl <<
indent() << "// The destructor of this sentry wakes up other clients" << endl <<
- indent() << "::apache::thrift::async::TConcurrentRecvSentry sentry(&this->sync_, seqid);" << endl;
+ indent() << "::apache::thrift::async::TConcurrentRecvSentry sentry(this->sync_.get(), seqid);" << endl;
}
if (style == "Cob" && !gen_no_client_completion_) {
out << indent() << "bool completed = false;" << endl << endl << indent() << "try {";
@@ -2517,7 +2534,7 @@ void t_cpp_generator::generate_service_client(t_service* tservice, string style)
if (style == "Concurrent") {
out <<
indent() << "while(true) {" << endl <<
- indent() << " if(!this->sync_.getPending(fname, mtype, rseqid)) {" << endl;
+ indent() << " if(!this->sync_->getPending(fname, mtype, rseqid)) {" << endl;
indent_up();
indent_up();
}
@@ -2661,10 +2678,10 @@ void t_cpp_generator::generate_service_client(t_service* tservice, string style)
out <<
indent() << " }" << endl <<
indent() << " // seqid != rseqid" << endl <<
- indent() << " this->sync_.updatePending(fname, mtype, rseqid);" << endl <<
+ indent() << " this->sync_->updatePending(fname, mtype, rseqid);" << endl <<
endl <<
indent() << " // this will temporarily unlock the readMutex, and let other clients get work done" << endl <<
- indent() << " this->sync_.waitForWork(seqid);" << endl <<
+ indent() << " this->sync_->waitForWork(seqid);" << endl <<
indent() << "} // end while(true)" << endl;
}
if (style == "Cob" && !gen_no_client_completion_) {
{code}
( https://github.com/MichaelE1000/thrift/commit/44ec66e550d90cf8e0074cd46769b5be0d29253b )
Do you see any issues with this approach?
Best Regards
> Using multiple services simultaneously is not thread-safe.
> ----------------------------------------------------------
>
> Key: THRIFT-4384
> URL: https://issues.apache.org/jira/browse/THRIFT-4384
> Project: Thrift
> Issue Type: Bug
> Components: C++ - Compiler, C++ - Library
> Affects Versions: 0.10.0
> Environment: Should affect all platforms but has been noticed first on Windows, x86_64.
> Reporter: Michael Eiler
> Priority: Critical
> Fix For: 0.10.0, 0.11.0
>
>
> I'm using the generated *ServiceConcurrentClient classes. They should allow me to call multiple functions at the same time.
> The issue as that the ::apache::thrift::async::TConcurrentClientSyncInfo class is a member of the generated service. If I have a project with multiple services sharing the same connection (protocol) with each other, the services will not be mutually excluded from reading on the same socket.
> I did a small test with patching the generated code and injecting the same instance of TConcurrentClientSyncInfo into all my services and everything was fine.
> Question: Do you need a small project to reproduce this or is it obvious enough? Just check out any generated code and you will see that the TConcurrentClientSyncInfo is not shared between different services.
--
This message was sent by Atlassian JIRA
(v6.4.14#64029)