You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by al...@apache.org on 2013/09/19 14:54:42 UTC

git commit: Make ThriftServer more easlly extensible

Updated Branches:
  refs/heads/cassandra-2.0 37e9bce38 -> caf047b85


Make ThriftServer more easlly extensible

patch by Sam Tunnicliffe; reviewed by Aleksey Yeschenko for
CASSANDRA-6058


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/caf047b8
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/caf047b8
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/caf047b8

Branch: refs/heads/cassandra-2.0
Commit: caf047b8581709ba57327f9e3c4607f273771840
Parents: 37e9bce
Author: Aleksey Yeschenko <al...@apache.org>
Authored: Thu Sep 19 15:53:24 2013 +0300
Committer: Aleksey Yeschenko <al...@apache.org>
Committed: Thu Sep 19 15:53:24 2013 +0300

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 .../apache/cassandra/thrift/TServerFactory.java |  3 +-
 .../apache/cassandra/thrift/ThriftServer.java   | 54 ++++++++++++++------
 3 files changed, 42 insertions(+), 16 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/caf047b8/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index b2fa36e..06d9b16 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -27,6 +27,7 @@
  * cqlsh: return the result of CAS writes (CASSANDRA-5796)
  * Fix validation of IN clauses with 2ndary indexes (CASSANDRA-6050)
  * Support named bind variables in CQL (CASSANDRA-6033)
+ * Make ThriftServer more easlly extensible (CASSANDRA-6058)
 Merged from 1.2:
  * Avoid second-guessing out-of-space state (CASSANDRA-5605)
  * Tuning knobs for dealing with large blobs and many CFs (CASSANDRA-5982)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/caf047b8/src/java/org/apache/cassandra/thrift/TServerFactory.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/thrift/TServerFactory.java b/src/java/org/apache/cassandra/thrift/TServerFactory.java
index 0c93867..2e2acb8 100644
--- a/src/java/org/apache/cassandra/thrift/TServerFactory.java
+++ b/src/java/org/apache/cassandra/thrift/TServerFactory.java
@@ -20,6 +20,7 @@ package org.apache.cassandra.thrift;
 
 import java.net.InetSocketAddress;
 
+import org.apache.thrift.TProcessor;
 import org.apache.thrift.protocol.TProtocolFactory;
 import org.apache.thrift.server.TServer;
 import org.apache.thrift.transport.TTransportFactory;
@@ -32,7 +33,7 @@ public interface TServerFactory
     {
         public InetSocketAddress addr;
         public CassandraServer cassandraServer;
-        public Cassandra.Processor processor;
+        public TProcessor processor;
         public TProtocolFactory tProtocolFactory;
         public TTransportFactory inTransportFactory;
         public TTransportFactory outTransportFactory;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/caf047b8/src/java/org/apache/cassandra/thrift/ThriftServer.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/thrift/ThriftServer.java b/src/java/org/apache/cassandra/thrift/ThriftServer.java
index 5f608a1..3a0d545 100644
--- a/src/java/org/apache/cassandra/thrift/ThriftServer.java
+++ b/src/java/org/apache/cassandra/thrift/ThriftServer.java
@@ -25,19 +25,21 @@ import org.slf4j.LoggerFactory;
 
 import org.apache.cassandra.service.CassandraDaemon;
 import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.thrift.TProcessor;
 import org.apache.thrift.protocol.TBinaryProtocol;
 import org.apache.thrift.server.TServer;
 import org.apache.thrift.transport.TFramedTransport;
+import org.apache.thrift.transport.TTransportFactory;
 
 public class ThriftServer implements CassandraDaemon.Server
 {
     private static Logger logger = LoggerFactory.getLogger(ThriftServer.class);
-    final static String SYNC = "sync";
-    final static String ASYNC = "async";
-    final static String HSHA = "hsha";
+    protected final static String SYNC = "sync";
+    protected final static String ASYNC = "async";
+    protected final static String HSHA = "hsha";
 
-    private final InetAddress address;
-    private final int port;
+    protected final InetAddress address;
+    protected final int port;
     private volatile ThriftServerThread server;
 
     public ThriftServer(InetAddress address, int port)
@@ -50,7 +52,8 @@ public class ThriftServer implements CassandraDaemon.Server
     {
         if (server == null)
         {
-            server = new ThriftServerThread(address, port);
+            CassandraServer iface = getCassandraServer();
+            server = new ThriftServerThread(address, port, iface, getProcessor(iface), getTransportFactory());
             server.start();
         }
     }
@@ -77,15 +80,39 @@ public class ThriftServer implements CassandraDaemon.Server
         return server != null;
     }
 
+    /*
+     * These methods are intended to be overriden to provide DSE specific implementations
+     */
+    protected CassandraServer getCassandraServer()
+    {
+        return new CassandraServer();
+    }
+
+    protected TProcessor getProcessor(CassandraServer server)
+    {
+        return new Cassandra.Processor<Cassandra.Iface>(server);
+    }
+
+    protected TTransportFactory getTransportFactory()
+    {
+        int tFramedTransportSize = DatabaseDescriptor.getThriftFramedTransportSize();
+        logger.info("Using TFramedTransport with a max frame size of {} bytes.", tFramedTransportSize);
+        return new TFramedTransport.Factory(tFramedTransportSize);
+    }
+
     /**
      * Simple class to run the thrift connection accepting code in separate
      * thread of control.
      */
     private static class ThriftServerThread extends Thread
     {
-        private TServer serverEngine;
+        private final TServer serverEngine;
 
-        public ThriftServerThread(InetAddress listenAddr, int listenPort)
+        public ThriftServerThread(InetAddress listenAddr,
+                                  int listenPort,
+                                  CassandraServer server,
+                                  TProcessor processor,
+                                  TTransportFactory transportFactory)
         {
             // now we start listening for clients
             logger.info(String.format("Binding thrift service to %s:%s", listenAddr, listenPort));
@@ -93,16 +120,13 @@ public class ThriftServer implements CassandraDaemon.Server
             TServerFactory.Args args = new TServerFactory.Args();
             args.tProtocolFactory = new TBinaryProtocol.Factory(true, true);
             args.addr = new InetSocketAddress(listenAddr, listenPort);
-            args.cassandraServer = new CassandraServer();
-            args.processor = new Cassandra.Processor(args.cassandraServer);
+            args.cassandraServer = server;
+            args.processor = processor;
             args.keepAlive = DatabaseDescriptor.getRpcKeepAlive();
             args.sendBufferSize = DatabaseDescriptor.getRpcSendBufferSize();
             args.recvBufferSize = DatabaseDescriptor.getRpcRecvBufferSize();
-            int tFramedTransportSize = DatabaseDescriptor.getThriftFramedTransportSize();
-
-            logger.info("Using TFramedTransport with a max frame size of {} bytes.", tFramedTransportSize);
-            args.inTransportFactory = new TFramedTransport.Factory(tFramedTransportSize);
-            args.outTransportFactory = new TFramedTransport.Factory(tFramedTransportSize);
+            args.inTransportFactory = transportFactory;
+            args.outTransportFactory = transportFactory;
             serverEngine = new TServerCustomFactory(DatabaseDescriptor.getRpcServerType()).buildTServer(args);
         }