You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by xe...@apache.org on 2013/06/08 00:41:10 UTC

git commit: Replace Thrift HsHa with LMAX Disruptor based implementation; patch by Pavel Yaskevich; reviewed by Aleksey Yeschenko for CASSANDRA-5582

Updated Branches:
  refs/heads/trunk 47ac188a3 -> 98eec0a22


Replace Thrift HsHa with LMAX Disruptor based implementation
patch by Pavel Yaskevich; reviewed by Aleksey Yeschenko for CASSANDRA-5582


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/98eec0a2
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/98eec0a2
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/98eec0a2

Branch: refs/heads/trunk
Commit: 98eec0a223251ecd8fec7ecc9e46b05497d631c6
Parents: 47ac188
Author: Pavel Yaskevich <xe...@apache.org>
Authored: Mon May 20 16:43:14 2013 -0700
Committer: Pavel Yaskevich <xe...@apache.org>
Committed: Fri Jun 7 15:36:48 2013 -0700

----------------------------------------------------------------------
 CHANGES.txt                                        |    1 +
 build.xml                                          |    4 +-
 lib/disruptor-3.0.1.jar                            |  Bin 0 -> 66843 bytes
 lib/thrift-server-0.1.jar                          |  Bin 0 -> 122900 bytes
 .../apache/cassandra/thrift/CustomTHsHaServer.java |  103 ---------------
 .../cassandra/thrift/THsHaDisruptorServer.java     |   86 ++++++++++++
 .../cassandra/thrift/TServerCustomFactory.java     |    3 +-
 7 files changed, 92 insertions(+), 105 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/98eec0a2/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 04122b9..985b803 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -56,6 +56,7 @@
  * Allow preparing timestamp, ttl and limit in CQL3 queries (CASSANDRA-4450)
  * Support native link w/o JNA in Java7 (CASSANDRA-3734)
  * Use SASL authentication in binary protocol v2 (CASSANDRA-5545)
+ * Replace Thrift HsHa with LMAX Disruptor based implementation (CASSANDRA-5582)
 
 1.2.6
  * Reduce SSTableLoader memory usage (CASSANDRA-5555)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/98eec0a2/build.xml
----------------------------------------------------------------------
diff --git a/build.xml b/build.xml
index 7e97e66..f5a21be 100644
--- a/build.xml
+++ b/build.xml
@@ -353,6 +353,7 @@
           <dependency groupId="com.googlecode.json-simple" artifactId="json-simple" version="1.1"/>
           <dependency groupId="com.github.stephenc.high-scale-lib" artifactId="high-scale-lib" version="1.1.2"/>
           <dependency groupId="com.github.stephenc" artifactId="jamm" version="0.2.5"/>
+          <dependency groupId="com.thinkaurelius.thrift" artifactId="thrift-server" version="0.1"/>
           <dependency groupId="org.yaml" artifactId="snakeyaml" version="1.6"/>
           <dependency groupId="org.apache.thrift" artifactId="libthrift" version="0.9.0"/>
 
@@ -459,7 +460,8 @@
         <dependency groupId="edu.stanford.ppl" artifactId="snaptree"/>
         <dependency groupId="org.mindrot" artifactId="jbcrypt"/>
         <dependency groupId="com.yammer.metrics" artifactId="metrics-core"/>
-        
+        <dependency groupId="com.thinkaurelius.thrift" artifactId="thrift-server" version="0.1"/>
+
         <dependency groupId="log4j" artifactId="log4j"/>
         <!-- cassandra has a hard dependency on log4j, so force slf4j's log4j provider at runtime -->
         <dependency groupId="org.slf4j" artifactId="slf4j-log4j12" scope="runtime"/>

http://git-wip-us.apache.org/repos/asf/cassandra/blob/98eec0a2/lib/disruptor-3.0.1.jar
----------------------------------------------------------------------
diff --git a/lib/disruptor-3.0.1.jar b/lib/disruptor-3.0.1.jar
new file mode 100644
index 0000000..1899ed0
Binary files /dev/null and b/lib/disruptor-3.0.1.jar differ

http://git-wip-us.apache.org/repos/asf/cassandra/blob/98eec0a2/lib/thrift-server-0.1.jar
----------------------------------------------------------------------
diff --git a/lib/thrift-server-0.1.jar b/lib/thrift-server-0.1.jar
new file mode 100644
index 0000000..2c595a0
Binary files /dev/null and b/lib/thrift-server-0.1.jar differ

