You are viewing a plain-text version of this content. (The link to the canonical version did not survive the conversion to plain text.)
Posted to commits@cassandra.apache.org by br...@apache.org on 2011/10/14 23:22:15 UTC
svn commit: r1183506 - in /cassandra/branches/cassandra-0.8: ./
contrib/pig/src/java/org/apache/cassandra/hadoop/pig/
src/java/org/apache/cassandra/client/ src/java/org/apache/cassandra/hadoop/
test/distributed/org/apache/cassandra/ test/unit/org/apache/cassandra/client/
Author: brandonwilliams
Date: Fri Oct 14 21:22:14 2011
New Revision: 1183506
URL: http://svn.apache.org/viewvc?rev=1183506&view=rev
Log:
Unify Hadoop support for accepting a comma-delimited list (CDL) of initial Thrift addresses
Patch by Eldon Stegall, reviewed by brandonwilliams for CASSANDRA-3185
Modified:
cassandra/branches/cassandra-0.8/CHANGES.txt
cassandra/branches/cassandra-0.8/contrib/pig/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java
cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/client/RingCache.java
cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/hadoop/ColumnFamilyInputFormat.java
cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordWriter.java
cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/hadoop/ConfigHelper.java
cassandra/branches/cassandra-0.8/test/distributed/org/apache/cassandra/TestBase.java
cassandra/branches/cassandra-0.8/test/unit/org/apache/cassandra/client/TestRingCache.java
Modified: cassandra/branches/cassandra-0.8/CHANGES.txt
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.8/CHANGES.txt?rev=1183506&r1=1183505&r2=1183506&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.8/CHANGES.txt (original)
+++ cassandra/branches/cassandra-0.8/CHANGES.txt Fri Oct 14 21:22:14 2011
@@ -12,6 +12,8 @@
successfully acquired the compaction lock (CASSANDRA-3344)
* (Hadoop) make CFIF try rpc_address or fallback to listen_address
(CASSANDRA-3214)
+ * (Hadoop) accept comma delimited lists of initial thrift connections
+ (CASSANDRA-3185)
0.8.7
Modified: cassandra/branches/cassandra-0.8/contrib/pig/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.8/contrib/pig/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java?rev=1183506&r1=1183505&r2=1183506&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.8/contrib/pig/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java (original)
+++ cassandra/branches/cassandra-0.8/contrib/pig/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java Fri Oct 14 21:22:14 2011
@@ -545,7 +545,7 @@ public class CassandraStorage extends Lo
Cassandra.Client client = null;
try
{
- client = createConnection(ConfigHelper.getInitialAddress(conf), ConfigHelper.getRpcPort(conf), true);
+ client = ConfigHelper.getClientFromAddressList(conf);
CfDef cfDef = null;
client.set_keyspace(keyspace);
KsDef ksDef = client.describe_keyspace(keyspace);
@@ -579,21 +579,6 @@ public class CassandraStorage extends Lo
}
}
- private static Cassandra.Client createConnection(String host, Integer port, boolean framed) throws IOException
- {
- TSocket socket = new TSocket(host, port);
- TTransport trans = framed ? new TFramedTransport(socket) : socket;
- try
- {
- trans.open();
- }
- catch (TTransportException e)
- {
- throw new IOException("unable to connect to server", e);
- }
- return new Cassandra.Client(new TBinaryProtocol(trans));
- }
-
private static String cfdefToString(CfDef cfDef)
{
assert cfDef != null;
Modified: cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/client/RingCache.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/client/RingCache.java?rev=1183506&r1=1183505&r2=1183506&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/client/RingCache.java (original)
+++ cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/client/RingCache.java Fri Oct 14 21:22:14 2011
@@ -21,25 +21,22 @@ import java.io.IOException;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.nio.ByteBuffer;
-import java.util.HashSet;
import java.util.List;
-import java.util.Set;
-
-import com.google.common.collect.ArrayListMultimap;
-import com.google.common.collect.Multimap;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import org.apache.cassandra.dht.IPartitioner;
import org.apache.cassandra.dht.Range;
import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.hadoop.ConfigHelper;
import org.apache.cassandra.thrift.Cassandra;
import org.apache.cassandra.thrift.InvalidRequestException;
import org.apache.cassandra.thrift.TokenRange;
+import org.apache.hadoop.conf.Configuration;
import org.apache.thrift.TException;
-import org.apache.cassandra.thrift.TBinaryProtocol;
-import org.apache.thrift.transport.TFramedTransport;
-import org.apache.thrift.transport.TSocket;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.collect.ArrayListMultimap;
+import com.google.common.collect.Multimap;
/**
* A class for caching the ring map at the client. For usage example, see
@@ -50,42 +47,32 @@ public class RingCache
{
final private static Logger logger_ = LoggerFactory.getLogger(RingCache.class);
- private final Set<String> seeds_ = new HashSet<String>();
- private final int port_;
- private final IPartitioner<?> partitioner_;
- private final String keyspace;
+ private final IPartitioner<?> partitioner;
+ private final Configuration conf;
private Multimap<Range, InetAddress> rangeMap;
- public RingCache(String keyspace, IPartitioner<?> partitioner, String addresses, int port) throws IOException
+ public RingCache(Configuration conf) throws IOException
{
- for (String seed : addresses.split(","))
- seeds_.add(seed);
- this.port_ = port;
- this.keyspace = keyspace;
- this.partitioner_ = partitioner;
+ this.conf = conf;
+ this.partitioner = ConfigHelper.getPartitioner(conf);
refreshEndpointMap();
}
public void refreshEndpointMap()
{
- for (String seed : seeds_)
- {
- try
- {
- TSocket socket = new TSocket(seed, port_);
- TBinaryProtocol binaryProtocol = new TBinaryProtocol(new TFramedTransport(socket));
- Cassandra.Client client = new Cassandra.Client(binaryProtocol);
- socket.open();
+ try {
+
+ Cassandra.Client client = ConfigHelper.getClientFromAddressList(conf);
- List<TokenRange> ring = client.describe_ring(keyspace);
+ List<TokenRange> ring = client.describe_ring(ConfigHelper.getOutputKeyspace(conf));
rangeMap = ArrayListMultimap.create();
for (TokenRange range : ring)
{
- Token<?> left = partitioner_.getTokenFactory().fromString(range.start_token);
- Token<?> right = partitioner_.getTokenFactory().fromString(range.end_token);
- Range r = new Range(left, right, partitioner_);
+ Token<?> left = partitioner.getTokenFactory().fromString(range.start_token);
+ Token<?> right = partitioner.getTokenFactory().fromString(range.end_token);
+ Range r = new Range(left, right, partitioner);
for (String host : range.endpoints)
{
try
@@ -98,19 +85,20 @@ public class RingCache
}
}
}
- break;
}
catch (InvalidRequestException e)
{
throw new RuntimeException(e);
}
+ catch (IOException e)
+ {
+ throw new RuntimeException(e);
+ }
catch (TException e)
{
- /* let the Exception go and try another seed. log this though */
- logger_.debug("Error contacting seed " + seed + " " + e.getMessage());
+ logger_.debug("Error contacting seed list" + ConfigHelper.getInitialAddress(conf) + " " + e.getMessage());
}
}
- }
/** ListMultimap promises to return a List for get(K) */
public List<InetAddress> getEndpoint(Range range)
@@ -126,7 +114,7 @@ public class RingCache
public Range getRange(ByteBuffer key)
{
// TODO: naive linear search of the token map
- Token<?> t = partitioner_.getToken(key);
+ Token<?> t = partitioner.getToken(key);
for (Range range : rangeMap.keySet())
if (range.contains(t))
return range;
Modified: cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/hadoop/ColumnFamilyInputFormat.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/hadoop/ColumnFamilyInputFormat.java?rev=1183506&r1=1183505&r2=1183506&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/hadoop/ColumnFamilyInputFormat.java (original)
+++ cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/hadoop/ColumnFamilyInputFormat.java Fri Oct 14 21:22:14 2011
@@ -24,16 +24,17 @@ package org.apache.cassandra.hadoop;
import java.io.IOException;
import java.net.InetAddress;
import java.nio.ByteBuffer;
-import java.util.*;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Random;
+import java.util.Set;
+import java.util.SortedMap;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
-import org.apache.commons.lang.StringUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
import org.apache.cassandra.db.IColumn;
import org.apache.cassandra.dht.IPartitioner;
import org.apache.cassandra.dht.Range;
@@ -41,14 +42,16 @@ import org.apache.cassandra.thrift.Cassa
import org.apache.cassandra.thrift.InvalidRequestException;
import org.apache.cassandra.thrift.KeyRange;
import org.apache.cassandra.thrift.TokenRange;
-import org.apache.cassandra.thrift.TBinaryProtocol;
+import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.mapreduce.*;
+import org.apache.hadoop.mapreduce.InputFormat;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.thrift.TException;
-import org.apache.thrift.transport.TFramedTransport;
-import org.apache.thrift.transport.TSocket;
-import org.apache.thrift.transport.TTransport;
-import org.apache.thrift.transport.TTransportException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
* Hadoop InputFormat allowing map/reduce against Cassandra rows within one ColumnFamily.
@@ -218,7 +221,7 @@ public class ColumnFamilyInputFormat ext
{
try
{
- Cassandra.Client client = createConnection(host, ConfigHelper.getRpcPort(conf), true);
+ Cassandra.Client client = ConfigHelper.createConnection(host, ConfigHelper.getRpcPort(conf), true);
client.set_keyspace(keyspace);
return client.describe_splits(cfName, range.start_token, range.end_token, splitsize);
}
@@ -238,47 +241,10 @@ public class ColumnFamilyInputFormat ext
throw new IOException("failed connecting to all endpoints " + StringUtils.join(range.endpoints, ","));
}
- private static Cassandra.Client createConnection(String host, Integer port, boolean framed) throws IOException
- {
- TSocket socket = new TSocket(host, port);
- TTransport trans = framed ? new TFramedTransport(socket) : socket;
- try
- {
- trans.open();
- }
- catch (TTransportException e)
- {
- throw new IOException("unable to connect to server", e);
- }
- return new Cassandra.Client(new TBinaryProtocol(trans));
- }
private List<TokenRange> getRangeMap(Configuration conf) throws IOException
{
- String[] addresses = ConfigHelper.getInitialAddress(conf).split(",");
- Cassandra.Client client = null;
- List<IOException> exceptions = new ArrayList<IOException>();
- for (String address : addresses)
- {
- try
- {
- client = createConnection(address, ConfigHelper.getRpcPort(conf), true);
- break;
- }
- catch (IOException ioe)
- {
- exceptions.add(ioe);
- }
- }
- if (client == null)
- {
- logger.error("failed to connect to any initial addresses");
- for (IOException ioe : exceptions)
- {
- logger.error("", ioe);
- }
- throw exceptions.get(exceptions.size() - 1);
- }
+ Cassandra.Client client = ConfigHelper.getClientFromAddressList(conf);
List<TokenRange> map;
try
@@ -296,6 +262,8 @@ public class ColumnFamilyInputFormat ext
return map;
}
+
+
public RecordReader<ByteBuffer, SortedMap<ByteBuffer, IColumn>> createRecordReader(InputSplit inputSplit, TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException
{
return new ColumnFamilyRecordReader();
Modified: cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordWriter.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordWriter.java?rev=1183506&r1=1183505&r2=1183506&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordWriter.java (original)
+++ cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordWriter.java Fri Oct 14 21:22:14 2011
@@ -95,10 +95,7 @@ implements org.apache.hadoop.mapred.Reco
ColumnFamilyRecordWriter(Configuration conf) throws IOException
{
this.conf = conf;
- this.ringCache = new RingCache(ConfigHelper.getOutputKeyspace(conf),
- ConfigHelper.getPartitioner(conf),
- ConfigHelper.getInitialAddress(conf),
- ConfigHelper.getRpcPort(conf));
+ this.ringCache = new RingCache(conf);
this.queueSize = conf.getInt(ColumnFamilyOutputFormat.QUEUE_SIZE, 32 * Runtime.getRuntime().availableProcessors());
this.clients = new HashMap<Range,RangeClient>();
batchThreshold = conf.getLong(ColumnFamilyOutputFormat.BATCH_THRESHOLD, 32);
Modified: cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/hadoop/ConfigHelper.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/hadoop/ConfigHelper.java?rev=1183506&r1=1183505&r2=1183506&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/hadoop/ConfigHelper.java (original)
+++ cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/hadoop/ConfigHelper.java Fri Oct 14 21:22:14 2011
@@ -19,9 +19,13 @@ package org.apache.cassandra.hadoop;
* under the License.
*
*/
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
import org.apache.cassandra.config.ConfigurationException;
import org.apache.cassandra.dht.IPartitioner;
+import org.apache.cassandra.thrift.Cassandra;
import org.apache.cassandra.thrift.KeyRange;
import org.apache.cassandra.thrift.SlicePredicate;
import org.apache.cassandra.thrift.TBinaryProtocol;
@@ -30,6 +34,13 @@ import org.apache.hadoop.conf.Configurat
import org.apache.thrift.TDeserializer;
import org.apache.thrift.TException;
import org.apache.thrift.TSerializer;
+import org.apache.thrift.transport.TFramedTransport;
+import org.apache.thrift.transport.TSocket;
+import org.apache.thrift.transport.TTransport;
+import org.apache.thrift.transport.TTransportException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
public class ConfigHelper
{
@@ -53,6 +64,9 @@ public class ConfigHelper
private static final String INITIAL_THRIFT_ADDRESS = "cassandra.thrift.address";
private static final String READ_CONSISTENCY_LEVEL = "cassandra.consistencylevel.read";
private static final String WRITE_CONSISTENCY_LEVEL = "cassandra.consistencylevel.write";
+
+ private static final Logger logger = LoggerFactory.getLogger(ColumnFamilyInputFormat.class);
+
/**
* Set the keyspace and column family for the input of this job.
@@ -330,4 +344,50 @@ public class ConfigHelper
throw new RuntimeException(e);
}
}
+
+
+ public static Cassandra.Client getClientFromAddressList(Configuration conf) throws IOException
+ {
+ String[] addresses = ConfigHelper.getInitialAddress(conf).split(",");
+ Cassandra.Client client = null;
+ List<IOException> exceptions = new ArrayList<IOException>();
+ for (String address : addresses)
+ {
+ try
+ {
+ client = createConnection(address, ConfigHelper.getRpcPort(conf), true);
+ break;
+ }
+ catch (IOException ioe)
+ {
+ exceptions.add(ioe);
+ }
+ }
+ if (client == null)
+ {
+ logger.error("failed to connect to any initial addresses");
+ for (IOException ioe : exceptions)
+ {
+ logger.error("", ioe);
+ }
+ throw exceptions.get(exceptions.size() - 1);
+ }
+ return client;
+ }
+
+ public static Cassandra.Client createConnection(String host, Integer port, boolean framed)
+ throws IOException
+ {
+ TSocket socket = new TSocket(host, port);
+ TTransport trans = framed ? new TFramedTransport(socket) : socket;
+ try
+ {
+ trans.open();
+ }
+ catch (TTransportException e)
+ {
+ throw new IOException("unable to connect to server", e);
+ }
+ return new Cassandra.Client(new TBinaryProtocol(trans));
+ }
}
Modified: cassandra/branches/cassandra-0.8/test/distributed/org/apache/cassandra/TestBase.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.8/test/distributed/org/apache/cassandra/TestBase.java?rev=1183506&r1=1183505&r2=1183506&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.8/test/distributed/org/apache/cassandra/TestBase.java (original)
+++ cassandra/branches/cassandra-0.8/test/distributed/org/apache/cassandra/TestBase.java Fri Oct 14 21:22:14 2011
@@ -301,6 +301,8 @@ public abstract class TestBase
protected List<InetAddress> endpointsForKey(InetAddress seed, ByteBuffer key, String keyspace)
throws IOException
{
+ Configuration conf = new Configuration();
+
RingCache ring = new RingCache(keyspace, new RandomPartitioner(), seed.getHostAddress(), 9160);
List<InetAddress> privateendpoints = ring.getEndpoint(key);
List<InetAddress> endpoints = new ArrayList<InetAddress>();
Modified: cassandra/branches/cassandra-0.8/test/unit/org/apache/cassandra/client/TestRingCache.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.8/test/unit/org/apache/cassandra/client/TestRingCache.java?rev=1183506&r1=1183505&r2=1183506&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.8/test/unit/org/apache/cassandra/client/TestRingCache.java (original)
+++ cassandra/branches/cassandra-0.8/test/unit/org/apache/cassandra/client/TestRingCache.java Fri Oct 14 21:22:14 2011
@@ -23,16 +23,18 @@ import java.nio.ByteBuffer;
import java.util.Collection;
import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.hadoop.ConfigHelper;
import org.apache.cassandra.thrift.Cassandra;
import org.apache.cassandra.thrift.Column;
import org.apache.cassandra.thrift.ColumnParent;
import org.apache.cassandra.thrift.ColumnPath;
import org.apache.cassandra.thrift.ConsistencyLevel;
+import org.apache.cassandra.utils.ByteBufferUtil;
import org.apache.commons.lang.StringUtils;
+import org.apache.hadoop.conf.Configuration;
import org.apache.thrift.protocol.TBinaryProtocol;
import org.apache.thrift.transport.TFramedTransport;
import org.apache.thrift.transport.TSocket;
-import org.apache.cassandra.utils.ByteBufferUtil;
/**
@@ -42,11 +44,12 @@ public class TestRingCache
{
private RingCache ringCache;
private Cassandra.Client thriftClient;
+ private Configuration conf;
public TestRingCache(String keyspace) throws IOException
{
- String seed = DatabaseDescriptor.getSeeds().iterator().next().getHostAddress();
- ringCache = new RingCache(keyspace, DatabaseDescriptor.getPartitioner(), seed, DatabaseDescriptor.getRpcPort());
+ ConfigHelper.setOutputColumnFamily(conf, keyspace, "Standard1");
+ ringCache = new RingCache(conf);
}
private void setup(String server, int port) throws Exception
@@ -58,6 +61,12 @@ public class TestRingCache
Cassandra.Client cassandraClient = new Cassandra.Client(binaryProtocol);
socket.open();
thriftClient = cassandraClient;
+ String seed = DatabaseDescriptor.getSeeds().iterator().next().getHostAddress();
+ conf = new Configuration();
+ ConfigHelper.setPartitioner(conf, DatabaseDescriptor.getPartitioner().getClass().getName());
+ ConfigHelper.setInitialAddress(conf, seed);
+ ConfigHelper.setRpcPort(conf, Integer.toString(DatabaseDescriptor.getRpcPort()));
+
}
/**