You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by al...@apache.org on 2013/09/19 20:07:52 UTC

[2/3] git commit: Remove Hadoop dependency from ITransportFactory

Remove Hadoop dependency from ITransportFactory

patch by Sam Tunnicliffe; reviewed by Aleksey Yeschenko for
CASSANDRA-6062


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/2336d94e
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/2336d94e
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/2336d94e

Branch: refs/heads/trunk
Commit: 2336d94ef3a286bc9b7331086085eca56f915e76
Parents: cef6552
Author: Aleksey Yeschenko <al...@apache.org>
Authored: Thu Sep 19 21:05:15 2013 +0300
Committer: Aleksey Yeschenko <al...@apache.org>
Committed: Thu Sep 19 21:05:15 2013 +0300

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 src/java/org/apache/cassandra/cli/CliMain.java  | 28 +++++------------
 .../org/apache/cassandra/cli/CliOptions.java    | 15 +++++----
 .../apache/cassandra/cli/CliSessionState.java   |  6 ++--
 .../hadoop/AbstractColumnFamilyInputFormat.java | 11 ++++---
 .../AbstractColumnFamilyOutputFormat.java       |  2 +-
 .../apache/cassandra/hadoop/ConfigHelper.java   | 33 +++++++-------------
 .../cassandra/thrift/ITransportFactory.java     | 14 +++------
 .../thrift/TFramedTransportFactory.java         | 17 +++++-----
 9 files changed, 50 insertions(+), 77 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/2336d94e/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index ae6eedc..a25d03e 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,5 +1,6 @@
 2.0.2
  * Make ThriftServer more easlly extensible (CASSANDRA-6058)
+ * Remove Hadoop dependency from ITransportFactory (CASSANDRA-6062)
 Merged from 1.2:
  * Allow cache-keys-to-save to be set at runtime (CASSANDRA-5980)
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/2336d94e/src/java/org/apache/cassandra/cli/CliMain.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cli/CliMain.java b/src/java/org/apache/cassandra/cli/CliMain.java
index 144bd2e..77a9020 100644
--- a/src/java/org/apache/cassandra/cli/CliMain.java
+++ b/src/java/org/apache/cassandra/cli/CliMain.java
@@ -24,16 +24,15 @@ import java.io.IOException;
 import java.nio.charset.CharacterCodingException;
 import java.util.*;
 
-import jline.ConsoleReader;
-import jline.History;
 import org.apache.cassandra.auth.IAuthenticator;
 import org.apache.cassandra.io.util.FileUtils;
 import org.apache.cassandra.thrift.*;
 import org.apache.cassandra.utils.FBUtilities;
 import org.apache.thrift.TException;
 import org.apache.thrift.protocol.TBinaryProtocol;
-import org.apache.thrift.transport.TSocket;
 import org.apache.thrift.transport.TTransport;
+import jline.ConsoleReader;
+import jline.History;
 
 /**
  * Cassandra Command Line Interface (CLI) Main
@@ -58,19 +57,12 @@ public class CliMain
      */
     public static void connect(String server, int port)
     {
-        TSocket socket = new TSocket(server, port);
-
         if (transport != null)
             transport.close();
 
-        transport = sessionState.transportFactory.getTransport(socket);
-        TBinaryProtocol binaryProtocol = new TBinaryProtocol(transport, true, true);
-        Cassandra.Client cassandraClient = new Cassandra.Client(binaryProtocol);
-
         try
         {
-            if (!transport.isOpen())
-                transport.open();
+            transport = sessionState.transportFactory.openTransport(server, port);
         }
         catch (Exception e)
         {
@@ -80,7 +72,8 @@ public class CliMain
             throw new RuntimeException("Exception connecting to " + server + "/" + port + ". Reason: " + error + ".");
         }
 
-        thriftClient = cassandraClient;
+        TBinaryProtocol binaryProtocol = new TBinaryProtocol(transport, true, true);
+        thriftClient = new Cassandra.Client(binaryProtocol);
         cliClient = new CliClient(sessionState, thriftClient);
 
         if ((sessionState.username != null) && (sessionState.password != null))
@@ -125,12 +118,7 @@ public class CliMain
                 cliClient.setKeySpace(sessionState.keyspace);
                 updateCompletor(CliUtils.getCfNamesByKeySpace(cliClient.getKSMetaData(sessionState.keyspace)));
             }
