You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by br...@apache.org on 2011/10/14 23:22:15 UTC

svn commit: r1183506 - in /cassandra/branches/cassandra-0.8: ./ contrib/pig/src/java/org/apache/cassandra/hadoop/pig/ src/java/org/apache/cassandra/client/ src/java/org/apache/cassandra/hadoop/ test/distributed/org/apache/cassandra/ test/unit/org/apach...

Author: brandonwilliams
Date: Fri Oct 14 21:22:14 2011
New Revision: 1183506

URL: http://svn.apache.org/viewvc?rev=1183506&view=rev
Log:
Unify Hadoop support for accepting a comma-delimited list (CDL) of initial Thrift addresses
Patch by Eldon Stegall, reviewed by brandonwilliams for CASSANDRA-3185

Modified:
    cassandra/branches/cassandra-0.8/CHANGES.txt
    cassandra/branches/cassandra-0.8/contrib/pig/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java
    cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/client/RingCache.java
    cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/hadoop/ColumnFamilyInputFormat.java
    cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordWriter.java
    cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/hadoop/ConfigHelper.java
    cassandra/branches/cassandra-0.8/test/distributed/org/apache/cassandra/TestBase.java
    cassandra/branches/cassandra-0.8/test/unit/org/apache/cassandra/client/TestRingCache.java

Modified: cassandra/branches/cassandra-0.8/CHANGES.txt
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.8/CHANGES.txt?rev=1183506&r1=1183505&r2=1183506&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.8/CHANGES.txt (original)
+++ cassandra/branches/cassandra-0.8/CHANGES.txt Fri Oct 14 21:22:14 2011
@@ -12,6 +12,8 @@
    successfully acquired the compaction lock (CASSANDRA-3344)
  * (Hadoop) make CFIF try rpc_address or fallback to listen_address
    (CASSANDRA-3214)
+ * (Hadoop) accept comma delimited lists of initial thrift connections
+   (CASSANDRA-3185)
 
 
 0.8.7

Modified: cassandra/branches/cassandra-0.8/contrib/pig/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.8/contrib/pig/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java?rev=1183506&r1=1183505&r2=1183506&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.8/contrib/pig/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java (original)
+++ cassandra/branches/cassandra-0.8/contrib/pig/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java Fri Oct 14 21:22:14 2011
@@ -545,7 +545,7 @@ public class CassandraStorage extends Lo
             Cassandra.Client client = null;
             try
             {
-                client = createConnection(ConfigHelper.getInitialAddress(conf), ConfigHelper.getRpcPort(conf), true);
+                client = ConfigHelper.getClientFromAddressList(conf);
                 CfDef cfDef = null;
                 client.set_keyspace(keyspace);
                 KsDef ksDef = client.describe_keyspace(keyspace);
@@ -579,21 +579,6 @@ public class CassandraStorage extends Lo
         }
     }
 
