You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by sl...@apache.org on 2013/01/29 19:10:09 UTC
[1/4] git commit: add ConfigHelper support for Thrift frame and max
message sizes patch by Pavel Yaskevich;
reviewed by Brandon Williams for CASSANDRA-5188
add ConfigHelper support for Thrift frame and max message sizes
patch by Pavel Yaskevich; reviewed by Brandon Williams for CASSANDRA-5188
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/73d828e4
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/73d828e4
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/73d828e4
Branch: refs/heads/trunk
Commit: 73d828e4e8023b9f7ca8fafd12becec34eb59211
Parents: 3298c2f
Author: Pavel Yaskevich <xe...@apache.org>
Authored: Fri Jan 25 21:49:25 2013 -0800
Committer: Pavel Yaskevich <xe...@apache.org>
Committed: Mon Jan 28 10:31:13 2013 -0800
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../cassandra/hadoop/ColumnFamilyOutputFormat.java | 4 +-
.../cassandra/hadoop/ColumnFamilyRecordReader.java | 4 +-
.../org/apache/cassandra/hadoop/ConfigHelper.java | 34 ++++++++++++++-
.../apache/cassandra/thrift/ITransportFactory.java | 3 +-
.../apache/cassandra/thrift/TBinaryProtocol.java | 8 +++
.../cassandra/thrift/TFramedTransportFactory.java | 7 ++-
7 files changed, 52 insertions(+), 9 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/73d828e4/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 1ad77b1..1c414bc 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -3,6 +3,7 @@
* fix ConcurrentModificationException in getBootstrapSource (CASSANDRA-5170)
* fix sstable maxtimestamp for row deletes and pre-1.1.1 sstables (CASSANDRA-5153)
* fix start key/end token validation for wide row iteration (CASSANDRA-5168)
+ * add ConfigHelper support for Thrift frame and max message sizes (CASSANDRA-5188)
1.1.9
http://git-wip-us.apache.org/repos/asf/cassandra/blob/73d828e4/src/java/org/apache/cassandra/hadoop/ColumnFamilyOutputFormat.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hadoop/ColumnFamilyOutputFormat.java b/src/java/org/apache/cassandra/hadoop/ColumnFamilyOutputFormat.java
index e01ada5..caea616 100644
--- a/src/java/org/apache/cassandra/hadoop/ColumnFamilyOutputFormat.java
+++ b/src/java/org/apache/cassandra/hadoop/ColumnFamilyOutputFormat.java
@@ -154,8 +154,8 @@ public class ColumnFamilyOutputFormat extends OutputFormat<ByteBuffer,List<Mutat
throws InvalidRequestException, TException, AuthenticationException, AuthorizationException, LoginException
{
logger.debug("Creating authenticated client for CF output format");
- TTransport transport = ConfigHelper.getOutputTransportFactory(conf).openTransport(socket);
- TBinaryProtocol binaryProtocol = new TBinaryProtocol(transport);
+ TTransport transport = ConfigHelper.getOutputTransportFactory(conf).openTransport(socket, conf);
+ TBinaryProtocol binaryProtocol = new TBinaryProtocol(transport, ConfigHelper.getThriftMaxMessageLength(conf));
Cassandra.Client client = new Cassandra.Client(binaryProtocol);
client.set_keyspace(ConfigHelper.getOutputKeyspace(conf));
if (ConfigHelper.getOutputKeyspaceUserName(conf) != null)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/73d828e4/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordReader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordReader.java b/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordReader.java
index 83e436b..a40e6c5 100644
--- a/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordReader.java
+++ b/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordReader.java
@@ -161,8 +161,8 @@ public class ColumnFamilyRecordReader extends RecordReader<ByteBuffer, SortedMap
// create connection using thrift
String location = getLocation();
socket = new TSocket(location, ConfigHelper.getInputRpcPort(conf));
- TTransport transport = ConfigHelper.getInputTransportFactory(conf).openTransport(socket);
- TBinaryProtocol binaryProtocol = new TBinaryProtocol(transport);
+ TTransport transport = ConfigHelper.getInputTransportFactory(conf).openTransport(socket, conf);
+ TBinaryProtocol binaryProtocol = new TBinaryProtocol(transport, ConfigHelper.getThriftMaxMessageLength(conf));
client = new Cassandra.Client(binaryProtocol);
// log in
http://git-wip-us.apache.org/repos/asf/cassandra/blob/73d828e4/src/java/org/apache/cassandra/hadoop/ConfigHelper.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hadoop/ConfigHelper.java b/src/java/org/apache/cassandra/hadoop/ConfigHelper.java
index 4b49387..ad29903 100644
--- a/src/java/org/apache/cassandra/hadoop/ConfigHelper.java
+++ b/src/java/org/apache/cassandra/hadoop/ConfigHelper.java
@@ -76,6 +76,8 @@ public class ConfigHelper
private static final String OUTPUT_COMPRESSION_CHUNK_LENGTH = "cassandra.output.compression.length";
private static final String INPUT_TRANSPORT_FACTORY_CLASS = "cassandra.input.transport.factory.class";
private static final String OUTPUT_TRANSPORT_FACTORY_CLASS = "cassandra.output.transport.factory.class";
+ private static final String THRIFT_FRAMED_TRANSPORT_SIZE_IN_MB = "cassandra.thrift.framed.size_mb";
+ private static final String THRIFT_MAX_MESSAGE_LENGTH_IN_MB = "cassandra.thrift.message.max_size_mb";
private static final Logger logger = LoggerFactory.getLogger(ConfigHelper.class);
@@ -466,6 +468,34 @@ public class ConfigHelper
conf.set(OUTPUT_COMPRESSION_CHUNK_LENGTH, length);
}
+ public static void setThriftFramedTransportSizeInMb(Configuration conf, int frameSizeInMB)
+ {
+ conf.setInt(THRIFT_FRAMED_TRANSPORT_SIZE_IN_MB, frameSizeInMB); // stored in megabytes; the matching getter converts to bytes
+ }
+
+ /**
+ * @param conf The configuration to use.
+ * @return Frame size in bytes: the MB value set by {@link #setThriftFramedTransportSizeInMb(Configuration, int)}, or the 15MB default, multiplied by 1024 * 1024
+ */
+ public static int getThriftFramedTransportSize(Configuration conf)
+ {
+ return conf.getInt(THRIFT_FRAMED_TRANSPORT_SIZE_IN_MB, 15) * 1024 * 1024; // 15MB is default in Cassandra
+ }
+
+ public static void setThriftMaxMessageLengthInMb(Configuration conf, int maxMessageSizeInMB)
+ {
+ conf.setInt(THRIFT_MAX_MESSAGE_LENGTH_IN_MB, maxMessageSizeInMB); // stored in megabytes under "cassandra.thrift.message.max_size_mb"
+ }
+
+ /**
+ * @param conf The configuration to use.
+ * @return Max Thrift message length in bytes: the MB value set by {@link #setThriftMaxMessageLengthInMb(Configuration, int)}, or the 16MB default, multiplied by 1024 * 1024
+ */
+ public static int getThriftMaxMessageLength(Configuration conf)
+ {
+ return conf.getInt(THRIFT_MAX_MESSAGE_LENGTH_IN_MB, 16) * 1024 * 1024; // must read the max-message key, not the framed-transport key; 16MB is default in Cassandra
+ }
+
public static CompressionParameters getOutputCompressionParamaters(Configuration conf)
{
if (getOutputCompressionClass(conf) == null)
@@ -526,8 +556,8 @@ public class ConfigHelper
try
{
TSocket socket = new TSocket(host, port);
- TTransport transport = getInputTransportFactory(conf).openTransport(socket);
- return new Cassandra.Client(new TBinaryProtocol(transport));
+ TTransport transport = getInputTransportFactory(conf).openTransport(socket, conf);
+ return new Cassandra.Client(new TBinaryProtocol(transport, getThriftMaxMessageLength(conf)));
}
catch (LoginException e)
{
http://git-wip-us.apache.org/repos/asf/cassandra/blob/73d828e4/src/java/org/apache/cassandra/thrift/ITransportFactory.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/thrift/ITransportFactory.java b/src/java/org/apache/cassandra/thrift/ITransportFactory.java
index 4940fc6..e3e87c2 100644
--- a/src/java/org/apache/cassandra/thrift/ITransportFactory.java
+++ b/src/java/org/apache/cassandra/thrift/ITransportFactory.java
@@ -21,6 +21,7 @@ package org.apache.cassandra.thrift;
*
*/
+import org.apache.hadoop.conf.Configuration;
import org.apache.thrift.transport.TSocket;
import org.apache.thrift.transport.TTransport;
import org.apache.thrift.transport.TTransportException;
@@ -30,5 +31,5 @@ import javax.security.auth.login.LoginException;
public interface ITransportFactory
{
- TTransport openTransport(TSocket socket) throws LoginException, TTransportException;
+ TTransport openTransport(TSocket socket, Configuration conf) throws LoginException, TTransportException;
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/73d828e4/src/java/org/apache/cassandra/thrift/TBinaryProtocol.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/thrift/TBinaryProtocol.java b/src/java/org/apache/cassandra/thrift/TBinaryProtocol.java
index aef6c83..3d59f72 100644
--- a/src/java/org/apache/cassandra/thrift/TBinaryProtocol.java
+++ b/src/java/org/apache/cassandra/thrift/TBinaryProtocol.java
@@ -37,6 +37,14 @@ public class TBinaryProtocol extends org.apache.thrift.protocol.TBinaryProtocol
this(trans, false, true);
}
+ public TBinaryProtocol(TTransport trans, int readLength)
+ {
+ this(trans); // delegate to the single-argument constructor for the default setup
+
+ if (readLength > 0) // zero or negative means "no limit requested": setReadLength is not called
+ setReadLength(readLength); // not defined in this hunk; presumably inherited from the Thrift base class (confirm)
+ }
+
public TBinaryProtocol(TTransport trans, boolean strictRead, boolean strictWrite)
{
super(trans);
http://git-wip-us.apache.org/repos/asf/cassandra/blob/73d828e4/src/java/org/apache/cassandra/thrift/TFramedTransportFactory.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/thrift/TFramedTransportFactory.java b/src/java/org/apache/cassandra/thrift/TFramedTransportFactory.java
index 09ae99e..792618d 100644
--- a/src/java/org/apache/cassandra/thrift/TFramedTransportFactory.java
+++ b/src/java/org/apache/cassandra/thrift/TFramedTransportFactory.java
@@ -21,16 +21,19 @@ package org.apache.cassandra.thrift;
*
*/
+import org.apache.cassandra.hadoop.ConfigHelper;
import org.apache.thrift.transport.TFramedTransport;
import org.apache.thrift.transport.TSocket;
import org.apache.thrift.transport.TTransport;
import org.apache.thrift.transport.TTransportException;
+import org.apache.hadoop.conf.Configuration;
+
public class TFramedTransportFactory implements ITransportFactory
{
- public TTransport openTransport(TSocket socket) throws TTransportException
+ public TTransport openTransport(TSocket socket, Configuration conf) throws TTransportException
{
- TTransport transport = new TFramedTransport(socket);
+ TTransport transport = new TFramedTransport(socket, ConfigHelper.getThriftFramedTransportSize(conf));
transport.open();
return transport;
}