You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by jb...@apache.org on 2010/07/14 23:58:16 UTC
svn commit: r964217 - in /cassandra/branches/cassandra-0.6:
contrib/word_count/src/ src/java/org/apache/cassandra/config/
src/java/org/apache/cassandra/hadoop/
Author: jbellis
Date: Wed Jul 14 21:58:16 2010
New Revision: 964217
URL: http://svn.apache.org/viewvc?rev=964217&view=rev
Log:
make storage-conf.xml optional for Hadoop jobs. patch by jbellis; reviewed by Jeremy Hanna for CASSANDRA-1280
Modified:
cassandra/branches/cassandra-0.6/contrib/word_count/src/WordCount.java
cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/hadoop/ColumnFamilyInputFormat.java
cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordReader.java
cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/hadoop/ConfigHelper.java
Modified: cassandra/branches/cassandra-0.6/contrib/word_count/src/WordCount.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.6/contrib/word_count/src/WordCount.java?rev=964217&r1=964216&r2=964217&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.6/contrib/word_count/src/WordCount.java (original)
+++ cassandra/branches/cassandra-0.6/contrib/word_count/src/WordCount.java Wed Jul 14 21:58:16 2010
@@ -128,7 +128,8 @@ public class WordCount extends Configure
job.setInputFormatClass(ColumnFamilyInputFormat.class);
FileOutputFormat.setOutputPath(job, new Path(OUTPUT_PATH_PREFIX + i));
- ConfigHelper.setColumnFamily(job.getConfiguration(), KEYSPACE, COLUMN_FAMILY);
+ ConfigHelper.setThriftContact(conf, "localhost", 9160);
+ ConfigHelper.setColumnFamily(job.getConfiguration(), KEYSPACE, COLUMN_FAMILY, "BytesType", "RandomPartitioner");
SlicePredicate predicate = new SlicePredicate().setColumn_names(Arrays.asList(columnName.getBytes()));
ConfigHelper.setSlicePredicate(job.getConfiguration(), predicate);
Modified: cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/config/DatabaseDescriptor.java?rev=964217&r1=964216&r2=964217&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/config/DatabaseDescriptor.java (original)
+++ cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/config/DatabaseDescriptor.java Wed Jul 14 21:58:16 2010
@@ -268,10 +268,9 @@ public class DatabaseDescriptor
}
try
{
- Class cls = Class.forName(partitionerClassName);
- partitioner = (IPartitioner) cls.getConstructor().newInstance();
+ partitioner = newPartitioner(partitionerClassName);
}
- catch (ClassNotFoundException e)
+ catch (Exception e)
{
throw new ConfigurationException("Invalid partitioner class " + partitionerClassName);
}
@@ -544,6 +543,22 @@ public class DatabaseDescriptor
}
}
+ public static IPartitioner newPartitioner(String partitionerClassName)
+ {
+ if (!partitionerClassName.contains("."))
+ partitionerClassName = "org.apache.cassandra.dht." + partitionerClassName;
+
+ try
+ {
+ Class cls = Class.forName(partitionerClassName);
+ return (IPartitioner) cls.getConstructor().newInstance();
+ }
+ catch (Exception e)
+ {
+ throw new RuntimeException("Invalid partitioner class " + partitionerClassName);
+ }
+ }
+
private static void readTablesFromXml() throws ConfigurationException
{
XMLUtils xmlUtils = null;
@@ -752,9 +767,7 @@ public class DatabaseDescriptor
}
private static AbstractType getComparator(Node columnFamily, String attr) throws ConfigurationException
-// throws ConfigurationException, TransformerException, NoSuchMethodException, InvocationTargetException, IllegalAccessException, InstantiationException
{
- Class<? extends AbstractType> typeClass;
String compareWith = null;
try
{
@@ -766,49 +779,38 @@ public class DatabaseDescriptor
ex.initCause(e);
throw ex;
}
- if (compareWith == null)
- {
- typeClass = BytesType.class;
- }
- else
- {
- String className = compareWith.contains(".") ? compareWith : "org.apache.cassandra.db.marshal." + compareWith;
- try
- {
- typeClass = (Class<? extends AbstractType>)Class.forName(className);
- }
- catch (ClassNotFoundException e)
- {
- throw new ConfigurationException("Unable to load class " + className + " for " + attr + " attribute");
- }
- }
+
try
{
- return typeClass.getConstructor().newInstance();
- }
- catch (InstantiationException e)
- {
- ConfigurationException ex = new ConfigurationException(e.getMessage());
- ex.initCause(e);
- throw ex;
+ return getComparator(compareWith);
}
- catch (IllegalAccessException e)
+ catch (Exception e)
{
ConfigurationException ex = new ConfigurationException(e.getMessage());
ex.initCause(e);
throw ex;
}
- catch (InvocationTargetException e)
+ }
+
+ public static AbstractType getComparator(String compareWith)
+ {
+ Class<? extends AbstractType> typeClass;
+ try
{
- ConfigurationException ex = new ConfigurationException(e.getMessage());
- ex.initCause(e);
- throw ex;
+ if (compareWith == null)
+ {
+ typeClass = BytesType.class;
+ }
+ else
+ {
+ String className = compareWith.contains(".") ? compareWith : "org.apache.cassandra.db.marshal." + compareWith;
+ typeClass = (Class<? extends AbstractType>)Class.forName(className);
+ }
+ return typeClass.getConstructor().newInstance();
}
- catch (NoSuchMethodException e)
+ catch (Exception e)
{
- ConfigurationException ex = new ConfigurationException(e.getMessage());
- ex.initCause(e);
- throw ex;
+ throw new RuntimeException(e);
}
}
Modified: cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/hadoop/ColumnFamilyInputFormat.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/hadoop/ColumnFamilyInputFormat.java?rev=964217&r1=964216&r2=964217&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/hadoop/ColumnFamilyInputFormat.java (original)
+++ cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/hadoop/ColumnFamilyInputFormat.java Wed Jul 14 21:58:16 2010
@@ -31,10 +31,11 @@ import java.util.concurrent.Future;
import org.apache.log4j.Logger;
-import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.db.IColumn;
import org.apache.cassandra.service.StorageService;
-import org.apache.cassandra.thrift.*;
+import org.apache.cassandra.thrift.Cassandra;
+import org.apache.cassandra.thrift.InvalidRequestException;
+import org.apache.cassandra.thrift.TokenRange;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.*;
import org.apache.thrift.TException;
@@ -61,10 +62,9 @@ import org.apache.thrift.transport.TTran
*/
public class ColumnFamilyInputFormat extends InputFormat<String, SortedMap<byte[], IColumn>>
{
-
private static final Logger logger = Logger.getLogger(StorageService.class);
- private void validateConfiguration(Configuration conf)
+ private static void validateConfiguration(Configuration conf)
{
if (ConfigHelper.getKeyspace(conf) == null || ConfigHelper.getColumnFamily(conf) == null)
{
@@ -83,9 +83,9 @@ public class ColumnFamilyInputFormat ext
validateConfiguration(conf);
// canonical ranges and nodes holding replicas
- List<TokenRange> masterRangeNodes = getRangeMap(ConfigHelper.getKeyspace(conf));
+ List<TokenRange> masterRangeNodes = getRangeMap(conf);
- int splitsize = ConfigHelper.getInputSplitSize(context.getConfiguration());
+ int splitsize = ConfigHelper.getInputSplitSize(conf);
// canonical ranges, split into pieces, fetching the splits in parallel
ExecutorService executor = Executors.newCachedThreadPool();
@@ -97,7 +97,7 @@ public class ColumnFamilyInputFormat ext
for (TokenRange range : masterRangeNodes)
{
// for each range, pick a live owner and ask it to compute bite-sized splits
- splitfutures.add(executor.submit(new SplitCallable(range, splitsize)));
+ splitfutures.add(executor.submit(new SplitCallable(range, splitsize, conf)));
}
// wait until we have all the results back
@@ -130,16 +130,17 @@ public class ColumnFamilyInputFormat ext
class SplitCallable implements Callable<List<InputSplit>>
{
- private TokenRange range;
- private int splitsize;
-
- public SplitCallable(TokenRange tr, int splitsize)
+ private final TokenRange range;
+ private final int splitsize;
+ private final Configuration conf;
+
+ public SplitCallable(TokenRange tr, int splitsize, Configuration conf)
{
this.range = tr;
this.splitsize = splitsize;
+ this.conf = conf;
}
- @Override
public List<InputSplit> call() throws Exception
{
ArrayList<InputSplit> splits = new ArrayList<InputSplit>();
@@ -161,39 +162,37 @@ public class ColumnFamilyInputFormat ext
}
return splits;
}
- }
- private List<String> getSubSplits(TokenRange range, int splitsize) throws IOException
- {
- // TODO handle failure of range replicas & retry
- TSocket socket = new TSocket(range.endpoints.get(0),
- DatabaseDescriptor.getThriftPort());
- TBinaryProtocol binaryProtocol = new TBinaryProtocol(socket, false, false);
- Cassandra.Client client = new Cassandra.Client(binaryProtocol);
- try
- {
- socket.open();
- }
- catch (TTransportException e)
- {
- throw new IOException(e);
- }
- List<String> splits;
- try
- {
- splits = client.describe_splits(range.start_token, range.end_token, splitsize);
- }
- catch (TException e)
+ private List<String> getSubSplits(TokenRange range, int splitsize) throws IOException
{
- throw new RuntimeException(e);
+ // TODO handle failure of range replicas & retry
+ TSocket socket = new TSocket(range.endpoints.get(0), ConfigHelper.getThriftPort(conf));
+ TBinaryProtocol binaryProtocol = new TBinaryProtocol(socket, false, false);
+ Cassandra.Client client = new Cassandra.Client(binaryProtocol);
+ try
+ {
+ socket.open();
+ }
+ catch (TTransportException e)
+ {
+ throw new IOException(e);
+ }
+ List<String> splits;
+ try
+ {
+ splits = client.describe_splits(range.start_token, range.end_token, splitsize);
+ }
+ catch (TException e)
+ {
+ throw new RuntimeException(e);
+ }
+ return splits;
}
- return splits;
}
- private List<TokenRange> getRangeMap(String keyspace) throws IOException
+ private List<TokenRange> getRangeMap(Configuration conf) throws IOException
{
- TSocket socket = new TSocket(DatabaseDescriptor.getSeeds().iterator().next().getHostAddress(),
- DatabaseDescriptor.getThriftPort());
+ TSocket socket = new TSocket(ConfigHelper.getInitialAddress(conf), ConfigHelper.getThriftPort(conf));
TBinaryProtocol binaryProtocol = new TBinaryProtocol(socket, false, false);
Cassandra.Client client = new Cassandra.Client(binaryProtocol);
try
@@ -207,7 +206,7 @@ public class ColumnFamilyInputFormat ext
List<TokenRange> map;
try
{
- map = client.describe_ring(keyspace);
+ map = client.describe_ring(ConfigHelper.getKeyspace(conf));
}
catch (TException e)
{
@@ -220,7 +219,6 @@ public class ColumnFamilyInputFormat ext
return map;
}
- @Override
public RecordReader<String, SortedMap<byte[], IColumn>> createRecordReader(InputSplit inputSplit, TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException
{
return new ColumnFamilyRecordReader();
Modified: cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordReader.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordReader.java?rev=964217&r1=964216&r2=964217&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordReader.java (original)
+++ cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordReader.java Wed Jul 14 21:58:16 2010
@@ -29,7 +29,6 @@ import java.util.SortedMap;
import java.util.TreeMap;
import com.google.common.collect.AbstractIterator;
-import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.db.*;
import org.apache.cassandra.db.marshal.AbstractType;
import org.apache.cassandra.dht.IPartitioner;
@@ -55,6 +54,7 @@ public class ColumnFamilyRecordReader ex
private int batchRowCount; // fetch this many per batch
private String cfName;
private String keyspace;
+ private Configuration conf;
public void close()
{
@@ -81,7 +81,7 @@ public class ColumnFamilyRecordReader ex
public void initialize(InputSplit split, TaskAttemptContext context) throws IOException
{
this.split = (ColumnFamilySplit) split;
- Configuration conf = context.getConfiguration();
+ conf = context.getConfiguration();
predicate = ConfigHelper.getSlicePredicate(conf);
totalRowCount = ConfigHelper.getInputSplitSize(conf);
batchRowCount = ConfigHelper.getRangeBatchSize(conf);
@@ -100,12 +100,13 @@ public class ColumnFamilyRecordReader ex
private class RowIterator extends AbstractIterator<Pair<String, SortedMap<byte[], IColumn>>>
{
-
private List<KeySlice> rows;
private String startToken;
private int totalRead = 0;
private int i = 0;
- private AbstractType comparator = DatabaseDescriptor.getComparator(keyspace, cfName);
+ private AbstractType comparator = ConfigHelper.getComparator(conf);
+ private AbstractType subComparator = ConfigHelper.getSubComparator(conf);
+ private IPartitioner partitioner = ConfigHelper.getPartitioner(conf);
private TSocket socket;
private void maybeInit()
@@ -120,8 +121,7 @@ public class ColumnFamilyRecordReader ex
// close previous connection if one is open
close();
- socket = new TSocket(getLocation(),
- DatabaseDescriptor.getThriftPort());
+ socket = new TSocket(getLocation(), ConfigHelper.getThriftPort(conf));
TBinaryProtocol binaryProtocol = new TBinaryProtocol(socket, false, false);
Cassandra.Client client = new Cassandra.Client(binaryProtocol);
try
@@ -166,8 +166,7 @@ public class ColumnFamilyRecordReader ex
// prepare for the next slice to be read
KeySlice lastRow = rows.get(rows.size() - 1);
- IPartitioner p = DatabaseDescriptor.getPartitioner();
- startToken = p.getTokenFactory().toString(p.getToken(lastRow.getKey()));
+ startToken = partitioner.getTokenFactory().toString(partitioner.getToken(lastRow.getKey()));
}
catch (Exception e)
{
@@ -243,28 +242,27 @@ public class ColumnFamilyRecordReader ex
socket.close();
}
}
- }
- private IColumn unthriftify(ColumnOrSuperColumn cosc)
- {
- if (cosc.column == null)
- return unthriftifySuper(cosc.super_column);
- return unthriftifySimple(cosc.column);
- }
+ private IColumn unthriftify(ColumnOrSuperColumn cosc)
+ {
+ if (cosc.column == null)
+ return unthriftifySuper(cosc.super_column);
+ return unthriftifySimple(cosc.column);
+ }
- private IColumn unthriftifySuper(SuperColumn super_column)
- {
- AbstractType subComparator = DatabaseDescriptor.getSubComparator(keyspace, cfName);
- org.apache.cassandra.db.SuperColumn sc = new org.apache.cassandra.db.SuperColumn(super_column.name, subComparator);
- for (Column column : super_column.columns)
+ private IColumn unthriftifySuper(SuperColumn super_column)
{
- sc.addColumn(unthriftifySimple(column));
+ org.apache.cassandra.db.SuperColumn sc = new org.apache.cassandra.db.SuperColumn(super_column.name, subComparator);
+ for (Column column : super_column.columns)
+ {
+ sc.addColumn(unthriftifySimple(column));
+ }
+ return sc;
}
- return sc;
- }
- private IColumn unthriftifySimple(Column column)
- {
- return new org.apache.cassandra.db.Column(column.name, column.value, column.timestamp);
+ private IColumn unthriftifySimple(Column column)
+ {
+ return new org.apache.cassandra.db.Column(column.name, column.value, column.timestamp);
+ }
}
}
Modified: cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/hadoop/ConfigHelper.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/hadoop/ConfigHelper.java?rev=964217&r1=964216&r2=964217&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/hadoop/ConfigHelper.java (original)
+++ cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/hadoop/ConfigHelper.java Wed Jul 14 21:58:16 2010
@@ -21,6 +21,10 @@ package org.apache.cassandra.hadoop;
*/
+import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.db.marshal.AbstractType;
+import org.apache.cassandra.dht.IPartitioner;
import org.apache.cassandra.thrift.InvalidRequestException;
import org.apache.cassandra.thrift.SlicePredicate;
import org.apache.cassandra.thrift.ThriftValidation;
@@ -37,14 +41,36 @@ public class ConfigHelper
private static final String COLUMNFAMILY_CONFIG = "cassandra.input.columnfamily";
private static final String PREDICATE_CONFIG = "cassandra.input.predicate";
private static final String INPUT_SPLIT_SIZE_CONFIG = "cassandra.input.split.size";
- private static final int DEFAULT_SPLIT_SIZE = 64*1024;
+ private static final int DEFAULT_SPLIT_SIZE = 64 * 1024;
private static final String RANGE_BATCH_SIZE_CONFIG = "cassandra.range.batch.size";
private static final int DEFAULT_RANGE_BATCH_SIZE = 4096;
+ private static final String THRIFT_PORT = "cassandra.thrift.port";
+ private static final String INITIAL_THRIFT_ADDRESS = "cassandra.thrift.address";
+ private static final String COMPARATOR = "cassandra.input.comparator";
+ private static final String SUB_COMPARATOR = "cassandra.input.subcomparator";
+ private static final String PARTITIONER = "cassandra.partitioner";
+
+ /**
+ * Set the keyspace, column family, column comparator, and row partitioner for this job.
+ *
+ * @param conf Job configuration you are about to run
+ * @param keyspace
+ * @param columnFamily
+ * @param comparator
+ * @param partitioner
+ */
+ public static void setColumnFamily(Configuration conf, String keyspace, String columnFamily, String comparator, String partitioner)
+ {
+ setColumnFamily(conf, keyspace, columnFamily);
+ conf.set(COMPARATOR, comparator);
+ conf.set(PARTITIONER, partitioner);
+ }
/**
* Set the keyspace and column family for this job.
+ * Comparator and Partitioner types will be read from storage-conf.xml.
*
- * @param conf Job configuration you are about to run
+ * @param conf Job configuration you are about to run
* @param keyspace
* @param columnFamily
*/
@@ -71,12 +97,39 @@ public class ConfigHelper
}
/**
+ * Set the subcomparator to use in the configured ColumnFamily [of SuperColumns].
+ * Optional when storage-conf.xml is provided.
+ *
+ * @param conf
+ * @param subComparator
+ */
+ public static void setSubComparator(Configuration conf, String subComparator)
+ {
+ conf.set(SUB_COMPARATOR, subComparator);
+ }
+
+ /**
+ * The address and port of a Cassandra node that Hadoop can contact over Thrift
+ * to learn more about the Cassandra cluster. Optional when storage-conf.xml
+ * is provided.
+ *
+ * @param conf
+ * @param address
+ * @param port
+ */
+ public static void setThriftContact(Configuration conf, String address, int port)
+ {
+ conf.set(THRIFT_PORT, String.valueOf(port));
+ conf.set(INITIAL_THRIFT_ADDRESS, address);
+ }
+
+ /**
* The number of rows to request with each get range slices request.
* Too big and you can either get timeouts when it takes Cassandra too
* long to fetch all the data. Too small and the performance
- * will be eaten up by the overhead of each request.
+ * will be eaten up by the overhead of each request.
*
- * @param conf Job configuration you are about to run
+ * @param conf Job configuration you are about to run
* @param batchsize Number of rows to request each time
*/
public static void setRangeBatchSize(Configuration conf, int batchsize)
@@ -88,7 +141,7 @@ public class ConfigHelper
* The number of rows to request with each get range slices request.
* Too big and you can either get timeouts when it takes Cassandra too
* long to fetch all the data. Too small and the performance
- * will be eaten up by the overhead of each request.
+ * will be eaten up by the overhead of each request.
*
* @param conf Job configuration you are about to run
* @return Number of rows to request each time
@@ -97,13 +150,13 @@ public class ConfigHelper
{
return conf.getInt(RANGE_BATCH_SIZE_CONFIG, DEFAULT_RANGE_BATCH_SIZE);
}
-
+
/**
* Set the size of the input split.
* This affects the number of maps created, if the number is too small
* the overhead of each map will take up the bulk of the job time.
*
- * @param conf Job configuration you are about to run
+ * @param conf Job configuration you are about to run
* @param splitsize Size of the input split
*/
public static void setInputSplitSize(Configuration conf, int splitsize)
@@ -119,7 +172,7 @@ public class ConfigHelper
/**
* Set the predicate that determines what columns will be selected from each row.
*
- * @param conf Job configuration you are about to run
+ * @param conf Job configuration you are about to run
* @param predicate
*/
public static void setSlicePredicate(Configuration conf, SlicePredicate predicate)
@@ -172,4 +225,38 @@ public class ConfigHelper
{
return conf.get(COLUMNFAMILY_CONFIG);
}
+
+ public static int getThriftPort(Configuration conf)
+ {
+ String v = conf.get(THRIFT_PORT);
+ return v == null ? DatabaseDescriptor.getThriftPort() : Integer.valueOf(v);
+ }
+
+ public static String getInitialAddress(Configuration conf)
+ {
+ String v = conf.get(INITIAL_THRIFT_ADDRESS);
+ return v == null ? DatabaseDescriptor.getSeeds().iterator().next().getHostAddress() : v;
+ }
+
+ public static AbstractType getComparator(Configuration conf)
+ {
+ String v = conf.get(COMPARATOR);
+ return v == null
+ ? DatabaseDescriptor.getComparator(getKeyspace(conf), getColumnFamily(conf))
+ : DatabaseDescriptor.getComparator(v);
+ }
+
+ public static AbstractType getSubComparator(Configuration conf)
+ {
+ String v = conf.get(SUB_COMPARATOR);
+ return v == null
+ ? DatabaseDescriptor.getSubComparator(getKeyspace(conf), getColumnFamily(conf))
+ : DatabaseDescriptor.getComparator(v);
+ }
+
+ public static IPartitioner getPartitioner(Configuration conf)
+ {
+ String v = conf.get(PARTITIONER);
+ return v == null ? DatabaseDescriptor.getPartitioner() : DatabaseDescriptor.newPartitioner(v);
+ }
}