You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by xe...@apache.org on 2013/01/28 19:45:17 UTC

git commit: add ConfigHelper support for Thrift frame and max message sizes patch by Pavel Yaskevich; reviewed by Brandon Williams for CASSANDRA-5188

Updated Branches:
  refs/heads/cassandra-1.1 3298c2f19 -> 73d828e4e


add ConfigHelper support for Thrift frame and max message sizes
patch by Pavel Yaskevich; reviewed by Brandon Williams for CASSANDRA-5188


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/73d828e4
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/73d828e4
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/73d828e4

Branch: refs/heads/cassandra-1.1
Commit: 73d828e4e8023b9f7ca8fafd12becec34eb59211
Parents: 3298c2f
Author: Pavel Yaskevich <xe...@apache.org>
Authored: Fri Jan 25 21:49:25 2013 -0800
Committer: Pavel Yaskevich <xe...@apache.org>
Committed: Mon Jan 28 10:31:13 2013 -0800

----------------------------------------------------------------------
 CHANGES.txt                                        |    1 +
 .../cassandra/hadoop/ColumnFamilyOutputFormat.java |    4 +-
 .../cassandra/hadoop/ColumnFamilyRecordReader.java |    4 +-
 .../org/apache/cassandra/hadoop/ConfigHelper.java  |   34 ++++++++++++++-
 .../apache/cassandra/thrift/ITransportFactory.java |    3 +-
 .../apache/cassandra/thrift/TBinaryProtocol.java   |    8 +++
 .../cassandra/thrift/TFramedTransportFactory.java  |    7 ++-
 7 files changed, 52 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/73d828e4/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 1ad77b1..1c414bc 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -3,6 +3,7 @@
  * fix ConcurrentModificationException in getBootstrapSource (CASSANDRA-5170)
  * fix sstable maxtimestamp for row deletes and pre-1.1.1 sstables (CASSANDRA-5153)
  * fix start key/end token validation for wide row iteration (CASSANDRA-5168)
+ * add ConfigHelper support for Thrift frame and max message sizes (CASSANDRA-5188)
 
 
 1.1.9