-    private static Cassandra.Client createConnection(String host, Integer port, boolean framed) throws IOException
-    {
-        TSocket socket = new TSocket(host, port);
-        TTransport trans = framed ? new TFramedTransport(socket) : socket;
-        try
-        {
-            trans.open();
-        }
-        catch (TTransportException e)
-        {
-            throw new IOException("unable to connect to server", e);
-        }
-        return new Cassandra.Client(new TBinaryProtocol(trans));
-    }
-
     private static String cfdefToString(CfDef cfDef)
     {
         assert cfDef != null;

Modified: cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/client/RingCache.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/client/RingCache.java?rev=1183506&r1=1183505&r2=1183506&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/client/RingCache.java (original)
+++ cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/client/RingCache.java Fri Oct 14 21:22:14 2011
@@ -21,25 +21,22 @@ import java.io.IOException;
 import java.net.InetAddress;
 import java.net.UnknownHostException;
 import java.nio.ByteBuffer;
-import java.util.HashSet;
 import java.util.List;
-import java.util.Set;
-
-import com.google.common.collect.ArrayListMultimap;
-import com.google.common.collect.Multimap;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 import org.apache.cassandra.dht.IPartitioner;
 import org.apache.cassandra.dht.Range;
 import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.hadoop.ConfigHelper;
 import org.apache.cassandra.thrift.Cassandra;
 import org.apache.cassandra.thrift.InvalidRequestException;
 import org.apache.cassandra.thrift.TokenRange;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.thrift.TException;
-import org.apache.cassandra.thrift.TBinaryProtocol;
-import org.apache.thrift.transport.TFramedTransport;
-import org.apache.thrift.transport.TSocket;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.collect.ArrayListMultimap;
+import com.google.common.collect.Multimap;
 
 /**
  * A class for caching the ring map at the client. For usage example, see
@@ -50,42 +47,32 @@ public class RingCache
 {
     final private static Logger logger_ = LoggerFactory.getLogger(RingCache.class);
 
-    private final Set<String> seeds_ = new HashSet<String>();
-    private final int port_;
-    private final IPartitioner<?> partitioner_;
-    private final String keyspace;
+    private final IPartitioner<?> partitioner;
+    private final Configuration conf;
 
     private Multimap<Range, InetAddress> rangeMap;
 
-    public RingCache(String keyspace, IPartitioner<?> partitioner, String addresses, int port) throws IOException
+    /**
+     * Builds a ring cache whose connection settings all come from a Hadoop job configuration.
+     *
+     * The partitioner is read with {@link ConfigHelper#getPartitioner}. The ring is then fetched
+     * immediately by {@link #refreshEndpointMap()}, which connects through
+     * {@link ConfigHelper#getClientFromAddressList}, so the initial thrift address may be a
+     * comma-delimited list. It describes the ring of the job's <em>output</em> keyspace
+     * ({@link ConfigHelper#getOutputKeyspace}).
+     *
+     * NOTE(review): refreshEndpointMap() only logs a TException. If describe_ring fails on this
+     * first call, rangeMap is never assigned and later lookups will hit a null map. Callers
+     * should verify.
+     *
+     * @param conf job configuration; the partitioner, initial thrift address(es), rpc port and
+     *             output keyspace are read from it
+     * @throws IOException declared by the signature. Connection failures inside
+     *         refreshEndpointMap() are wrapped in RuntimeException rather than surfacing as this
+     *         IOException.
+     */
+    public RingCache(Configuration conf) throws IOException
     {
-        for (String seed : addresses.split(","))
-            seeds_.add(seed);
-        this.port_ = port;
-        this.keyspace = keyspace;
-        this.partitioner_ = partitioner;
+        this.conf = conf;
+        this.partitioner = ConfigHelper.getPartitioner(conf);
         refreshEndpointMap();
     }
 
     public void refreshEndpointMap()
     {
-        for (String seed : seeds_)
-        {
-            try
-            {
-                TSocket socket = new TSocket(seed, port_);
-                TBinaryProtocol binaryProtocol = new TBinaryProtocol(new TFramedTransport(socket));
-                Cassandra.Client client = new Cassandra.Client(binaryProtocol);
-                socket.open();
+            try {
+                
+                Cassandra.Client client = ConfigHelper.getClientFromAddressList(conf);
 
-                List<TokenRange> ring = client.describe_ring(keyspace);
+                List<TokenRange> ring = client.describe_ring(ConfigHelper.getOutputKeyspace(conf));
                 rangeMap = ArrayListMultimap.create();
                 
                 for (TokenRange range : ring)
                 {
-                    Token<?> left = partitioner_.getTokenFactory().fromString(range.start_token);
-                    Token<?> right = partitioner_.getTokenFactory().fromString(range.end_token);
-                    Range r = new Range(left, right, partitioner_);
+                    Token<?> left = partitioner.getTokenFactory().fromString(range.start_token);
+                    Token<?> right = partitioner.getTokenFactory().fromString(range.end_token);
+                    Range r = new Range(left, right, partitioner);
                     for (String host : range.endpoints)
                     {
                         try
@@ -98,19 +85,20 @@ public class RingCache
                         }
                     }
                 }
-                break;
             }
             catch (InvalidRequestException e)
             {
                 throw new RuntimeException(e);
             }
+            catch (IOException e)
+            {
+                throw new RuntimeException(e);
+            }
             catch (TException e)
             {
-                /* let the Exception go and try another seed. log this though */
-                logger_.debug("Error contacting seed " + seed + " " + e.getMessage());
+                logger_.debug("Error contacting seed list" + ConfigHelper.getInitialAddress(conf) + " " + e.getMessage());
             }
         }
-    }
 
     /** ListMultimap promises to return a List for get(K) */
     public List<InetAddress> getEndpoint(Range range)
@@ -126,7 +114,7 @@ public class RingCache
     public Range getRange(ByteBuffer key)
     {
         // TODO: naive linear search of the token map
-        Token<?> t = partitioner_.getToken(key);
+        Token<?> t = partitioner.getToken(key);
         for (Range range : rangeMap.keySet())
             if (range.contains(t))
                 return range;

Modified: cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/hadoop/ColumnFamilyInputFormat.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/hadoop/ColumnFamilyInputFormat.java?rev=1183506&r1=1183505&r2=1183506&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/hadoop/ColumnFamilyInputFormat.java (original)
+++ cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/hadoop/ColumnFamilyInputFormat.java Fri Oct 14 21:22:14 2011
@@ -24,16 +24,17 @@ package org.apache.cassandra.hadoop;
 import java.io.IOException;
 import java.net.InetAddress;
 import java.nio.ByteBuffer;
-import java.util.*;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Random;
+import java.util.Set;
+import java.util.SortedMap;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
 
-import org.apache.commons.lang.StringUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
 import org.apache.cassandra.db.IColumn;
 import org.apache.cassandra.dht.IPartitioner;
 import org.apache.cassandra.dht.Range;
@@ -41,14 +42,16 @@ import org.apache.cassandra.thrift.Cassa
 import org.apache.cassandra.thrift.InvalidRequestException;
 import org.apache.cassandra.thrift.KeyRange;
 import org.apache.cassandra.thrift.TokenRange;
-import org.apache.cassandra.thrift.TBinaryProtocol;
+import org.apache.commons.lang.StringUtils;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.mapreduce.*;
+import org.apache.hadoop.mapreduce.InputFormat;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
 import org.apache.thrift.TException;
-import org.apache.thrift.transport.TFramedTransport;
-import org.apache.thrift.transport.TSocket;
-import org.apache.thrift.transport.TTransport;
-import org.apache.thrift.transport.TTransportException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * Hadoop InputFormat allowing map/reduce against Cassandra rows within one ColumnFamily.
@@ -218,7 +221,7 @@ public class ColumnFamilyInputFormat ext
         {
             try
             {
-                Cassandra.Client client = createConnection(host, ConfigHelper.getRpcPort(conf), true);
+                Cassandra.Client client = ConfigHelper.createConnection(host, ConfigHelper.getRpcPort(conf), true);
                 client.set_keyspace(keyspace);
                 return client.describe_splits(cfName, range.start_token, range.end_token, splitsize);
             }
@@ -238,47 +241,10 @@ public class ColumnFamilyInputFormat ext
         throw new IOException("failed connecting to all endpoints " + StringUtils.join(range.endpoints, ","));
     }
 
-    private static Cassandra.Client createConnection(String host, Integer port, boolean framed) throws IOException
-    {
-        TSocket socket = new TSocket(host, port);
-        TTransport trans = framed ? new TFramedTransport(socket) : socket;
-        try
-        {
-            trans.open();
-        }
-        catch (TTransportException e)
-        {
-            throw new IOException("unable to connect to server", e);
-        }
-        return new Cassandra.Client(new TBinaryProtocol(trans));
-    }
 
     private List<TokenRange> getRangeMap(Configuration conf) throws IOException
     {
-        String[] addresses = ConfigHelper.getInitialAddress(conf).split(",");
-        Cassandra.Client client = null;
-        List<IOException> exceptions = new ArrayList<IOException>();
-        for (String address : addresses)
-        {
-            try
-            {
-                client = createConnection(address, ConfigHelper.getRpcPort(conf), true);
-                break;
-            }
-            catch (IOException ioe)
-            {
-                exceptions.add(ioe);
-            }
-        }
-        if (client == null)
-        {
-            logger.error("failed to connect to any initial addresses");
-            for (IOException ioe : exceptions)
-            {
-                logger.error("", ioe);
-            }
-            throw exceptions.get(exceptions.size() - 1);
-        }
+        Cassandra.Client client = ConfigHelper.getClientFromAddressList(conf);
 
         List<TokenRange> map;
         try
@@ -296,6 +262,8 @@ public class ColumnFamilyInputFormat ext
         return map;
     }
 
+
+
     public RecordReader<ByteBuffer, SortedMap<ByteBuffer, IColumn>> createRecordReader(InputSplit inputSplit, TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException
     {
         return new ColumnFamilyRecordReader();

Modified: cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordWriter.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordWriter.java?rev=1183506&r1=1183505&r2=1183506&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordWriter.java (original)
+++ cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordWriter.java Fri Oct 14 21:22:14 2011
@@ -95,10 +95,7 @@ implements org.apache.hadoop.mapred.Reco
     ColumnFamilyRecordWriter(Configuration conf) throws IOException
     {
         this.conf = conf;
-        this.ringCache = new RingCache(ConfigHelper.getOutputKeyspace(conf),
-                                       ConfigHelper.getPartitioner(conf),
-                                       ConfigHelper.getInitialAddress(conf),
-                                       ConfigHelper.getRpcPort(conf));
+        this.ringCache = new RingCache(conf);
         this.queueSize = conf.getInt(ColumnFamilyOutputFormat.QUEUE_SIZE, 32 * Runtime.getRuntime().availableProcessors());
         this.clients = new HashMap<Range,RangeClient>();
         batchThreshold = conf.getLong(ColumnFamilyOutputFormat.BATCH_THRESHOLD, 32);

Modified: cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/hadoop/ConfigHelper.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/hadoop/ConfigHelper.java?rev=1183506&r1=1183505&r2=1183506&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/hadoop/ConfigHelper.java (original)
+++ cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/hadoop/ConfigHelper.java Fri Oct 14 21:22:14 2011
@@ -19,9 +19,13 @@ package org.apache.cassandra.hadoop;
  * under the License.
  * 
  */
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
 
 import org.apache.cassandra.config.ConfigurationException;
 import org.apache.cassandra.dht.IPartitioner;
+import org.apache.cassandra.thrift.Cassandra;
 import org.apache.cassandra.thrift.KeyRange;
 import org.apache.cassandra.thrift.SlicePredicate;
 import org.apache.cassandra.thrift.TBinaryProtocol;
@@ -30,6 +34,13 @@ import org.apache.hadoop.conf.Configurat
 import org.apache.thrift.TDeserializer;
 import org.apache.thrift.TException;
 import org.apache.thrift.TSerializer;
+import org.apache.thrift.transport.TFramedTransport;
+import org.apache.thrift.transport.TSocket;
+import org.apache.thrift.transport.TTransport;
+import org.apache.thrift.transport.TTransportException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 
 public class ConfigHelper
 {
@@ -53,6 +64,9 @@ public class ConfigHelper
     private static final String INITIAL_THRIFT_ADDRESS = "cassandra.thrift.address";
     private static final String READ_CONSISTENCY_LEVEL = "cassandra.consistencylevel.read";
     private static final String WRITE_CONSISTENCY_LEVEL = "cassandra.consistencylevel.write";
+    
+    // Log under ConfigHelper's own name so connection failures are attributed to this class.
+    private static final Logger logger = LoggerFactory.getLogger(ConfigHelper.class);
+
 
     /**
      * Set the keyspace and column family for the input of this job.
@@ -330,4 +344,50 @@ public class ConfigHelper
             throw new RuntimeException(e);
         }
     }
+    
+
+    /**
+     * Opens a framed Thrift connection to the first reachable node in the job's initial
+     * thrift address setting, which may be a comma-delimited list.
+     *
+     * Entries are tried in order. Surrounding whitespace is ignored and empty entries are
+     * skipped, so "host1, host2" and "host1,,host2" both work.
+     *
+     * @param conf job configuration supplying the initial address list and the rpc port
+     * @return a client connected to the first address that accepted the connection
+     * @throws IOException if no usable address is configured, or if every address failed.
+     *         In the latter case all failures are logged and the last one is rethrown.
+     */
+    public static Cassandra.Client getClientFromAddressList(Configuration conf) throws IOException
+    {
+        String initialAddress = ConfigHelper.getInitialAddress(conf);
+        if (initialAddress == null)
+            throw new IOException("no initial thrift address configured (" + INITIAL_THRIFT_ADDRESS + ")");
+
+        String[] addresses = initialAddress.split(",");
+        Cassandra.Client client = null;
+        List<IOException> exceptions = new ArrayList<IOException>();
+        for (String rawAddress : addresses)
+        {
+            // Tolerate "host1, host2" style lists and stray separators.
+            String address = rawAddress.trim();
+            if (address.isEmpty())
+                continue;
+            try
+            {
+                client = createConnection(address, ConfigHelper.getRpcPort(conf), true);
+                break;
+            }
+            catch (IOException ioe)
+            {
+                exceptions.add(ioe);
+            }
+        }
+        if (client == null)
+        {
+            // A value like "," splits to an empty array, so nothing was attempted.
+            // Without this guard, exceptions.get(-1) would throw IndexOutOfBoundsException.
+            if (exceptions.isEmpty())
+                throw new IOException("no usable initial thrift address in '" + initialAddress + "'");
+            logger.error("failed to connect to any initial addresses");
+            for (IOException ioe : exceptions)
+            {
+                logger.error("", ioe);
+            }
+            throw exceptions.get(exceptions.size() - 1);
+        }
+        return client;
+    }
+
+    /**
+     * Opens a Thrift connection to a single node using the binary protocol.
+     *
+     * @param host   node host name or address
+     * @param port   thrift rpc port
+     * @param framed when true, wrap the socket in a {@link TFramedTransport}
+     * @return a client over the opened transport
+     * @throws IOException if the transport cannot be opened; the Thrift exception is kept
+     *         as the cause
+     */
+    public static Cassandra.Client createConnection(String host, Integer port, boolean framed)
+            throws IOException
+    {
+        TSocket socket = new TSocket(host, port);
+        TTransport trans = framed ? new TFramedTransport(socket) : socket;
+        try
+        {
+            trans.open();
+        }
+        catch (TTransportException e)
+        {
+            // Name the endpoint: getClientFromAddressList tries several hosts and logs every
+            // failure, which would otherwise all carry the same message.
+            throw new IOException("unable to connect to server " + host + ":" + port, e);
+        }
+        return new Cassandra.Client(new TBinaryProtocol(trans));
+    }
 }

Modified: cassandra/branches/cassandra-0.8/test/distributed/org/apache/cassandra/TestBase.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.8/test/distributed/org/apache/cassandra/TestBase.java?rev=1183506&r1=1183505&r2=1183506&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.8/test/distributed/org/apache/cassandra/TestBase.java (original)
+++ cassandra/branches/cassandra-0.8/test/distributed/org/apache/cassandra/TestBase.java Fri Oct 14 21:22:14 2011
@@ -301,6 +301,8 @@ public abstract class TestBase
     protected List<InetAddress> endpointsForKey(InetAddress seed, ByteBuffer key, String keyspace)
         throws IOException
     {
+        Configuration conf = new Configuration();
+        
         RingCache ring = new RingCache(keyspace, new RandomPartitioner(), seed.getHostAddress(), 9160);
         List<InetAddress> privateendpoints = ring.getEndpoint(key);
         List<InetAddress> endpoints = new ArrayList<InetAddress>();

Modified: cassandra/branches/cassandra-0.8/test/unit/org/apache/cassandra/client/TestRingCache.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.8/test/unit/org/apache/cassandra/client/TestRingCache.java?rev=1183506&r1=1183505&r2=1183506&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.8/test/unit/org/apache/cassandra/client/TestRingCache.java (original)
+++ cassandra/branches/cassandra-0.8/test/unit/org/apache/cassandra/client/TestRingCache.java Fri Oct 14 21:22:14 2011
@@ -23,16 +23,18 @@ import java.nio.ByteBuffer;
 import java.util.Collection;
 
 import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.hadoop.ConfigHelper;
 import org.apache.cassandra.thrift.Cassandra;
 import org.apache.cassandra.thrift.Column;
 import org.apache.cassandra.thrift.ColumnParent;
 import org.apache.cassandra.thrift.ColumnPath;
 import org.apache.cassandra.thrift.ConsistencyLevel;
+import org.apache.cassandra.utils.ByteBufferUtil;
 import org.apache.commons.lang.StringUtils;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.thrift.protocol.TBinaryProtocol;
 import org.apache.thrift.transport.TFramedTransport;
 import org.apache.thrift.transport.TSocket;
-import org.apache.cassandra.utils.ByteBufferUtil;
 
 
 /**
@@ -42,11 +44,12 @@ public class TestRingCache
 {
     private RingCache ringCache;
     private Cassandra.Client thriftClient;
+    private Configuration conf;
 
     public TestRingCache(String keyspace) throws IOException
     {
-        String seed = DatabaseDescriptor.getSeeds().iterator().next().getHostAddress();
-    	ringCache = new RingCache(keyspace, DatabaseDescriptor.getPartitioner(), seed, DatabaseDescriptor.getRpcPort());
+        // The conf field has no initializer and is only assigned in setup(), so it is null here.
+        // RingCache immediately reads the partitioner, initial address, rpc port and output
+        // keyspace from it, so build a complete configuration before constructing the cache.
+        String seed = DatabaseDescriptor.getSeeds().iterator().next().getHostAddress();
+        conf = new Configuration();
+        ConfigHelper.setPartitioner(conf, DatabaseDescriptor.getPartitioner().getClass().getName());
+        ConfigHelper.setInitialAddress(conf, seed);
+        ConfigHelper.setRpcPort(conf, Integer.toString(DatabaseDescriptor.getRpcPort()));
+        ConfigHelper.setOutputColumnFamily(conf, keyspace, "Standard1");
+        ringCache = new RingCache(conf);
     }
     
     private void setup(String server, int port) throws Exception
@@ -58,6 +61,12 @@ public class TestRingCache
         Cassandra.Client cassandraClient = new Cassandra.Client(binaryProtocol);
         socket.open();
         thriftClient = cassandraClient;
+        String seed = DatabaseDescriptor.getSeeds().iterator().next().getHostAddress();
+        conf = new Configuration();
+        ConfigHelper.setPartitioner(conf, DatabaseDescriptor.getPartitioner().getClass().getName());
+        ConfigHelper.setInitialAddress(conf, seed);
+        ConfigHelper.setRpcPort(conf, Integer.toString(DatabaseDescriptor.getRpcPort()));
+
     }
 
     /**