http://git-wip-us.apache.org/repos/asf/cassandra/blob/98eec0a2/src/java/org/apache/cassandra/thrift/CustomTHsHaServer.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/thrift/CustomTHsHaServer.java b/src/java/org/apache/cassandra/thrift/CustomTHsHaServer.java
deleted file mode 100644
index ca838b1..0000000
--- a/src/java/org/apache/cassandra/thrift/CustomTHsHaServer.java
+++ /dev/null
@@ -1,103 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.cassandra.thrift;
-
-import java.net.InetSocketAddress;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.SynchronousQueue;
-import java.util.concurrent.TimeUnit;
-
-import org.apache.thrift.server.TThreadedSelectorServer;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.cassandra.concurrent.JMXEnabledThreadPoolExecutor;
-import org.apache.cassandra.concurrent.NamedThreadFactory;
-import org.apache.cassandra.config.DatabaseDescriptor;
-import org.apache.thrift.server.TServer;
-import org.apache.thrift.transport.TNonblockingServerTransport;
-import org.apache.thrift.transport.TNonblockingSocket;
-import org.apache.thrift.transport.TTransportException;
-
-/**
- * This is a interim solution till THRIFT-1167 gets committed...
- *
- * The idea here is to avoid sticking to one CPU for IO's. For better throughput
- * it is spread across multiple threads. Number of selector thread can be the
- * number of CPU available.
- */
-public class CustomTHsHaServer extends TThreadedSelectorServer
-{
-    private static final Logger LOGGER = LoggerFactory.getLogger(CustomTHsHaServer.class.getName());
-
-    /**
-     * All the arguments to Non Blocking Server will apply here. In addition,
-     * executor pool will be responsible for creating the internal threads which
-     * will process the data. threads for selection usually are equal to the
-     * number of cpu's
-     */
-    public CustomTHsHaServer(Args args)
-    {
-        super(args);
-    }
-
-    protected boolean requestInvoke(FrameBuffer frameBuffer)
-    {
-        TNonblockingSocket socket = (TNonblockingSocket) frameBuffer.trans_;
-        ThriftSessionManager.instance.setCurrentSocket(socket.getSocketChannel().socket().getRemoteSocketAddress());
-        frameBuffer.invoke();
-        return true;
-    }
-
-    public static class Factory implements TServerFactory
-    {
-        public TServer buildTServer(Args args)
-        {
-            if (DatabaseDescriptor.getClientEncryptionOptions().enabled)
-                throw new RuntimeException("Client SSL is not supported for non-blocking sockets (hsha). Please remove client ssl from the configuration.");
-
-            final InetSocketAddress addr = args.addr;
-            TNonblockingServerTransport serverTransport;
-            try
-            {
-                serverTransport = new TCustomNonblockingServerSocket(addr, args.keepAlive, args.sendBufferSize, args.recvBufferSize);
-            }
-            catch (TTransportException e)
-            {
-                throw new RuntimeException(String.format("Unable to create thrift socket to %s:%s", addr.getAddress(), addr.getPort()), e);
-            }
-
-            // This is NIO selector service but the invocation will be Multi-Threaded with the Executor service.
-            ExecutorService executorService = new JMXEnabledThreadPoolExecutor(DatabaseDescriptor.getRpcMinThreads(),
-                                                                               DatabaseDescriptor.getRpcMaxThreads(),
-                                                                               60L,
-                                                                               TimeUnit.SECONDS,
-                                                                               new SynchronousQueue<Runnable>(),
-                                                                               new NamedThreadFactory("RPC-Thread"), "RPC-THREAD-POOL");
-           TThreadedSelectorServer.Args serverArgs = new TThreadedSelectorServer.Args(serverTransport).inputTransportFactory(args.inTransportFactory)
-                                                                               .outputTransportFactory(args.outTransportFactory)
-                                                                               .inputProtocolFactory(args.tProtocolFactory)
-                                                                               .outputProtocolFactory(args.tProtocolFactory)
-                                                                               .processor(args.processor)
-                                                                               .selectorThreads(Runtime.getRuntime().availableProcessors())
-                                                                               .executorService(executorService);
-            // Check for available processors in the system which will be equal to the IO Threads.
-            return new CustomTHsHaServer(serverArgs);
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/98eec0a2/src/java/org/apache/cassandra/thrift/THsHaDisruptorServer.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/thrift/THsHaDisruptorServer.java b/src/java/org/apache/cassandra/thrift/THsHaDisruptorServer.java
new file mode 100644
index 0000000..a757315
--- /dev/null
+++ b/src/java/org/apache/cassandra/thrift/THsHaDisruptorServer.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.cassandra.thrift;
+
+import java.net.InetSocketAddress;
+
+import com.thinkaurelius.thrift.Message;
+import com.thinkaurelius.thrift.TDisruptorServer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.thrift.server.TServer;
+import org.apache.thrift.transport.TNonblockingServerTransport;
+import org.apache.thrift.transport.TNonblockingSocket;
+import org.apache.thrift.transport.TTransportException;
+
+public class THsHaDisruptorServer extends TDisruptorServer
+{
+    private static final Logger logger = LoggerFactory.getLogger(THsHaDisruptorServer.class.getName());
+
+    /**
+     * All the arguments to the non-blocking server apply here. In addition,
+     * the executor pool is responsible for creating the internal threads
+     * which process the data. The number of selector threads is usually
+     * equal to the number of CPUs.
+     */
+    public THsHaDisruptorServer(Args args)
+    {
+        super(args);
+        logger.info("Starting up {}", this);
+    }
+
+    @Override
+    protected void beforeInvoke(Message buffer)
+    {
+        TNonblockingSocket socket = (TNonblockingSocket) buffer.transport;
+        ThriftSessionManager.instance.setCurrentSocket(socket.getSocketChannel().socket().getRemoteSocketAddress());
+    }
+
+    public static class Factory implements TServerFactory
+    {
+        public TServer buildTServer(Args args)
+        {
+            if (DatabaseDescriptor.getClientEncryptionOptions().enabled)
+                throw new RuntimeException("Client SSL is not supported for non-blocking sockets (hsha). Please remove client ssl from the configuration.");
+
+            final InetSocketAddress addr = args.addr;
+            TNonblockingServerTransport serverTransport;
+            try
+            {
+                serverTransport = new TCustomNonblockingServerSocket(addr, args.keepAlive, args.sendBufferSize, args.recvBufferSize);
+            }
+            catch (TTransportException e)
+            {
+                throw new RuntimeException(String.format("Unable to create thrift socket to %s:%s", addr.getAddress(), addr.getPort()), e);
+            }
+
+            com.thinkaurelius.thrift.util.TBinaryProtocol.Factory protocolFactory = new com.thinkaurelius.thrift.util.TBinaryProtocol.Factory(true, true);
+
+            TDisruptorServer.Args serverArgs = new TDisruptorServer.Args(serverTransport).inputTransportFactory(args.inTransportFactory)
+                                                                                         .outputTransportFactory(args.outTransportFactory)
+                                                                                         .inputProtocolFactory(protocolFactory)
+                                                                                         .outputProtocolFactory(protocolFactory)
+                                                                                         .processor(args.processor);
+
+            return new THsHaDisruptorServer(serverArgs);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/98eec0a2/src/java/org/apache/cassandra/thrift/TServerCustomFactory.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/thrift/TServerCustomFactory.java b/src/java/org/apache/cassandra/thrift/TServerCustomFactory.java
index 208f664..4bf0acd 100644
--- a/src/java/org/apache/cassandra/thrift/TServerCustomFactory.java
+++ b/src/java/org/apache/cassandra/thrift/TServerCustomFactory.java
@@ -21,6 +21,7 @@ package org.apache.cassandra.thrift;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.thinkaurelius.thrift.TDisruptorServer;
 import org.apache.thrift.server.TServer;
 
 /**
@@ -53,7 +54,7 @@ public class TServerCustomFactory implements TServerFactory
         }
         else if(ThriftServer.HSHA.equalsIgnoreCase(serverType))
         {
-            server = new CustomTHsHaServer.Factory().buildTServer(args);
+            server = new THsHaDisruptorServer.Factory().buildTServer(args);
             logger.info(String.format("Using custom half-sync/half-async thrift server on %s : %s", args.addr.getHostName(), args.addr.getPort()));
         }
         else