You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by jb...@apache.org on 2010/07/26 23:01:36 UTC
svn commit: r979440 - in /cassandra/trunk: contrib/word_count/src/
src/java/org/apache/cassandra/config/ src/java/org/apache/cassandra/hadoop/
Author: jbellis
Date: Mon Jul 26 21:01:36 2010
New Revision: 979440
URL: http://svn.apache.org/viewvc?rev=979440&view=rev
Log:
Finish CASSANDRA-1280 for trunk (lots of new code that the merge from 0.6 didn't finish). Bonus: contrib/word_count is actually tested and working again. Patch by jbellis.
Modified:
cassandra/trunk/contrib/word_count/src/WordCount.java
cassandra/trunk/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
cassandra/trunk/src/java/org/apache/cassandra/hadoop/ColumnFamilyInputFormat.java
cassandra/trunk/src/java/org/apache/cassandra/hadoop/ColumnFamilyOutputFormat.java
cassandra/trunk/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordReader.java
cassandra/trunk/src/java/org/apache/cassandra/hadoop/ConfigHelper.java
Modified: cassandra/trunk/contrib/word_count/src/WordCount.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/contrib/word_count/src/WordCount.java?rev=979440&r1=979439&r2=979440&view=diff
==============================================================================
--- cassandra/trunk/contrib/word_count/src/WordCount.java (original)
+++ cassandra/trunk/contrib/word_count/src/WordCount.java Mon Jul 26 21:01:36 2010
@@ -54,7 +54,6 @@ public class WordCount extends Configure
static final String COLUMN_FAMILY = "Standard1";
private static final String CONF_COLUMN_NAME = "columnname";
private static final String OUTPUT_PATH_PREFIX = "/tmp/word_count";
- static final int RING_DELAY = 3000; // this is enough for testing a single server node; may need more for a real cluster
public static void main(String[] args) throws Exception
{
Modified: cassandra/trunk/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/config/DatabaseDescriptor.java?rev=979440&r1=979439&r2=979440&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/config/DatabaseDescriptor.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/config/DatabaseDescriptor.java Mon Jul 26 21:01:36 2010
@@ -113,9 +113,8 @@ public class DatabaseDescriptor
{
try
{
-
configFileName = getStorageConfigPath();
-
+
if (logger.isDebugEnabled())
logger.info("Loading settings from " + configFileName);
@@ -647,7 +646,6 @@ public class DatabaseDescriptor
public static AbstractType getComparator(String compareWith) throws ConfigurationException
{
- logger.info(compareWith);
Class<? extends AbstractType> typeClass;
if (compareWith == null)
Modified: cassandra/trunk/src/java/org/apache/cassandra/hadoop/ColumnFamilyInputFormat.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/hadoop/ColumnFamilyInputFormat.java?rev=979440&r1=979439&r2=979440&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/hadoop/ColumnFamilyInputFormat.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/hadoop/ColumnFamilyInputFormat.java Mon Jul 26 21:01:36 2010
@@ -41,7 +41,10 @@ import org.apache.hadoop.conf.Configurat
import org.apache.hadoop.mapreduce.*;
import org.apache.thrift.TException;
import org.apache.thrift.protocol.TBinaryProtocol;
+import org.apache.thrift.protocol.TProtocol;
+import org.apache.thrift.transport.TFramedTransport;
import org.apache.thrift.transport.TSocket;
+import org.apache.thrift.transport.TTransport;
import org.apache.thrift.transport.TTransportException;
/**
@@ -170,18 +173,8 @@ public class ColumnFamilyInputFormat ext
private List<String> getSubSplits(String keyspace, String cfName, TokenRange range, Configuration conf) throws IOException
{
// TODO handle failure of range replicas & retry
- TSocket socket = new TSocket(range.endpoints.get(0), ConfigHelper.getThriftPort(conf));
- TBinaryProtocol binaryProtocol = new TBinaryProtocol(socket, false, false);
- Cassandra.Client client = new Cassandra.Client(binaryProtocol);
+ Cassandra.Client client = createConnection(range.endpoints.get(0), ConfigHelper.getRpcPort(conf), true);
int splitsize = ConfigHelper.getInputSplitSize(conf);
- try
- {
- socket.open();
- }
- catch (TTransportException e)
- {
- throw new IOException(e);
- }
List<String> splits;
try
{
@@ -194,19 +187,25 @@ public class ColumnFamilyInputFormat ext
return splits;
}
- private List<TokenRange> getRangeMap(Configuration conf) throws IOException
+ private static Cassandra.Client createConnection(String host, Integer port, boolean framed) throws IOException
{
- TSocket socket = new TSocket(ConfigHelper.getInitialAddress(conf), ConfigHelper.getThriftPort(conf));
- TBinaryProtocol binaryProtocol = new TBinaryProtocol(socket, false, false);
- Cassandra.Client client = new Cassandra.Client(binaryProtocol);
+ TSocket socket = new TSocket(host, port);
+ TTransport trans = framed ? new TFramedTransport(socket) : socket;
try
{
- socket.open();
+ trans.open();
}
catch (TTransportException e)
{
- throw new IOException(e);
+ throw new IOException("unable to connect to server", e);
}
+ return new Cassandra.Client(new TBinaryProtocol(trans));
+ }
+
+ private List<TokenRange> getRangeMap(Configuration conf) throws IOException
+ {
+ Cassandra.Client client = createConnection(ConfigHelper.getInitialAddress(conf), ConfigHelper.getRpcPort(conf), true);
+
List<TokenRange> map;
try
{
Modified: cassandra/trunk/src/java/org/apache/cassandra/hadoop/ColumnFamilyOutputFormat.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/hadoop/ColumnFamilyOutputFormat.java?rev=979440&r1=979439&r2=979440&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/hadoop/ColumnFamilyOutputFormat.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/hadoop/ColumnFamilyOutputFormat.java Mon Jul 26 21:01:36 2010
@@ -26,9 +26,7 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
-import org.apache.cassandra.auth.AllowAllAuthenticator;
import org.apache.cassandra.auth.SimpleAuthenticator;
-import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.db.ColumnFamily;
import org.apache.cassandra.db.IColumn;
import org.apache.cassandra.thrift.AuthenticationException;
@@ -44,6 +42,7 @@ import org.apache.hadoop.mapreduce.Recor
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.thrift.TException;
import org.apache.thrift.protocol.TBinaryProtocol;
+import org.apache.thrift.transport.TFramedTransport;
import org.apache.thrift.transport.TSocket;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -151,16 +150,18 @@ public class ColumnFamilyOutputFormat ex
public static Cassandra.Client createAuthenticatedClient(TSocket socket, JobContext context)
throws InvalidRequestException, TException, AuthenticationException, AuthorizationException
{
- TBinaryProtocol binaryProtocol = new TBinaryProtocol(socket, false, false);
+ TBinaryProtocol binaryProtocol = new TBinaryProtocol(new TFramedTransport(socket));
Cassandra.Client client = new Cassandra.Client(binaryProtocol);
socket.open();
client.set_keyspace(ConfigHelper.getOutputKeyspace(context.getConfiguration()));
- Map<String, String> creds = new HashMap<String, String>();
- creds.put(SimpleAuthenticator.USERNAME_KEY, ConfigHelper.getOutputKeyspaceUserName(context.getConfiguration()));
- creds.put(SimpleAuthenticator.PASSWORD_KEY, ConfigHelper.getOutputKeyspacePassword(context.getConfiguration()));
- AuthenticationRequest authRequest = new AuthenticationRequest(creds);
- if (!(DatabaseDescriptor.getAuthenticator() instanceof AllowAllAuthenticator))
+ if (ConfigHelper.getOutputKeyspaceUserName(context.getConfiguration()) != null)
+ {
+ Map<String, String> creds = new HashMap<String, String>();
+ creds.put(SimpleAuthenticator.USERNAME_KEY, ConfigHelper.getOutputKeyspaceUserName(context.getConfiguration()));
+ creds.put(SimpleAuthenticator.PASSWORD_KEY, ConfigHelper.getOutputKeyspacePassword(context.getConfiguration()));
+ AuthenticationRequest authRequest = new AuthenticationRequest(creds);
client.login(authRequest);
+ }
return client;
}
Modified: cassandra/trunk/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordReader.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordReader.java?rev=979440&r1=979439&r2=979440&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordReader.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordReader.java Mon Jul 26 21:01:36 2010
@@ -35,6 +35,7 @@ import org.apache.cassandra.config.Confi
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.db.*;
import org.apache.cassandra.db.clock.AbstractReconciler;
+import org.apache.cassandra.db.clock.TimestampReconciler;
import org.apache.cassandra.db.marshal.AbstractType;
import org.apache.cassandra.dht.IPartitioner;
import org.apache.cassandra.thrift.*;
@@ -48,6 +49,7 @@ import org.apache.hadoop.mapreduce.Recor
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.thrift.TException;
import org.apache.thrift.protocol.TBinaryProtocol;
+import org.apache.thrift.transport.TFramedTransport;
import org.apache.thrift.transport.TSocket;
public class ColumnFamilyRecordReader extends RecordReader<byte[], SortedMap<byte[], IColumn>>
@@ -60,8 +62,6 @@ public class ColumnFamilyRecordReader ex
private int batchRowCount; // fetch this many per batch
private String cfName;
private String keyspace;
- private Configuration conf;
- private AuthenticationRequest authRequest;
private TSocket socket;
private Cassandra.Client client;
@@ -94,18 +94,42 @@ public class ColumnFamilyRecordReader ex
public void initialize(InputSplit split, TaskAttemptContext context) throws IOException
{
this.split = (ColumnFamilySplit) split;
- conf = context.getConfiguration();
+ Configuration conf = context.getConfiguration();
predicate = ConfigHelper.getInputSlicePredicate(conf);
totalRowCount = ConfigHelper.getInputSplitSize(conf);
batchRowCount = ConfigHelper.getRangeBatchSize(conf);
cfName = ConfigHelper.getInputColumnFamily(conf);
keyspace = ConfigHelper.getInputKeyspace(conf);
- Map<String, String> creds = new HashMap<String, String>();
- creds.put(SimpleAuthenticator.USERNAME_KEY, ConfigHelper.getInputKeyspaceUserName(conf));
- creds.put(SimpleAuthenticator.PASSWORD_KEY, ConfigHelper.getInputKeyspacePassword(conf));
- authRequest = new AuthenticationRequest(creds);
-
+ try
+ {
+ // only need to connect once
+ if (socket != null && socket.isOpen())
+ return;
+
+ // create connection using thrift
+ String location = getLocation();
+ socket = new TSocket(location, ConfigHelper.getRpcPort(conf));
+ TBinaryProtocol binaryProtocol = new TBinaryProtocol(new TFramedTransport(socket));
+ client = new Cassandra.Client(binaryProtocol);
+ socket.open();
+
+ // log in
+ client.set_keyspace(keyspace);
+ if (ConfigHelper.getInputKeyspaceUserName(conf) != null)
+ {
+ Map<String, String> creds = new HashMap<String, String>();
+ creds.put(SimpleAuthenticator.USERNAME_KEY, ConfigHelper.getInputKeyspaceUserName(conf));
+ creds.put(SimpleAuthenticator.PASSWORD_KEY, ConfigHelper.getInputKeyspacePassword(conf));
+ AuthenticationRequest authRequest = new AuthenticationRequest(creds);
+ client.login(authRequest);
+ }
+ }
+ catch (Exception e)
+ {
+ throw new RuntimeException(e);
+ }
+
iter = new RowIterator();
}
@@ -117,6 +141,41 @@ public class ColumnFamilyRecordReader ex
return true;
}
+ // we don't use endpointsnitch since we are trying to support hadoop nodes that are
+ // not necessarily on Cassandra machines, too. This should be adequate for single-DC clusters, at least.
+ private String getLocation()
+ {
+ InetAddress[] localAddresses;
+ try
+ {
+ localAddresses = InetAddress.getAllByName(InetAddress.getLocalHost().getHostAddress());
+ }
+ catch (UnknownHostException e)
+ {
+ throw new AssertionError(e);
+ }
+ for (InetAddress address : localAddresses)
+ {
+ for (String location : split.getLocations())
+ {
+ InetAddress locationAddress = null;
+ try
+ {
+ locationAddress = InetAddress.getByName(location);
+ }
+ catch (UnknownHostException e)
+ {
+ throw new AssertionError(e);
+ }
+ if (address.equals(locationAddress))
+ {
+ return location;
+ }
+ }
+ }
+ return split.getLocations()[0];
+ }
+
private class RowIterator extends AbstractIterator<Pair<byte[], SortedMap<byte[], IColumn>>>
{
private List<KeySlice> rows;
@@ -134,7 +193,7 @@ public class ColumnFamilyRecordReader ex
partitioner = FBUtilities.newPartitioner(client.describe_partitioner());
Map<String, String> info = client.describe_keyspace(keyspace).get(cfName);
comparator = FBUtilities.getComparator(info.get("CompareWith"));
- subComparator = FBUtilities.getComparator(info.get("CompareSubcolumnsWith"));
+ subComparator = info.get("CompareSubcolumnsWith") == null ? null : FBUtilities.getComparator(info.get("CompareSubcolumnsWith"));
}
catch (ConfigurationException e)
{
@@ -158,16 +217,7 @@ public class ColumnFamilyRecordReader ex
if (rows != null)
return;
-
- try
- {
- maybeConnect();
- }
- catch (Exception e)
- {
- throw new RuntimeException(e);
- }
-
+
if (startToken == null)
{
startToken = split.getStartToken();
@@ -208,68 +258,6 @@ public class ColumnFamilyRecordReader ex
throw new RuntimeException(e);
}
}
-
- /**
- * Connect, log in and set up the correct comparator.
- */
- private void maybeConnect() throws InvalidRequestException, TException, AuthenticationException,
- AuthorizationException, NotFoundException, InstantiationException, IllegalAccessException,
- ClassNotFoundException, NoSuchFieldException
- {
- // only need to connect once
- if (socket != null && socket.isOpen())
- return;
-
- // create connection using thrift
- String location = getLocation();
- socket = new TSocket(location, DatabaseDescriptor.getRpcPort());
- TBinaryProtocol binaryProtocol = new TBinaryProtocol(socket, false, false);
- client = new Cassandra.Client(binaryProtocol);
- socket.open();
-
- // log in
- client.set_keyspace(keyspace);
- if (!(DatabaseDescriptor.getAuthenticator() instanceof AllowAllAuthenticator))
- {
- client.login(authRequest);
- }
- }
-
-
- // we don't use endpointsnitch since we are trying to support hadoop nodes that are
- // not necessarily on Cassandra machines, too. This should be adequate for single-DC clusters, at least.
- private String getLocation()
- {
- InetAddress[] localAddresses = new InetAddress[0];
- try
- {
- localAddresses = InetAddress.getAllByName(InetAddress.getLocalHost().getHostAddress());
- }
- catch (UnknownHostException e)
- {
- throw new AssertionError(e);
- }
- for (InetAddress address : localAddresses)
- {
- for (String location : split.getLocations())
- {
- InetAddress locationAddress = null;
- try
- {
- locationAddress = InetAddress.getByName(location);
- }
- catch (UnknownHostException e)
- {
- throw new AssertionError(e);
- }
- if (address.equals(locationAddress))
- {
- return location;
- }
- }
- }
- return split.getLocations()[0];
- }
/**
* @return total number of rows read by this record reader
@@ -306,8 +294,8 @@ public class ColumnFamilyRecordReader ex
private IColumn unthriftifySuper(SuperColumn super_column)
{
- ClockType clockType = DatabaseDescriptor.getClockType(keyspace, cfName);
- AbstractReconciler reconciler = DatabaseDescriptor.getReconciler(keyspace, cfName);
+ ClockType clockType = ClockType.Timestamp; // TODO generalize
+ AbstractReconciler reconciler = new TimestampReconciler(); // TODO generalize
org.apache.cassandra.db.SuperColumn sc = new org.apache.cassandra.db.SuperColumn(super_column.name, subComparator, clockType, reconciler);
for (Column column : super_column.columns)
{
Modified: cassandra/trunk/src/java/org/apache/cassandra/hadoop/ConfigHelper.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/hadoop/ConfigHelper.java?rev=979440&r1=979439&r2=979440&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/hadoop/ConfigHelper.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/hadoop/ConfigHelper.java Mon Jul 26 21:01:36 2010
@@ -242,7 +242,7 @@ public class ConfigHelper
return conf.get(OUTPUT_COLUMNFAMILY_CONFIG);
}
- public static int getThriftPort(Configuration conf)
+ public static int getRpcPort(Configuration conf)
{
String v = conf.get(THRIFT_PORT);
return v == null ? DatabaseDescriptor.getRpcPort() : Integer.valueOf(v);