-            catch (InvalidRequestException e)
-            {
-                sessionState.err.println("Keyspace " + sessionState.keyspace + " not found");
-                return;
-            }
-            catch (NotFoundException e)
+            catch (InvalidRequestException | NotFoundException e)
             {
                 sessionState.err.println("Keyspace " + sessionState.keyspace + " not found");
                 return;
@@ -201,7 +189,7 @@ public class CliMain
         completer.setCandidateStrings(strs);
     }
 
-    public static void processStatement(String query) throws CharacterCodingException, TException, TimedOutException, NotFoundException, InvalidRequestException, NoSuchFieldException, UnavailableException, IllegalAccessException, InstantiationException
+    public static void processStatement(String query) throws CharacterCodingException, TException, NotFoundException, InvalidRequestException, NoSuchFieldException, UnavailableException, IllegalAccessException, InstantiationException
     {
         cliClient.executeCLIStatement(query);
     }
@@ -361,7 +349,7 @@ public class CliMain
 
     private static void evaluateFileStatements(BufferedReader reader) throws IOException
     {
-        String line = "";
+        String line;
         String currentStatement = "";
 
         boolean commentedBlock = false;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/2336d94e/src/java/org/apache/cassandra/cli/CliOptions.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cli/CliOptions.java b/src/java/org/apache/cassandra/cli/CliOptions.java
index 464d092..68f17c9 100644
--- a/src/java/org/apache/cassandra/cli/CliOptions.java
+++ b/src/java/org/apache/cassandra/cli/CliOptions.java
@@ -18,7 +18,8 @@
 package org.apache.cassandra.cli;
 
 import org.apache.commons.cli.*;
-import org.apache.thrift.transport.TTransportFactory;
+
+import org.apache.cassandra.thrift.ITransportFactory;
 
 /**
  *
@@ -74,7 +75,7 @@ public class CliOptions
         options.addOption(null, JMX_PORT_OPTION, "JMX-PORT", "JMX service port");
         options.addOption(null, JMX_USERNAME_OPTION, "JMX-USERNAME", "JMX service username");
         options.addOption(null, JMX_PASSWORD_OPTION, "JMX-PASSWORD", "JMX service password");
-        options.addOption("tf", TRANSPORT_FACTORY, "TRANSPORT-FACTORY", "Fully-qualified TTransportFactory class name for creating a connection to cassandra");
+        options.addOption("tf", TRANSPORT_FACTORY, "TRANSPORT-FACTORY", "Fully-qualified ITransportFactory class name for creating a connection to cassandra");
 
         // ssl connection-related options
         options.addOption("ts", SSL_TRUSTSTORE, "TRUSTSTORE", "SSL: full path to truststore");
@@ -265,17 +266,15 @@ public class CliOptions
         }
     }
 
-    private static TTransportFactory validateAndSetTransportFactory(String transportFactory)
+    private static ITransportFactory validateAndSetTransportFactory(String transportFactory)
     {
         try
         {
             Class<?> factory = Class.forName(transportFactory);
-
-            if(!TTransportFactory.class.isAssignableFrom(factory))
+            if (!ITransportFactory.class.isAssignableFrom(factory))
                 throw new IllegalArgumentException(String.format("transport factory '%s' " +
-                                                                 "not derived from TTransportFactory", transportFactory));
-
-            return (TTransportFactory) factory.newInstance();
+                                                                 "not derived from ITransportFactory", transportFactory));
+            return (ITransportFactory) factory.newInstance();
         }
         catch (Exception e)
         {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/2336d94e/src/java/org/apache/cassandra/cli/CliSessionState.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cli/CliSessionState.java b/src/java/org/apache/cassandra/cli/CliSessionState.java
index 9e833c6..f0de713 100644
--- a/src/java/org/apache/cassandra/cli/CliSessionState.java
+++ b/src/java/org/apache/cassandra/cli/CliSessionState.java
@@ -20,11 +20,11 @@ package org.apache.cassandra.cli;
 import java.io.InputStream;
 import java.io.PrintStream;
 
-import org.apache.cassandra.cli.transport.FramedTransportFactory;
 import org.apache.cassandra.config.EncryptionOptions;
 import org.apache.cassandra.config.EncryptionOptions.ClientEncryptionOptions;
+import org.apache.cassandra.thrift.ITransportFactory;
+import org.apache.cassandra.thrift.TFramedTransportFactory;
 import org.apache.cassandra.tools.NodeProbe;
-import org.apache.thrift.transport.TTransportFactory;
 
 /**
  * Used to hold the state for the CLI.
@@ -44,7 +44,7 @@ public class CliSessionState
     public String  jmxUsername;   // JMX service username
     public String  jmxPassword;   // JMX service password
     public boolean verbose = false; // verbose output
-    public TTransportFactory transportFactory = new FramedTransportFactory();
+    public ITransportFactory transportFactory = new TFramedTransportFactory();
     public EncryptionOptions encOptions = new ClientEncryptionOptions();
 
     /*

http://git-wip-us.apache.org/repos/asf/cassandra/blob/2336d94e/src/java/org/apache/cassandra/hadoop/AbstractColumnFamilyInputFormat.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hadoop/AbstractColumnFamilyInputFormat.java b/src/java/org/apache/cassandra/hadoop/AbstractColumnFamilyInputFormat.java
index 59c4bf7..d05b890 100644
--- a/src/java/org/apache/cassandra/hadoop/AbstractColumnFamilyInputFormat.java
+++ b/src/java/org/apache/cassandra/hadoop/AbstractColumnFamilyInputFormat.java
@@ -30,6 +30,11 @@ import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
 
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Lists;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import org.apache.cassandra.auth.IAuthenticator;
 import org.apache.cassandra.dht.IPartitioner;
 import org.apache.cassandra.dht.Range;
@@ -53,11 +58,7 @@ import org.apache.thrift.TException;
 import org.apache.thrift.protocol.TBinaryProtocol;
 import org.apache.thrift.protocol.TProtocol;
 import org.apache.thrift.transport.TTransport;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.Lists;
 
 public abstract class AbstractColumnFamilyInputFormat<K, Y> extends InputFormat<K, Y> implements org.apache.hadoop.mapred.InputFormat<K, Y>
 {
@@ -90,7 +91,7 @@ public abstract class AbstractColumnFamilyInputFormat<K, Y> extends InputFormat<
     public static Cassandra.Client createAuthenticatedClient(String location, int port, Configuration conf) throws Exception
     {
         logger.debug("Creating authenticated client for CF input format");
-        TTransport transport = ConfigHelper.getClientTransportFactory(conf).openTransport(location, port, conf);
+        TTransport transport = ConfigHelper.getClientTransportFactory(conf).openTransport(location, port);
         TProtocol binaryProtocol = new TBinaryProtocol(transport, true, true);
         Cassandra.Client client = new Cassandra.Client(binaryProtocol);
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/2336d94e/src/java/org/apache/cassandra/hadoop/AbstractColumnFamilyOutputFormat.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hadoop/AbstractColumnFamilyOutputFormat.java b/src/java/org/apache/cassandra/hadoop/AbstractColumnFamilyOutputFormat.java
index 5a03777..2040f61 100644
--- a/src/java/org/apache/cassandra/hadoop/AbstractColumnFamilyOutputFormat.java
+++ b/src/java/org/apache/cassandra/hadoop/AbstractColumnFamilyOutputFormat.java
@@ -120,7 +120,7 @@ public abstract class AbstractColumnFamilyOutputFormat<K, Y> extends OutputForma
     public static Cassandra.Client createAuthenticatedClient(String host, int port, Configuration conf) throws Exception
     {
         logger.debug("Creating authenticated client for CF output format");
-        TTransport transport = ConfigHelper.getClientTransportFactory(conf).openTransport(host, port, conf);
+        TTransport transport = ConfigHelper.getClientTransportFactory(conf).openTransport(host, port);
         TProtocol binaryProtocol = new TBinaryProtocol(transport, true, true);
         Cassandra.Client client = new Cassandra.Client(binaryProtocol);
         client.set_keyspace(ConfigHelper.getOutputKeyspace(conf));

http://git-wip-us.apache.org/repos/asf/cassandra/blob/2336d94e/src/java/org/apache/cassandra/hadoop/ConfigHelper.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hadoop/ConfigHelper.java b/src/java/org/apache/cassandra/hadoop/ConfigHelper.java
index a109b2f..ebfd3c0 100644
--- a/src/java/org/apache/cassandra/hadoop/ConfigHelper.java
+++ b/src/java/org/apache/cassandra/hadoop/ConfigHelper.java
@@ -23,13 +23,12 @@ package org.apache.cassandra.hadoop;
 import java.io.IOException;
 import java.util.*;
 
-import com.google.common.collect.Maps;
-import org.apache.cassandra.io.compress.CompressionParameters;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import org.apache.cassandra.exceptions.ConfigurationException;
 import org.apache.cassandra.dht.IPartitioner;
+import org.apache.cassandra.io.compress.CompressionParameters;
 import org.apache.cassandra.thrift.*;
 import org.apache.cassandra.utils.FBUtilities;
 import org.apache.cassandra.utils.Hex;
@@ -41,7 +40,6 @@ import org.apache.thrift.TSerializer;
 import org.apache.thrift.protocol.TBinaryProtocol;
 import org.apache.thrift.transport.TTransport;
 
-
 public class ConfigHelper
 {
     private static final String INPUT_PARTITIONER_CONFIG = "cassandra.input.partitioner.class";
@@ -69,14 +67,10 @@ public class ConfigHelper
     private static final String WRITE_CONSISTENCY_LEVEL = "cassandra.consistencylevel.write";
     private static final String OUTPUT_COMPRESSION_CLASS = "cassandra.output.compression.class";
     private static final String OUTPUT_COMPRESSION_CHUNK_LENGTH = "cassandra.output.compression.length";
-
-    private static final String INPUT_TRANSPORT_FACTORY_CLASS = "cassandra.input.transport.factory.class";
-    private static final String OUTPUT_TRANSPORT_FACTORY_CLASS = "cassandra.output.transport.factory.class";
     private static final String THRIFT_FRAMED_TRANSPORT_SIZE_IN_MB = "cassandra.thrift.framed.size_mb";
 
     private static final Logger logger = LoggerFactory.getLogger(ConfigHelper.class);
 
-
     /**
      * Set the keyspace and column family for the input of this job.
      *
@@ -88,13 +82,10 @@ public class ConfigHelper
     public static void setInputColumnFamily(Configuration conf, String keyspace, String columnFamily, boolean widerows)
     {
         if (keyspace == null)
-        {
             throw new UnsupportedOperationException("keyspace may not be null");
-        }
+
         if (columnFamily == null)
-        {
             throw new UnsupportedOperationException("columnfamily may not be null");
-        }
 
         conf.set(INPUT_KEYSPACE_CONFIG, keyspace);
         conf.set(INPUT_COLUMNFAMILY_CONFIG, columnFamily);
@@ -122,9 +113,7 @@ public class ConfigHelper
     public static void setOutputKeyspace(Configuration conf, String keyspace)
     {
         if (keyspace == null)
-        {
             throw new UnsupportedOperationException("keyspace may not be null");
-        }
 
         conf.set(OUTPUT_KEYSPACE_CONFIG, keyspace);
     }
@@ -562,12 +551,11 @@ public class ConfigHelper
         return client;
     }
 
-    public static Cassandra.Client createConnection(Configuration conf, String host, Integer port)
-            throws IOException
+    public static Cassandra.Client createConnection(Configuration conf, String host, Integer port) throws IOException
     {
         try
         {
-            TTransport transport = getClientTransportFactory(conf).openTransport(host, port, conf);
+            TTransport transport = getClientTransportFactory(conf).openTransport(host, port);
             return new Cassandra.Client(new TBinaryProtocol(transport, true, true));
         }
         catch (Exception e)
@@ -578,16 +566,15 @@ public class ConfigHelper
 
     public static ITransportFactory getClientTransportFactory(Configuration conf)
     {
-        String factoryClassName = conf.get(
-                ITransportFactory.PROPERTY_KEY,
-                TFramedTransportFactory.class.getName());
+        String factoryClassName = conf.get(ITransportFactory.PROPERTY_KEY, TFramedTransportFactory.class.getName());
         ITransportFactory factory = getClientTransportFactory(factoryClassName);
         Map<String, String> options = getOptions(conf, factory.supportedOptions());
         factory.setOptions(options);
         return factory;
     }
 
-    private static ITransportFactory getClientTransportFactory(String factoryClassName) {
+    private static ITransportFactory getClientTransportFactory(String factoryClassName)
+    {
         try
         {
             return (ITransportFactory) Class.forName(factoryClassName).newInstance();
@@ -597,8 +584,10 @@ public class ConfigHelper
             throw new RuntimeException("Failed to instantiate transport factory:" + factoryClassName, e);
         }
     }
-    private static Map<String, String> getOptions(Configuration conf, Set<String> supportedOptions) {
-        Map<String, String> options = Maps.newHashMap();
+
+    private static Map<String, String> getOptions(Configuration conf, Set<String> supportedOptions)
+    {
+        Map<String, String> options = new HashMap<>();
         for (String optionKey : supportedOptions)
         {
             String optionValue = conf.get(optionKey);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/2336d94e/src/java/org/apache/cassandra/thrift/ITransportFactory.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/thrift/ITransportFactory.java b/src/java/org/apache/cassandra/thrift/ITransportFactory.java
index 98b1c03..7a65728 100644
--- a/src/java/org/apache/cassandra/thrift/ITransportFactory.java
+++ b/src/java/org/apache/cassandra/thrift/ITransportFactory.java
@@ -1,5 +1,3 @@
-package org.apache.cassandra.thrift;
-
 /*
  *
  * Licensed to the Apache Software Foundation (ASF) under one
@@ -20,13 +18,12 @@ package org.apache.cassandra.thrift;
  * under the License.
  *
  */
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.thrift.transport.TTransport;
+package org.apache.cassandra.thrift;
 
 import java.util.Map;
 import java.util.Set;
 
+import org.apache.thrift.transport.TTransport;
 
 /**
  * Transport factory for establishing thrift connections from clients to a remote server.
@@ -34,8 +31,6 @@ import java.util.Set;
 public interface ITransportFactory
 {
     static final String PROPERTY_KEY = "cassandra.client.transport.factory";
-    static final String LONG_OPTION = "transport-factory";
-    static final String SHORT_OPTION = "tr";
 
     /**
      * Opens a client transport to a thrift server.
@@ -48,16 +43,15 @@ public interface ITransportFactory
      *
      * @param host fully qualified hostname of the server
      * @param port RPC port of the server
-     * @param conf Hadoop configuration
      * @return open and ready to use transport
      * @throws Exception implementation defined; usually throws TTransportException or IOException
      *         if the connection cannot be established
      */
-    TTransport openTransport(String host, int port, Configuration conf) throws Exception;
+    TTransport openTransport(String host, int port) throws Exception;
 
     /**
      * Sets an implementation defined set of options.
-     * Keys in this map must conform to the set set returned by TClientTransportFactory#supportedOptions.
+     * Keys in this map must conform to the set returned by ITransportFactory#supportedOptions.
      * @param options option map
      */
     void setOptions(Map<String, String> options);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/2336d94e/src/java/org/apache/cassandra/thrift/TFramedTransportFactory.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/thrift/TFramedTransportFactory.java b/src/java/org/apache/cassandra/thrift/TFramedTransportFactory.java
index 7d2d89e..a4c6bb7 100644
--- a/src/java/org/apache/cassandra/thrift/TFramedTransportFactory.java
+++ b/src/java/org/apache/cassandra/thrift/TFramedTransportFactory.java
@@ -1,5 +1,3 @@
-package org.apache.cassandra.thrift;
-
 /*
  *
  * Licensed to the Apache Software Foundation (ASF) under one
@@ -20,35 +18,38 @@ package org.apache.cassandra.thrift;
  * under the License.
  *
  */
+package org.apache.cassandra.thrift;
 
 import java.util.Collections;
 import java.util.Map;
 import java.util.Set;
 
-import org.apache.cassandra.hadoop.ConfigHelper;
 import org.apache.thrift.transport.TFramedTransport;
 import org.apache.thrift.transport.TSocket;
 import org.apache.thrift.transport.TTransport;
 import org.apache.thrift.transport.TTransportException;
 
-import org.apache.hadoop.conf.Configuration;
-
 public class TFramedTransportFactory implements ITransportFactory
 {
-    public TTransport openTransport(String host, int port, Configuration conf) throws TTransportException
+    private static final String THRIFT_FRAMED_TRANSPORT_SIZE_IN_MB = "cassandra.thrift.framed.size_mb";
+    private int thriftFramedTransportSizeMb = 15; // 15MB is the default for C* & Hadoop ConfigHelper
+
+    public TTransport openTransport(String host, int port) throws TTransportException
     {
         TSocket socket = new TSocket(host, port);
-        TTransport transport = new TFramedTransport(socket, ConfigHelper.getThriftFramedTransportSize(conf));
+        TTransport transport = new TFramedTransport(socket, thriftFramedTransportSizeMb * 1024 * 1024);
         transport.open();
         return transport;
     }
 
     public void setOptions(Map<String, String> options)
     {
+        if (options.containsKey(THRIFT_FRAMED_TRANSPORT_SIZE_IN_MB))
+            thriftFramedTransportSizeMb = Integer.parseInt(options.get(THRIFT_FRAMED_TRANSPORT_SIZE_IN_MB));
     }
 
     public Set<String> supportedOptions()
     {
-        return Collections.emptySet();
+        return Collections.singleton(THRIFT_FRAMED_TRANSPORT_SIZE_IN_MB);
     }
 }