You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@thrift.apache.org by "Michael Eiler (JIRA)" <ji...@apache.org> on 2017/11/23 12:50:00 UTC

[jira] [Comment Edited] (THRIFT-4384) Using multiple services simultaneously is not thread-safe.

    [ https://issues.apache.org/jira/browse/THRIFT-4384?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16264278#comment-16264278 ] 

Michael Eiler edited comment on THRIFT-4384 at 11/23/17 12:49 PM:
------------------------------------------------------------------

Currently I solved the problem for us like shown here:

{code:none}
diff --git a/compiler/cpp/src/thrift/generate/t_cpp_generator.cc b/compiler/cpp/src/thrift/generate/t_cpp_generator.cc
index cbe8da2..515e0e2 100644
--- a/compiler/cpp/src/thrift/generate/t_cpp_generator.cc
+++ b/compiler/cpp/src/thrift/generate/t_cpp_generator.cc
@@ -1616,6 +1616,8 @@ void t_cpp_generator::generate_service(t_service* tservice) {
               << "namespace apache { namespace thrift { namespace async {" << endl
               << "class TAsyncChannel;" << endl << "}}}" << endl;
   }
+  f_header_ << "#include <boost/make_shared.hpp>" << endl;
+  f_header_ << "#include <boost/shared_ptr.hpp>" << endl;
   f_header_ << "#include <thrift/TDispatchProcessor.h>" << endl;
   if (gen_cob_style_) {
     f_header_ << "#include <thrift/async/TAsyncDispatchProcessor.h>" << endl;
@@ -2169,7 +2171,14 @@ void t_cpp_generator::generate_service_client(t_service* tservice, string style)
   indent_up();
   if (style != "Cob") {
     f_header_ << indent() << service_name_ << style << "Client" << short_suffix << "(" << prot_ptr
-              << " prot) ";
+		<< " prot";
+	if (style == "Concurrent") {
+		f_header_ << ", boost::shared_ptr<::apache::thrift::async::TConcurrentClientSyncInfo> sync = boost::make_shared<::apache::thrift::async::TConcurrentClientSyncInfo>()"
+			<< ") : sync_(sync) ";
+	}
+	else {
+		f_header_ << ") ";
+	}
 
     if (extends.empty()) {
       f_header_ << "{" << endl;
@@ -2182,7 +2191,15 @@ void t_cpp_generator::generate_service_client(t_service* tservice, string style)
     }
 
     f_header_ << indent() << service_name_ << style << "Client" << short_suffix << "(" << prot_ptr
-              << " iprot, " << prot_ptr << " oprot) ";
+		<< " iprot, " << prot_ptr << " oprot";
+	if (style == "Concurrent") {
+		f_header_ << ", boost::shared_ptr<::apache::thrift::async::TConcurrentClientSyncInfo> sync = boost::make_shared<::apache::thrift::async::TConcurrentClientSyncInfo>()"
+			<< ") : sync_(sync) ";
+	}
+	else {
+		f_header_ << ") ";
+	}
+	
     if (extends.empty()) {
       f_header_ << "{" << endl;
       f_header_ << indent() << "  setProtocol" << short_suffix << "(iprot,oprot);" << endl
@@ -2328,7 +2345,7 @@ void t_cpp_generator::generate_service_client(t_service* tservice, string style)
 
     if (style == "Concurrent") {
       f_header_ <<
-        indent() << "::apache::thrift::async::TConcurrentClientSyncInfo sync_;"<<endl;
+        indent() << "boost::shared_ptr<::apache::thrift::async::TConcurrentClientSyncInfo> sync_;"<<endl;
     }
     indent_down();
   }
@@ -2434,7 +2451,7 @@ void t_cpp_generator::generate_service_client(t_service* tservice, string style)
       string cseqidVal = "0";
       if (style == "Concurrent") {
         if (!(*f_iter)->is_oneway()) {
-          cseqidVal = "this->sync_.generateSeqId()";
+          cseqidVal = "this->sync_->generateSeqId()";
         }
       }
       // Serialize the request
@@ -2442,7 +2459,7 @@ void t_cpp_generator::generate_service_client(t_service* tservice, string style)
         indent() << "int32_t cseqid = " << cseqidVal << ";" << endl;
       if(style == "Concurrent") {
         out <<
-          indent() << "::apache::thrift::async::TConcurrentSendSentry sentry(&this->sync_);" << endl;
+          indent() << "::apache::thrift::async::TConcurrentSendSentry sentry(this->sync_.get());" << endl;
       }
       if (style == "Cob") {
         out <<
@@ -2507,7 +2524,7 @@ void t_cpp_generator::generate_service_client(t_service* tservice, string style)
             endl <<
             indent() << "// the read mutex gets dropped and reacquired as part of waitForWork()" << endl <<
             indent() << "// The destructor of this sentry wakes up other clients" << endl <<
-            indent() << "::apache::thrift::async::TConcurrentRecvSentry sentry(&this->sync_, seqid);" << endl;
+            indent() << "::apache::thrift::async::TConcurrentRecvSentry sentry(this->sync_.get(), seqid);" << endl;
         }
         if (style == "Cob" && !gen_no_client_completion_) {
           out << indent() << "bool completed = false;" << endl << endl << indent() << "try {";
@@ -2517,7 +2534,7 @@ void t_cpp_generator::generate_service_client(t_service* tservice, string style)
         if (style == "Concurrent") {
           out <<
             indent() << "while(true) {" << endl <<
-            indent() << "  if(!this->sync_.getPending(fname, mtype, rseqid)) {" << endl;
+            indent() << "  if(!this->sync_->getPending(fname, mtype, rseqid)) {" << endl;
           indent_up();
           indent_up();
         }
@@ -2661,10 +2678,10 @@ void t_cpp_generator::generate_service_client(t_service* tservice, string style)
           out <<
             indent() << "  }" << endl <<
             indent() << "  // seqid != rseqid" << endl <<
-            indent() << "  this->sync_.updatePending(fname, mtype, rseqid);" << endl <<
+            indent() << "  this->sync_->updatePending(fname, mtype, rseqid);" << endl <<
             endl <<
             indent() << "  // this will temporarily unlock the readMutex, and let other clients get work done" << endl <<
-            indent() << "  this->sync_.waitForWork(seqid);" << endl <<
+            indent() << "  this->sync_->waitForWork(seqid);" << endl <<
             indent() << "} // end while(true)" << endl;
         }
         if (style == "Cob" && !gen_no_client_completion_) {
{code}

( https://github.com/MichaelE1000/thrift/commit/44ec66e550d90cf8e0074cd46769b5be0d29253b )

Do you see any issues with this approach?

Best Regards


was (Author: michaele1000):
Currently I solved the problem for us like shown here:

{code:patch}
diff --git a/compiler/cpp/src/thrift/generate/t_cpp_generator.cc b/compiler/cpp/src/thrift/generate/t_cpp_generator.cc
index cbe8da2..515e0e2 100644
--- a/compiler/cpp/src/thrift/generate/t_cpp_generator.cc
+++ b/compiler/cpp/src/thrift/generate/t_cpp_generator.cc
@@ -1616,6 +1616,8 @@ void t_cpp_generator::generate_service(t_service* tservice) {
               << "namespace apache { namespace thrift { namespace async {" << endl
               << "class TAsyncChannel;" << endl << "}}}" << endl;
   }
+  f_header_ << "#include <boost/make_shared.hpp>" << endl;
+  f_header_ << "#include <boost/shared_ptr.hpp>" << endl;
   f_header_ << "#include <thrift/TDispatchProcessor.h>" << endl;
   if (gen_cob_style_) {
     f_header_ << "#include <thrift/async/TAsyncDispatchProcessor.h>" << endl;
@@ -2169,7 +2171,14 @@ void t_cpp_generator::generate_service_client(t_service* tservice, string style)
   indent_up();
   if (style != "Cob") {
     f_header_ << indent() << service_name_ << style << "Client" << short_suffix << "(" << prot_ptr
-              << " prot) ";
+		<< " prot";
+	if (style == "Concurrent") {
+		f_header_ << ", boost::shared_ptr<::apache::thrift::async::TConcurrentClientSyncInfo> sync = boost::make_shared<::apache::thrift::async::TConcurrentClientSyncInfo>()"
+			<< ") : sync_(sync) ";
+	}
+	else {
+		f_header_ << ") ";
+	}
 
     if (extends.empty()) {
       f_header_ << "{" << endl;
@@ -2182,7 +2191,15 @@ void t_cpp_generator::generate_service_client(t_service* tservice, string style)
     }
 
     f_header_ << indent() << service_name_ << style << "Client" << short_suffix << "(" << prot_ptr
-              << " iprot, " << prot_ptr << " oprot) ";
+		<< " iprot, " << prot_ptr << " oprot";
+	if (style == "Concurrent") {
+		f_header_ << ", boost::shared_ptr<::apache::thrift::async::TConcurrentClientSyncInfo> sync = boost::make_shared<::apache::thrift::async::TConcurrentClientSyncInfo>()"
+			<< ") : sync_(sync) ";
+	}
+	else {
+		f_header_ << ") ";
+	}
+	
     if (extends.empty()) {
       f_header_ << "{" << endl;
       f_header_ << indent() << "  setProtocol" << short_suffix << "(iprot,oprot);" << endl
@@ -2328,7 +2345,7 @@ void t_cpp_generator::generate_service_client(t_service* tservice, string style)
 
     if (style == "Concurrent") {
       f_header_ <<
-        indent() << "::apache::thrift::async::TConcurrentClientSyncInfo sync_;"<<endl;
+        indent() << "boost::shared_ptr<::apache::thrift::async::TConcurrentClientSyncInfo> sync_;"<<endl;
     }
     indent_down();
   }
@@ -2434,7 +2451,7 @@ void t_cpp_generator::generate_service_client(t_service* tservice, string style)
       string cseqidVal = "0";
       if (style == "Concurrent") {
         if (!(*f_iter)->is_oneway()) {
-          cseqidVal = "this->sync_.generateSeqId()";
+          cseqidVal = "this->sync_->generateSeqId()";
         }
       }
       // Serialize the request
@@ -2442,7 +2459,7 @@ void t_cpp_generator::generate_service_client(t_service* tservice, string style)
         indent() << "int32_t cseqid = " << cseqidVal << ";" << endl;
       if(style == "Concurrent") {
         out <<
-          indent() << "::apache::thrift::async::TConcurrentSendSentry sentry(&this->sync_);" << endl;
+          indent() << "::apache::thrift::async::TConcurrentSendSentry sentry(this->sync_.get());" << endl;
       }
       if (style == "Cob") {
         out <<
@@ -2507,7 +2524,7 @@ void t_cpp_generator::generate_service_client(t_service* tservice, string style)
             endl <<
             indent() << "// the read mutex gets dropped and reacquired as part of waitForWork()" << endl <<
             indent() << "// The destructor of this sentry wakes up other clients" << endl <<
-            indent() << "::apache::thrift::async::TConcurrentRecvSentry sentry(&this->sync_, seqid);" << endl;
+            indent() << "::apache::thrift::async::TConcurrentRecvSentry sentry(this->sync_.get(), seqid);" << endl;
         }
         if (style == "Cob" && !gen_no_client_completion_) {
           out << indent() << "bool completed = false;" << endl << endl << indent() << "try {";
@@ -2517,7 +2534,7 @@ void t_cpp_generator::generate_service_client(t_service* tservice, string style)
         if (style == "Concurrent") {
           out <<
             indent() << "while(true) {" << endl <<
-            indent() << "  if(!this->sync_.getPending(fname, mtype, rseqid)) {" << endl;
+            indent() << "  if(!this->sync_->getPending(fname, mtype, rseqid)) {" << endl;
           indent_up();
           indent_up();
         }
@@ -2661,10 +2678,10 @@ void t_cpp_generator::generate_service_client(t_service* tservice, string style)
           out <<
             indent() << "  }" << endl <<
             indent() << "  // seqid != rseqid" << endl <<
-            indent() << "  this->sync_.updatePending(fname, mtype, rseqid);" << endl <<
+            indent() << "  this->sync_->updatePending(fname, mtype, rseqid);" << endl <<
             endl <<
             indent() << "  // this will temporarily unlock the readMutex, and let other clients get work done" << endl <<
-            indent() << "  this->sync_.waitForWork(seqid);" << endl <<
+            indent() << "  this->sync_->waitForWork(seqid);" << endl <<
             indent() << "} // end while(true)" << endl;
         }
         if (style == "Cob" && !gen_no_client_completion_) {
{code}

( https://github.com/MichaelE1000/thrift/commit/44ec66e550d90cf8e0074cd46769b5be0d29253b )

Do you see any issues with this approach?

Best Regards

> Using multiple services simultaneously is not thread-safe.
> ----------------------------------------------------------
>
>                 Key: THRIFT-4384
>                 URL: https://issues.apache.org/jira/browse/THRIFT-4384
>             Project: Thrift
>          Issue Type: Bug
>          Components: C++ - Compiler, C++ - Library
>    Affects Versions: 0.10.0
>         Environment: Should affect all platforms but has been noticed first on Windows, x86_64.
>            Reporter: Michael Eiler
>            Priority: Critical
>             Fix For: 0.10.0, 0.11.0
>
>
> I'm using the generated *ServiceConcurrentClient classes. They should allow me to call multiple functions at the same time.
> The issue as that the ::apache::thrift::async::TConcurrentClientSyncInfo class is a member of the generated service. If I have a project with multiple services sharing the same connection (protocol) with each other, the services will not be mutually excluded from reading on the same socket. 
> I did a small test with patching the generated code and injecting the same instance of TConcurrentClientSyncInfo into all my services and everything was fine.
> Question: Do you need a small project to reproduce this or is it obvious enough? Just check out any generated code and you will see that the TConcurrentClientSyncInfo is not shared between different services.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)