You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by al...@apache.org on 2013/09/19 20:07:52 UTC
[2/3] git commit: Remove Hadoop dependency from ITransportFactory
Remove Hadoop dependency from ITransportFactory
patch by Sam Tunnicliffe; reviewed by Aleksey Yeschenko for
CASSANDRA-6062
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/2336d94e
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/2336d94e
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/2336d94e
Branch: refs/heads/trunk
Commit: 2336d94ef3a286bc9b7331086085eca56f915e76
Parents: cef6552
Author: Aleksey Yeschenko <al...@apache.org>
Authored: Thu Sep 19 21:05:15 2013 +0300
Committer: Aleksey Yeschenko <al...@apache.org>
Committed: Thu Sep 19 21:05:15 2013 +0300
----------------------------------------------------------------------
CHANGES.txt | 1 +
src/java/org/apache/cassandra/cli/CliMain.java | 28 +++++------------
.../org/apache/cassandra/cli/CliOptions.java | 15 +++++----
.../apache/cassandra/cli/CliSessionState.java | 6 ++--
.../hadoop/AbstractColumnFamilyInputFormat.java | 11 ++++---
.../AbstractColumnFamilyOutputFormat.java | 2 +-
.../apache/cassandra/hadoop/ConfigHelper.java | 33 +++++++-------------
.../cassandra/thrift/ITransportFactory.java | 14 +++------
.../thrift/TFramedTransportFactory.java | 17 +++++-----
9 files changed, 50 insertions(+), 77 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/2336d94e/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index ae6eedc..a25d03e 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,5 +1,6 @@
2.0.2
* Make ThriftServer more easlly extensible (CASSANDRA-6058)
+ * Remove Hadoop dependency from ITransportFactory (CASSANDRA-6062)
Merged from 1.2:
* Allow cache-keys-to-save to be set at runtime (CASSANDRA-5980)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/2336d94e/src/java/org/apache/cassandra/cli/CliMain.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cli/CliMain.java b/src/java/org/apache/cassandra/cli/CliMain.java
index 144bd2e..77a9020 100644
--- a/src/java/org/apache/cassandra/cli/CliMain.java
+++ b/src/java/org/apache/cassandra/cli/CliMain.java
@@ -24,16 +24,15 @@ import java.io.IOException;
import java.nio.charset.CharacterCodingException;
import java.util.*;
-import jline.ConsoleReader;
-import jline.History;
import org.apache.cassandra.auth.IAuthenticator;
import org.apache.cassandra.io.util.FileUtils;
import org.apache.cassandra.thrift.*;
import org.apache.cassandra.utils.FBUtilities;
import org.apache.thrift.TException;
import org.apache.thrift.protocol.TBinaryProtocol;
-import org.apache.thrift.transport.TSocket;
import org.apache.thrift.transport.TTransport;
+import jline.ConsoleReader;
+import jline.History;
/**
* Cassandra Command Line Interface (CLI) Main
@@ -58,19 +57,12 @@ public class CliMain
*/
public static void connect(String server, int port)
{
- TSocket socket = new TSocket(server, port);
-
if (transport != null)
transport.close();
- transport = sessionState.transportFactory.getTransport(socket);
- TBinaryProtocol binaryProtocol = new TBinaryProtocol(transport, true, true);
- Cassandra.Client cassandraClient = new Cassandra.Client(binaryProtocol);
-
try
{
- if (!transport.isOpen())
- transport.open();
+ transport = sessionState.transportFactory.openTransport(server, port);
}
catch (Exception e)
{
@@ -80,7 +72,8 @@ public class CliMain
throw new RuntimeException("Exception connecting to " + server + "/" + port + ". Reason: " + error + ".");
}
- thriftClient = cassandraClient;
+ TBinaryProtocol binaryProtocol = new TBinaryProtocol(transport, true, true);
+ thriftClient = new Cassandra.Client(binaryProtocol);
cliClient = new CliClient(sessionState, thriftClient);
if ((sessionState.username != null) && (sessionState.password != null))
@@ -125,12 +118,7 @@ public class CliMain
cliClient.setKeySpace(sessionState.keyspace);
updateCompletor(CliUtils.getCfNamesByKeySpace(cliClient.getKSMetaData(sessionState.keyspace)));
}
- catch (InvalidRequestException e)
- {
- sessionState.err.println("Keyspace " + sessionState.keyspace + " not found");
- return;
- }
- catch (NotFoundException e)
+ catch (InvalidRequestException | NotFoundException e)
{
sessionState.err.println("Keyspace " + sessionState.keyspace + " not found");
return;
@@ -201,7 +189,7 @@ public class CliMain
completer.setCandidateStrings(strs);
}
- public static void processStatement(String query) throws CharacterCodingException, TException, TimedOutException, NotFoundException, InvalidRequestException, NoSuchFieldException, UnavailableException, IllegalAccessException, InstantiationException
+ public static void processStatement(String query) throws CharacterCodingException, TException, NotFoundException, InvalidRequestException, NoSuchFieldException, UnavailableException, IllegalAccessException, InstantiationException
{
cliClient.executeCLIStatement(query);
}
@@ -361,7 +349,7 @@ public class CliMain
private static void evaluateFileStatements(BufferedReader reader) throws IOException
{
- String line = "";
+ String line;
String currentStatement = "";
boolean commentedBlock = false;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/2336d94e/src/java/org/apache/cassandra/cli/CliOptions.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cli/CliOptions.java b/src/java/org/apache/cassandra/cli/CliOptions.java
index 464d092..68f17c9 100644
--- a/src/java/org/apache/cassandra/cli/CliOptions.java
+++ b/src/java/org/apache/cassandra/cli/CliOptions.java
@@ -18,7 +18,8 @@
package org.apache.cassandra.cli;
import org.apache.commons.cli.*;
-import org.apache.thrift.transport.TTransportFactory;
+
+import org.apache.cassandra.thrift.ITransportFactory;
/**
*
@@ -74,7 +75,7 @@ public class CliOptions
options.addOption(null, JMX_PORT_OPTION, "JMX-PORT", "JMX service port");
options.addOption(null, JMX_USERNAME_OPTION, "JMX-USERNAME", "JMX service username");
options.addOption(null, JMX_PASSWORD_OPTION, "JMX-PASSWORD", "JMX service password");
- options.addOption("tf", TRANSPORT_FACTORY, "TRANSPORT-FACTORY", "Fully-qualified TTransportFactory class name for creating a connection to cassandra");
+ options.addOption("tf", TRANSPORT_FACTORY, "TRANSPORT-FACTORY", "Fully-qualified ITransportFactory class name for creating a connection to cassandra");
// ssl connection-related options
options.addOption("ts", SSL_TRUSTSTORE, "TRUSTSTORE", "SSL: full path to truststore");
@@ -265,17 +266,15 @@ public class CliOptions
}
}
- private static TTransportFactory validateAndSetTransportFactory(String transportFactory)
+ private static ITransportFactory validateAndSetTransportFactory(String transportFactory)
{
try
{
Class<?> factory = Class.forName(transportFactory);
-
- if(!TTransportFactory.class.isAssignableFrom(factory))
+ if (!ITransportFactory.class.isAssignableFrom(factory))
throw new IllegalArgumentException(String.format("transport factory '%s' " +
- "not derived from TTransportFactory", transportFactory));
-
- return (TTransportFactory) factory.newInstance();
+ "not derived from ITransportFactory", transportFactory));
+ return (ITransportFactory) factory.newInstance();
}
catch (Exception e)
{
http://git-wip-us.apache.org/repos/asf/cassandra/blob/2336d94e/src/java/org/apache/cassandra/cli/CliSessionState.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cli/CliSessionState.java b/src/java/org/apache/cassandra/cli/CliSessionState.java
index 9e833c6..f0de713 100644
--- a/src/java/org/apache/cassandra/cli/CliSessionState.java
+++ b/src/java/org/apache/cassandra/cli/CliSessionState.java
@@ -20,11 +20,11 @@ package org.apache.cassandra.cli;
import java.io.InputStream;
import java.io.PrintStream;
-import org.apache.cassandra.cli.transport.FramedTransportFactory;
import org.apache.cassandra.config.EncryptionOptions;
import org.apache.cassandra.config.EncryptionOptions.ClientEncryptionOptions;
+import org.apache.cassandra.thrift.ITransportFactory;
+import org.apache.cassandra.thrift.TFramedTransportFactory;
import org.apache.cassandra.tools.NodeProbe;
-import org.apache.thrift.transport.TTransportFactory;
/**
* Used to hold the state for the CLI.
@@ -44,7 +44,7 @@ public class CliSessionState
public String jmxUsername; // JMX service username
public String jmxPassword; // JMX service password
public boolean verbose = false; // verbose output
- public TTransportFactory transportFactory = new FramedTransportFactory();
+ public ITransportFactory transportFactory = new TFramedTransportFactory();
public EncryptionOptions encOptions = new ClientEncryptionOptions();
/*
http://git-wip-us.apache.org/repos/asf/cassandra/blob/2336d94e/src/java/org/apache/cassandra/hadoop/AbstractColumnFamilyInputFormat.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hadoop/AbstractColumnFamilyInputFormat.java b/src/java/org/apache/cassandra/hadoop/AbstractColumnFamilyInputFormat.java
index 59c4bf7..d05b890 100644
--- a/src/java/org/apache/cassandra/hadoop/AbstractColumnFamilyInputFormat.java
+++ b/src/java/org/apache/cassandra/hadoop/AbstractColumnFamilyInputFormat.java
@@ -30,6 +30,11 @@ import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Lists;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
import org.apache.cassandra.auth.IAuthenticator;
import org.apache.cassandra.dht.IPartitioner;
import org.apache.cassandra.dht.Range;
@@ -53,11 +58,7 @@ import org.apache.thrift.TException;
import org.apache.thrift.protocol.TBinaryProtocol;
import org.apache.thrift.protocol.TProtocol;
import org.apache.thrift.transport.TTransport;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.Lists;
public abstract class AbstractColumnFamilyInputFormat<K, Y> extends InputFormat<K, Y> implements org.apache.hadoop.mapred.InputFormat<K, Y>
{
@@ -90,7 +91,7 @@ public abstract class AbstractColumnFamilyInputFormat<K, Y> extends InputFormat<
public static Cassandra.Client createAuthenticatedClient(String location, int port, Configuration conf) throws Exception
{
logger.debug("Creating authenticated client for CF input format");
- TTransport transport = ConfigHelper.getClientTransportFactory(conf).openTransport(location, port, conf);
+ TTransport transport = ConfigHelper.getClientTransportFactory(conf).openTransport(location, port);
TProtocol binaryProtocol = new TBinaryProtocol(transport, true, true);
Cassandra.Client client = new Cassandra.Client(binaryProtocol);
http://git-wip-us.apache.org/repos/asf/cassandra/blob/2336d94e/src/java/org/apache/cassandra/hadoop/AbstractColumnFamilyOutputFormat.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hadoop/AbstractColumnFamilyOutputFormat.java b/src/java/org/apache/cassandra/hadoop/AbstractColumnFamilyOutputFormat.java
index 5a03777..2040f61 100644
--- a/src/java/org/apache/cassandra/hadoop/AbstractColumnFamilyOutputFormat.java
+++ b/src/java/org/apache/cassandra/hadoop/AbstractColumnFamilyOutputFormat.java
@@ -120,7 +120,7 @@ public abstract class AbstractColumnFamilyOutputFormat<K, Y> extends OutputForma
public static Cassandra.Client createAuthenticatedClient(String host, int port, Configuration conf) throws Exception
{
logger.debug("Creating authenticated client for CF output format");
- TTransport transport = ConfigHelper.getClientTransportFactory(conf).openTransport(host, port, conf);
+ TTransport transport = ConfigHelper.getClientTransportFactory(conf).openTransport(host, port);
TProtocol binaryProtocol = new TBinaryProtocol(transport, true, true);
Cassandra.Client client = new Cassandra.Client(binaryProtocol);
client.set_keyspace(ConfigHelper.getOutputKeyspace(conf));
http://git-wip-us.apache.org/repos/asf/cassandra/blob/2336d94e/src/java/org/apache/cassandra/hadoop/ConfigHelper.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hadoop/ConfigHelper.java b/src/java/org/apache/cassandra/hadoop/ConfigHelper.java
index a109b2f..ebfd3c0 100644
--- a/src/java/org/apache/cassandra/hadoop/ConfigHelper.java
+++ b/src/java/org/apache/cassandra/hadoop/ConfigHelper.java
@@ -23,13 +23,12 @@ package org.apache.cassandra.hadoop;
import java.io.IOException;
import java.util.*;
-import com.google.common.collect.Maps;
-import org.apache.cassandra.io.compress.CompressionParameters;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.cassandra.exceptions.ConfigurationException;
import org.apache.cassandra.dht.IPartitioner;
+import org.apache.cassandra.io.compress.CompressionParameters;
import org.apache.cassandra.thrift.*;
import org.apache.cassandra.utils.FBUtilities;
import org.apache.cassandra.utils.Hex;
@@ -41,7 +40,6 @@ import org.apache.thrift.TSerializer;
import org.apache.thrift.protocol.TBinaryProtocol;
import org.apache.thrift.transport.TTransport;
-
public class ConfigHelper
{
private static final String INPUT_PARTITIONER_CONFIG = "cassandra.input.partitioner.class";
@@ -69,14 +67,10 @@ public class ConfigHelper
private static final String WRITE_CONSISTENCY_LEVEL = "cassandra.consistencylevel.write";
private static final String OUTPUT_COMPRESSION_CLASS = "cassandra.output.compression.class";
private static final String OUTPUT_COMPRESSION_CHUNK_LENGTH = "cassandra.output.compression.length";
-
- private static final String INPUT_TRANSPORT_FACTORY_CLASS = "cassandra.input.transport.factory.class";
- private static final String OUTPUT_TRANSPORT_FACTORY_CLASS = "cassandra.output.transport.factory.class";
private static final String THRIFT_FRAMED_TRANSPORT_SIZE_IN_MB = "cassandra.thrift.framed.size_mb";
private static final Logger logger = LoggerFactory.getLogger(ConfigHelper.class);
-
/**
* Set the keyspace and column family for the input of this job.
*
@@ -88,13 +82,10 @@ public class ConfigHelper
public static void setInputColumnFamily(Configuration conf, String keyspace, String columnFamily, boolean widerows)
{
if (keyspace == null)
- {
throw new UnsupportedOperationException("keyspace may not be null");
- }
+
if (columnFamily == null)
- {
throw new UnsupportedOperationException("columnfamily may not be null");
- }
conf.set(INPUT_KEYSPACE_CONFIG, keyspace);
conf.set(INPUT_COLUMNFAMILY_CONFIG, columnFamily);
@@ -122,9 +113,7 @@ public class ConfigHelper
public static void setOutputKeyspace(Configuration conf, String keyspace)
{
if (keyspace == null)
- {
throw new UnsupportedOperationException("keyspace may not be null");
- }
conf.set(OUTPUT_KEYSPACE_CONFIG, keyspace);
}
@@ -562,12 +551,11 @@ public class ConfigHelper
return client;
}
- public static Cassandra.Client createConnection(Configuration conf, String host, Integer port)
- throws IOException
+ public static Cassandra.Client createConnection(Configuration conf, String host, Integer port) throws IOException
{
try
{
- TTransport transport = getClientTransportFactory(conf).openTransport(host, port, conf);
+ TTransport transport = getClientTransportFactory(conf).openTransport(host, port);
return new Cassandra.Client(new TBinaryProtocol(transport, true, true));
}
catch (Exception e)
@@ -578,16 +566,15 @@ public class ConfigHelper
public static ITransportFactory getClientTransportFactory(Configuration conf)
{
- String factoryClassName = conf.get(
- ITransportFactory.PROPERTY_KEY,
- TFramedTransportFactory.class.getName());
+ String factoryClassName = conf.get(ITransportFactory.PROPERTY_KEY, TFramedTransportFactory.class.getName());
ITransportFactory factory = getClientTransportFactory(factoryClassName);
Map<String, String> options = getOptions(conf, factory.supportedOptions());
factory.setOptions(options);
return factory;
}
- private static ITransportFactory getClientTransportFactory(String factoryClassName) {
+ private static ITransportFactory getClientTransportFactory(String factoryClassName)
+ {
try
{
return (ITransportFactory) Class.forName(factoryClassName).newInstance();
@@ -597,8 +584,10 @@ public class ConfigHelper
throw new RuntimeException("Failed to instantiate transport factory:" + factoryClassName, e);
}
}
- private static Map<String, String> getOptions(Configuration conf, Set<String> supportedOptions) {
- Map<String, String> options = Maps.newHashMap();
+
+ private static Map<String, String> getOptions(Configuration conf, Set<String> supportedOptions)
+ {
+ Map<String, String> options = new HashMap<>();
for (String optionKey : supportedOptions)
{
String optionValue = conf.get(optionKey);
http://git-wip-us.apache.org/repos/asf/cassandra/blob/2336d94e/src/java/org/apache/cassandra/thrift/ITransportFactory.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/thrift/ITransportFactory.java b/src/java/org/apache/cassandra/thrift/ITransportFactory.java
index 98b1c03..7a65728 100644
--- a/src/java/org/apache/cassandra/thrift/ITransportFactory.java
+++ b/src/java/org/apache/cassandra/thrift/ITransportFactory.java
@@ -1,5 +1,3 @@
-package org.apache.cassandra.thrift;
-
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
@@ -20,13 +18,12 @@ package org.apache.cassandra.thrift;
* under the License.
*
*/
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.thrift.transport.TTransport;
+package org.apache.cassandra.thrift;
import java.util.Map;
import java.util.Set;
+import org.apache.thrift.transport.TTransport;
/**
* Transport factory for establishing thrift connections from clients to a remote server.
@@ -34,8 +31,6 @@ import java.util.Set;
public interface ITransportFactory
{
static final String PROPERTY_KEY = "cassandra.client.transport.factory";
- static final String LONG_OPTION = "transport-factory";
- static final String SHORT_OPTION = "tr";
/**
* Opens a client transport to a thrift server.
@@ -48,16 +43,15 @@ public interface ITransportFactory
*
* @param host fully qualified hostname of the server
* @param port RPC port of the server
- * @param conf Hadoop configuration
* @return open and ready to use transport
* @throws Exception implementation defined; usually throws TTransportException or IOException
* if the connection cannot be established
*/
- TTransport openTransport(String host, int port, Configuration conf) throws Exception;
+ TTransport openTransport(String host, int port) throws Exception;
/**
* Sets an implementation defined set of options.
- * Keys in this map must conform to the set set returned by TClientTransportFactory#supportedOptions.
+ * Keys in this map must conform to the set set returned by ITransportFactory#supportedOptions.
* @param options option map
*/
void setOptions(Map<String, String> options);
http://git-wip-us.apache.org/repos/asf/cassandra/blob/2336d94e/src/java/org/apache/cassandra/thrift/TFramedTransportFactory.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/thrift/TFramedTransportFactory.java b/src/java/org/apache/cassandra/thrift/TFramedTransportFactory.java
index 7d2d89e..a4c6bb7 100644
--- a/src/java/org/apache/cassandra/thrift/TFramedTransportFactory.java
+++ b/src/java/org/apache/cassandra/thrift/TFramedTransportFactory.java
@@ -1,5 +1,3 @@
-package org.apache.cassandra.thrift;
-
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
@@ -20,35 +18,38 @@ package org.apache.cassandra.thrift;
* under the License.
*
*/
+package org.apache.cassandra.thrift;
import java.util.Collections;
import java.util.Map;
import java.util.Set;
-import org.apache.cassandra.hadoop.ConfigHelper;
import org.apache.thrift.transport.TFramedTransport;
import org.apache.thrift.transport.TSocket;
import org.apache.thrift.transport.TTransport;
import org.apache.thrift.transport.TTransportException;
-import org.apache.hadoop.conf.Configuration;
-
public class TFramedTransportFactory implements ITransportFactory
{
- public TTransport openTransport(String host, int port, Configuration conf) throws TTransportException
+ private static final String THRIFT_FRAMED_TRANSPORT_SIZE_IN_MB = "cassandra.thrift.framed.size_mb";
+ private int thriftFramedTransportSizeMb = 15; // 15Mb is the default for C* & Hadoop ConfigHelper
+
+ public TTransport openTransport(String host, int port) throws TTransportException
{
TSocket socket = new TSocket(host, port);
- TTransport transport = new TFramedTransport(socket, ConfigHelper.getThriftFramedTransportSize(conf));
+ TTransport transport = new TFramedTransport(socket, thriftFramedTransportSizeMb * 1024 * 1024);
transport.open();
return transport;
}
public void setOptions(Map<String, String> options)
{
+ if (options.containsKey(THRIFT_FRAMED_TRANSPORT_SIZE_IN_MB))
+ thriftFramedTransportSizeMb = Integer.parseInt(options.get(THRIFT_FRAMED_TRANSPORT_SIZE_IN_MB));
}
public Set<String> supportedOptions()
{
- return Collections.emptySet();
+ return Collections.singleton(THRIFT_FRAMED_TRANSPORT_SIZE_IN_MB);
}
}