You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by xe...@apache.org on 2014/09/03 01:14:50 UTC

git commit: Make disruptor_thrift_server invocation pool configurable (patch by Pavel Yaskevich; reviewed by Jason Brown for CASSANDRA-7594)

Repository: cassandra
Updated Branches:
  refs/heads/cassandra-2.0 968ffd8dd -> b22089d7c


Make disruptor_thrift_server invocation pool configurable
patch by Pavel Yaskevich; reviewed by Jason Brown for CASSANDRA-7594


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/b22089d7
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/b22089d7
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/b22089d7

Branch: refs/heads/cassandra-2.0
Commit: b22089d7c8f723defec10359e026c767cae57224
Parents: 968ffd8
Author: Pavel Yaskevich <xe...@apache.org>
Authored: Tue Sep 2 15:44:48 2014 -0700
Committer: Pavel Yaskevich <xe...@apache.org>
Committed: Tue Sep 2 16:14:27 2014 -0700

----------------------------------------------------------------------
 CHANGES.txt                                        |   1 +
 build.xml                                          |   4 ++--
 lib/thrift-server-0.3.6.jar                        | Bin 0 -> 39588 bytes
 lib/thrift-server-internal-only-0.3.3.jar          | Bin 39191 -> 0 bytes
 .../cassandra/thrift/THsHaDisruptorServer.java     |  13 +++++++++++++
 5 files changed, 16 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/b22089d7/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 6d77f3c..4954b7f 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -5,6 +5,7 @@
  * Always reject inequality on the partition key without token()
    (CASSANDRA-7722)
  * Always send Paxos commit to all replicas (CASSANDRA-7479)
+ * Make disruptor_thrift_server invocation pool configurable (CASSANDRA-7594)
 
 
 2.0.10

http://git-wip-us.apache.org/repos/asf/cassandra/blob/b22089d7/build.xml
----------------------------------------------------------------------
diff --git a/build.xml b/build.xml
index dd59bd2..f456fa8 100644
--- a/build.xml
+++ b/build.xml
@@ -361,7 +361,7 @@
           <dependency groupId="com.googlecode.json-simple" artifactId="json-simple" version="1.1"/>
           <dependency groupId="com.github.stephenc.high-scale-lib" artifactId="high-scale-lib" version="1.1.2"/>
           <dependency groupId="com.github.stephenc" artifactId="jamm" version="0.2.5"/>
-	   <dependency groupId="com.thinkaurelius.thrift" artifactId="thrift-server" version="0.3.3"/>
+	   <dependency groupId="com.thinkaurelius.thrift" artifactId="thrift-server" version="0.3.6"/>
           <dependency groupId="org.yaml" artifactId="snakeyaml" version="1.11"/>
           <dependency groupId="org.apache.thrift" artifactId="libthrift" version="0.9.1"/>
 
@@ -467,7 +467,7 @@
         <dependency groupId="org.mindrot" artifactId="jbcrypt"/>
         <dependency groupId="com.yammer.metrics" artifactId="metrics-core"/>
         <dependency groupId="com.addthis.metrics" artifactId="reporter-config"/>
-        <dependency groupId="com.thinkaurelius.thrift" artifactId="thrift-server" version="0.3.3"/>
+        <dependency groupId="com.thinkaurelius.thrift" artifactId="thrift-server" version="0.3.6"/>
         <dependency groupId="net.sf.supercsv" artifactId="super-csv" version="2.1.0" />
 
         <dependency groupId="log4j" artifactId="log4j"/>

http://git-wip-us.apache.org/repos/asf/cassandra/blob/b22089d7/lib/thrift-server-0.3.6.jar
----------------------------------------------------------------------
diff --git a/lib/thrift-server-0.3.6.jar b/lib/thrift-server-0.3.6.jar
new file mode 100644
index 0000000..c974f75
Binary files /dev/null and b/lib/thrift-server-0.3.6.jar differ

http://git-wip-us.apache.org/repos/asf/cassandra/blob/b22089d7/lib/thrift-server-internal-only-0.3.3.jar
----------------------------------------------------------------------
diff --git a/lib/thrift-server-internal-only-0.3.3.jar b/lib/thrift-server-internal-only-0.3.3.jar
deleted file mode 100644
index 6a1fbae..0000000
Binary files a/lib/thrift-server-internal-only-0.3.3.jar and /dev/null differ

http://git-wip-us.apache.org/repos/asf/cassandra/blob/b22089d7/src/java/org/apache/cassandra/thrift/THsHaDisruptorServer.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/thrift/THsHaDisruptorServer.java b/src/java/org/apache/cassandra/thrift/THsHaDisruptorServer.java
index e3b89d2..dd501ec 100644
--- a/src/java/org/apache/cassandra/thrift/THsHaDisruptorServer.java
+++ b/src/java/org/apache/cassandra/thrift/THsHaDisruptorServer.java
@@ -19,9 +19,14 @@
 package org.apache.cassandra.thrift;
 
 import java.net.InetSocketAddress;
+import java.util.concurrent.SynchronousQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
 
 import com.thinkaurelius.thrift.Message;
 import com.thinkaurelius.thrift.TDisruptorServer;
+import org.apache.cassandra.concurrent.JMXEnabledThreadPoolExecutor;
+import org.apache.cassandra.concurrent.NamedThreadFactory;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -78,6 +83,13 @@ public class THsHaDisruptorServer extends TDisruptorServer
                 throw new RuntimeException(String.format("Unable to create thrift socket to %s:%s", addr.getAddress(), addr.getPort()), e);
             }
 
+            ThreadPoolExecutor invoker = new JMXEnabledThreadPoolExecutor(DatabaseDescriptor.getRpcMinThreads(),
+                                                                          DatabaseDescriptor.getRpcMaxThreads(),
+                                                                          60L,
+                                                                          TimeUnit.SECONDS,
+                                                                          new SynchronousQueue<Runnable>(),
+                                                                          new NamedThreadFactory("RPC-Thread"), "RPC-THREAD-POOL");
+
             com.thinkaurelius.thrift.util.TBinaryProtocol.Factory protocolFactory = new com.thinkaurelius.thrift.util.TBinaryProtocol.Factory(true, true);
 
             TDisruptorServer.Args serverArgs = new TDisruptorServer.Args(serverTransport).useHeapBasedAllocation(true)
@@ -87,6 +99,7 @@ public class THsHaDisruptorServer extends TDisruptorServer
                                                                                          .outputProtocolFactory(protocolFactory)
                                                                                          .processor(args.processor)
                                                                                          .maxFrameSizeInBytes(DatabaseDescriptor.getThriftFramedTransportSize())
+                                                                                         .invocationExecutor(invoker)
                                                                                          .alwaysReallocateBuffers(true);
 
             return new THsHaDisruptorServer(serverArgs);