http://git-wip-us.apache.org/repos/asf/cassandra/blob/73d828e4/src/java/org/apache/cassandra/hadoop/ColumnFamilyOutputFormat.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hadoop/ColumnFamilyOutputFormat.java b/src/java/org/apache/cassandra/hadoop/ColumnFamilyOutputFormat.java
index e01ada5..caea616 100644
--- a/src/java/org/apache/cassandra/hadoop/ColumnFamilyOutputFormat.java
+++ b/src/java/org/apache/cassandra/hadoop/ColumnFamilyOutputFormat.java
@@ -154,8 +154,8 @@ public class ColumnFamilyOutputFormat extends OutputFormat<ByteBuffer,List<Mutat
             throws InvalidRequestException, TException, AuthenticationException, AuthorizationException, LoginException
     {
         logger.debug("Creating authenticated client for CF output format");
-        TTransport transport = ConfigHelper.getOutputTransportFactory(conf).openTransport(socket);
-        TBinaryProtocol binaryProtocol = new TBinaryProtocol(transport);
+        TTransport transport = ConfigHelper.getOutputTransportFactory(conf).openTransport(socket, conf);
+        TBinaryProtocol binaryProtocol = new TBinaryProtocol(transport, ConfigHelper.getThriftMaxMessageLength(conf));
         Cassandra.Client client = new Cassandra.Client(binaryProtocol);
         client.set_keyspace(ConfigHelper.getOutputKeyspace(conf));
         if (ConfigHelper.getOutputKeyspaceUserName(conf) != null)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/73d828e4/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordReader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordReader.java b/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordReader.java
index 83e436b..a40e6c5 100644
--- a/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordReader.java
+++ b/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordReader.java
@@ -161,8 +161,8 @@ public class ColumnFamilyRecordReader extends RecordReader<ByteBuffer, SortedMap
             // create connection using thrift
             String location = getLocation();
             socket = new TSocket(location, ConfigHelper.getInputRpcPort(conf));
-            TTransport transport = ConfigHelper.getInputTransportFactory(conf).openTransport(socket);
-            TBinaryProtocol binaryProtocol = new TBinaryProtocol(transport);
+            TTransport transport = ConfigHelper.getInputTransportFactory(conf).openTransport(socket, conf);
+            TBinaryProtocol binaryProtocol = new TBinaryProtocol(transport, ConfigHelper.getThriftMaxMessageLength(conf));
             client = new Cassandra.Client(binaryProtocol);
 
             // log in

http://git-wip-us.apache.org/repos/asf/cassandra/blob/73d828e4/src/java/org/apache/cassandra/hadoop/ConfigHelper.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hadoop/ConfigHelper.java b/src/java/org/apache/cassandra/hadoop/ConfigHelper.java
index 4b49387..ad29903 100644
--- a/src/java/org/apache/cassandra/hadoop/ConfigHelper.java
+++ b/src/java/org/apache/cassandra/hadoop/ConfigHelper.java
@@ -76,6 +76,8 @@ public class ConfigHelper
     private static final String OUTPUT_COMPRESSION_CHUNK_LENGTH = "cassandra.output.compression.length";
     private static final String INPUT_TRANSPORT_FACTORY_CLASS = "cassandra.input.transport.factory.class";
     private static final String OUTPUT_TRANSPORT_FACTORY_CLASS = "cassandra.output.transport.factory.class";
+    private static final String THRIFT_FRAMED_TRANSPORT_SIZE_IN_MB = "cassandra.thrift.framed.size_mb";
+    private static final String THRIFT_MAX_MESSAGE_LENGTH_IN_MB = "cassandra.thrift.message.max_size_mb";
 
     private static final Logger logger = LoggerFactory.getLogger(ConfigHelper.class);
 
@@ -466,6 +468,34 @@ public class ConfigHelper
         conf.set(OUTPUT_COMPRESSION_CHUNK_LENGTH, length);
     }
 
+    public static void setThriftFramedTransportSizeInMb(Configuration conf, int frameSizeInMB)
+    {
+        conf.setInt(THRIFT_FRAMED_TRANSPORT_SIZE_IN_MB, frameSizeInMB); // stored in MB; getThriftFramedTransportSize(conf) returns it in bytes
+    }
+
+    /**
+     * @param conf The configuration to use.
+     * @return Frame size in bytes: the MB value set by {@link #setThriftFramedTransportSizeInMb(Configuration, int)} (default 15) multiplied by 1024 * 1024
+     */
+    public static int getThriftFramedTransportSize(Configuration conf)
+    {
+        return conf.getInt(THRIFT_FRAMED_TRANSPORT_SIZE_IN_MB, 15) * 1024 * 1024; // 15MB is default in Cassandra
+    }
+
+    public static void setThriftMaxMessageLengthInMb(Configuration conf, int maxMessageSizeInMB)
+    {
+        conf.setInt(THRIFT_MAX_MESSAGE_LENGTH_IN_MB, maxMessageSizeInMB); // value is in MB; NOTE(review): confirm getThriftMaxMessageLength(conf) reads this same key
+    }
+
+    /**
+     * @param conf The configuration to use.
+     * @return Max message length in bytes: the MB value set by {@link #setThriftMaxMessageLengthInMb(Configuration, int)} (default 16) multiplied by 1024 * 1024
+     */
+    public static int getThriftMaxMessageLength(Configuration conf)
+    {
+        return conf.getInt(THRIFT_MAX_MESSAGE_LENGTH_IN_MB, 16) * 1024 * 1024; // 16MB is default in Cassandra
+    }
+
     public static CompressionParameters getOutputCompressionParamaters(Configuration conf)
     {
         if (getOutputCompressionClass(conf) == null)
@@ -526,8 +556,8 @@ public class ConfigHelper
         try
         {
             TSocket socket = new TSocket(host, port);
-            TTransport transport = getInputTransportFactory(conf).openTransport(socket);
-            return new Cassandra.Client(new TBinaryProtocol(transport));
+            TTransport transport = getInputTransportFactory(conf).openTransport(socket, conf);
+            return new Cassandra.Client(new TBinaryProtocol(transport, getThriftMaxMessageLength(conf)));
         }
         catch (LoginException e)
         {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/73d828e4/src/java/org/apache/cassandra/thrift/ITransportFactory.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/thrift/ITransportFactory.java b/src/java/org/apache/cassandra/thrift/ITransportFactory.java
index 4940fc6..e3e87c2 100644
--- a/src/java/org/apache/cassandra/thrift/ITransportFactory.java
+++ b/src/java/org/apache/cassandra/thrift/ITransportFactory.java
@@ -21,6 +21,7 @@ package org.apache.cassandra.thrift;
  *
  */
 
+import org.apache.hadoop.conf.Configuration;
 import org.apache.thrift.transport.TSocket;
 import org.apache.thrift.transport.TTransport;
 import org.apache.thrift.transport.TTransportException;
@@ -30,5 +31,5 @@ import javax.security.auth.login.LoginException;
 
 public interface ITransportFactory
 {
-    TTransport openTransport(TSocket socket) throws LoginException, TTransportException;
+    TTransport openTransport(TSocket socket, Configuration conf) throws LoginException, TTransportException;
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/73d828e4/src/java/org/apache/cassandra/thrift/TBinaryProtocol.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/thrift/TBinaryProtocol.java b/src/java/org/apache/cassandra/thrift/TBinaryProtocol.java
index aef6c83..3d59f72 100644
--- a/src/java/org/apache/cassandra/thrift/TBinaryProtocol.java
+++ b/src/java/org/apache/cassandra/thrift/TBinaryProtocol.java
@@ -37,6 +37,14 @@ public class TBinaryProtocol extends org.apache.thrift.protocol.TBinaryProtocol
         this(trans, false, true);
     }
 
+    public TBinaryProtocol(TTransport trans, int readLength)
+    {
+        this(trans);
+        // only a positive readLength is applied; zero or negative keeps whatever this(trans) set up
+        if (readLength > 0)
+            setReadLength(readLength);
+    }
+
     public TBinaryProtocol(TTransport trans, boolean strictRead, boolean strictWrite)
     {
         super(trans);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/73d828e4/src/java/org/apache/cassandra/thrift/TFramedTransportFactory.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/thrift/TFramedTransportFactory.java b/src/java/org/apache/cassandra/thrift/TFramedTransportFactory.java
index 09ae99e..792618d 100644
--- a/src/java/org/apache/cassandra/thrift/TFramedTransportFactory.java
+++ b/src/java/org/apache/cassandra/thrift/TFramedTransportFactory.java
@@ -21,16 +21,19 @@ package org.apache.cassandra.thrift;
  *
  */
 
+import org.apache.cassandra.hadoop.ConfigHelper;
 import org.apache.thrift.transport.TFramedTransport;
 import org.apache.thrift.transport.TSocket;
 import org.apache.thrift.transport.TTransport;
 import org.apache.thrift.transport.TTransportException;
 
+import org.apache.hadoop.conf.Configuration;
+
 public class TFramedTransportFactory implements ITransportFactory
 {
-    public TTransport openTransport(TSocket socket) throws TTransportException
+    public TTransport openTransport(TSocket socket, Configuration conf) throws TTransportException
     {
-        TTransport transport = new TFramedTransport(socket);
+        TTransport transport = new TFramedTransport(socket, ConfigHelper.getThriftFramedTransportSize(conf));
         transport.open();
         return transport;
     }