You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by jb...@apache.org on 2010/07/14 23:58:16 UTC

svn commit: r964217 - in /cassandra/branches/cassandra-0.6: contrib/word_count/src/ src/java/org/apache/cassandra/config/ src/java/org/apache/cassandra/hadoop/

Author: jbellis
Date: Wed Jul 14 21:58:16 2010
New Revision: 964217

URL: http://svn.apache.org/viewvc?rev=964217&view=rev
Log:
make storage-conf.xml optional for Hadoop jobs.  patch by jbellis; reviewed by Jeremy Hanna for CASSANDRA-1280

Modified:
    cassandra/branches/cassandra-0.6/contrib/word_count/src/WordCount.java
    cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
    cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/hadoop/ColumnFamilyInputFormat.java
    cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordReader.java
    cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/hadoop/ConfigHelper.java

Modified: cassandra/branches/cassandra-0.6/contrib/word_count/src/WordCount.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.6/contrib/word_count/src/WordCount.java?rev=964217&r1=964216&r2=964217&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.6/contrib/word_count/src/WordCount.java (original)
+++ cassandra/branches/cassandra-0.6/contrib/word_count/src/WordCount.java Wed Jul 14 21:58:16 2010
@@ -128,7 +128,8 @@ public class WordCount extends Configure
             job.setInputFormatClass(ColumnFamilyInputFormat.class);
             FileOutputFormat.setOutputPath(job, new Path(OUTPUT_PATH_PREFIX + i));
 
-            ConfigHelper.setColumnFamily(job.getConfiguration(), KEYSPACE, COLUMN_FAMILY);
+            ConfigHelper.setThriftContact(conf, "localhost",  9160);
+            ConfigHelper.setColumnFamily(job.getConfiguration(), KEYSPACE, COLUMN_FAMILY, "BytesType", "RandomPartitioner");
             SlicePredicate predicate = new SlicePredicate().setColumn_names(Arrays.asList(columnName.getBytes()));
             ConfigHelper.setSlicePredicate(job.getConfiguration(), predicate);
 

Modified: cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/config/DatabaseDescriptor.java?rev=964217&r1=964216&r2=964217&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/config/DatabaseDescriptor.java (original)
+++ cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/config/DatabaseDescriptor.java Wed Jul 14 21:58:16 2010
@@ -268,10 +268,9 @@ public class DatabaseDescriptor
             }
             try
             {
-                Class cls = Class.forName(partitionerClassName);
-                partitioner = (IPartitioner) cls.getConstructor().newInstance();
+                partitioner = newPartitioner(partitionerClassName);
             }
-            catch (ClassNotFoundException e)
+            catch (Exception e)
             {
                 throw new ConfigurationException("Invalid partitioner class " + partitionerClassName);
             }
@@ -544,6 +543,22 @@ public class DatabaseDescriptor
         }
     }
 
+    public static IPartitioner newPartitioner(String partitionerClassName)
+    {
+        if (!partitionerClassName.contains("."))
+            partitionerClassName = "org.apache.cassandra.dht." + partitionerClassName;
+
+        try
+        {
+            Class cls = Class.forName(partitionerClassName);
+            return (IPartitioner) cls.getConstructor().newInstance();
+        }
+        catch (Exception e)
+        {
+            throw new RuntimeException("Invalid partitioner class " + partitionerClassName);
+        }
+    }
+
     private static void readTablesFromXml() throws ConfigurationException
     {
         XMLUtils xmlUtils = null;
@@ -752,9 +767,7 @@ public class DatabaseDescriptor
     }
 
     private static AbstractType getComparator(Node columnFamily, String attr) throws ConfigurationException
-//    throws ConfigurationException, TransformerException, NoSuchMethodException, InvocationTargetException, IllegalAccessException, InstantiationException
     {
-        Class<? extends AbstractType> typeClass;
         String compareWith = null;
         try
         {
@@ -766,49 +779,38 @@ public class DatabaseDescriptor
             ex.initCause(e);
             throw ex;
         }
-        if (compareWith == null)
-        {
-            typeClass = BytesType.class;
-        }
-        else
-        {
-            String className = compareWith.contains(".") ? compareWith : "org.apache.cassandra.db.marshal." + compareWith;
-            try
-            {
-                typeClass = (Class<? extends AbstractType>)Class.forName(className);
-            }
-            catch (ClassNotFoundException e)
-            {
-                throw new ConfigurationException("Unable to load class " + className + " for " + attr + " attribute");
-            }
-        }
+
         try
         {
-        return typeClass.getConstructor().newInstance();
-    }
-        catch (InstantiationException e)
-        {
-            ConfigurationException ex = new ConfigurationException(e.getMessage());
-            ex.initCause(e);
-            throw ex;
+            return getComparator(compareWith);
         }
-        catch (IllegalAccessException e)
+        catch (Exception e)
         {
             ConfigurationException ex = new ConfigurationException(e.getMessage());
             ex.initCause(e);
             throw ex;
         }
-        catch (InvocationTargetException e)
+    }
+
+    public static AbstractType getComparator(String compareWith)
+    {
+        Class<? extends AbstractType> typeClass;
+        try
         {
-            ConfigurationException ex = new ConfigurationException(e.getMessage());
-            ex.initCause(e);
-            throw ex;
+            if (compareWith == null)
+            {
+                typeClass = BytesType.class;
+            }
+            else
+            {
+                String className = compareWith.contains(".") ? compareWith : "org.apache.cassandra.db.marshal." + compareWith;
+                typeClass = (Class<? extends AbstractType>)Class.forName(className);
+            }
+            return typeClass.getConstructor().newInstance();
         }
-        catch (NoSuchMethodException e)
+        catch (Exception e)
         {
-            ConfigurationException ex = new ConfigurationException(e.getMessage());
-            ex.initCause(e);
-            throw ex;
+            throw new RuntimeException(e);
         }
     }
 

Modified: cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/hadoop/ColumnFamilyInputFormat.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/hadoop/ColumnFamilyInputFormat.java?rev=964217&r1=964216&r2=964217&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/hadoop/ColumnFamilyInputFormat.java (original)
+++ cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/hadoop/ColumnFamilyInputFormat.java Wed Jul 14 21:58:16 2010
@@ -31,10 +31,11 @@ import java.util.concurrent.Future;
 
 import org.apache.log4j.Logger;
 
-import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.db.IColumn;
 import org.apache.cassandra.service.StorageService;
-import org.apache.cassandra.thrift.*;
+import org.apache.cassandra.thrift.Cassandra;
+import org.apache.cassandra.thrift.InvalidRequestException;
+import org.apache.cassandra.thrift.TokenRange;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.mapreduce.*;
 import org.apache.thrift.TException;
@@ -61,10 +62,9 @@ import org.apache.thrift.transport.TTran
  */
 public class ColumnFamilyInputFormat extends InputFormat<String, SortedMap<byte[], IColumn>>
 {
-
     private static final Logger logger = Logger.getLogger(StorageService.class);
 
-    private void validateConfiguration(Configuration conf)
+    private static void validateConfiguration(Configuration conf)
     {
         if (ConfigHelper.getKeyspace(conf) == null || ConfigHelper.getColumnFamily(conf) == null)
         {
@@ -83,9 +83,9 @@ public class ColumnFamilyInputFormat ext
         validateConfiguration(conf);
 
         // cannonical ranges and nodes holding replicas
-        List<TokenRange> masterRangeNodes = getRangeMap(ConfigHelper.getKeyspace(conf));
+        List<TokenRange> masterRangeNodes = getRangeMap(conf);
 
-        int splitsize = ConfigHelper.getInputSplitSize(context.getConfiguration());
+        int splitsize = ConfigHelper.getInputSplitSize(conf);
         
         // cannonical ranges, split into pieces, fetching the splits in parallel 
         ExecutorService executor = Executors.newCachedThreadPool();
@@ -97,7 +97,7 @@ public class ColumnFamilyInputFormat ext
             for (TokenRange range : masterRangeNodes)
             {
                 // for each range, pick a live owner and ask it to compute bite-sized splits
-                splitfutures.add(executor.submit(new SplitCallable(range, splitsize)));
+                splitfutures.add(executor.submit(new SplitCallable(range, splitsize, conf)));
             }
     
             // wait until we have all the results back
@@ -130,16 +130,17 @@ public class ColumnFamilyInputFormat ext
     class SplitCallable implements Callable<List<InputSplit>>
     {
 
-        private TokenRange range;
-        private int splitsize;
-        
-        public SplitCallable(TokenRange tr, int splitsize)
+        private final TokenRange range;
+        private final int splitsize;
+        private final Configuration conf;
+
+        public SplitCallable(TokenRange tr, int splitsize, Configuration conf)
         {
             this.range = tr;
             this.splitsize = splitsize;
+            this.conf = conf;
         }
 
-        @Override
         public List<InputSplit> call() throws Exception
         {
             ArrayList<InputSplit> splits = new ArrayList<InputSplit>();
@@ -161,39 +162,37 @@ public class ColumnFamilyInputFormat ext
             }
             return splits;
         }
-    }
 
-    private List<String> getSubSplits(TokenRange range, int splitsize) throws IOException
-    {
-        // TODO handle failure of range replicas & retry
-        TSocket socket = new TSocket(range.endpoints.get(0),
-                                     DatabaseDescriptor.getThriftPort());
-        TBinaryProtocol binaryProtocol = new TBinaryProtocol(socket, false, false);
-        Cassandra.Client client = new Cassandra.Client(binaryProtocol);
-        try
-        {
-            socket.open();
-        }
-        catch (TTransportException e)
-        {
-            throw new IOException(e);
-        }
-        List<String> splits;
-        try
-        {
-            splits = client.describe_splits(range.start_token, range.end_token, splitsize);
-        }
-        catch (TException e)
+        private List<String> getSubSplits(TokenRange range, int splitsize) throws IOException
         {
-            throw new RuntimeException(e);
+            // TODO handle failure of range replicas & retry
+            TSocket socket = new TSocket(range.endpoints.get(0), ConfigHelper.getThriftPort(conf));
+            TBinaryProtocol binaryProtocol = new TBinaryProtocol(socket, false, false);
+            Cassandra.Client client = new Cassandra.Client(binaryProtocol);
+            try
+            {
+                socket.open();
+            }
+            catch (TTransportException e)
+            {
+                throw new IOException(e);
+            }
+            List<String> splits;
+            try
+            {
+                splits = client.describe_splits(range.start_token, range.end_token, splitsize);
+            }
+            catch (TException e)
+            {
+                throw new RuntimeException(e);
+            }
+            return splits;
         }
-        return splits;
     }
 
-    private List<TokenRange> getRangeMap(String keyspace) throws IOException
+    private List<TokenRange> getRangeMap(Configuration conf) throws IOException
     {
-        TSocket socket = new TSocket(DatabaseDescriptor.getSeeds().iterator().next().getHostAddress(),
-                                     DatabaseDescriptor.getThriftPort());
+        TSocket socket = new TSocket(ConfigHelper.getInitialAddress(conf), ConfigHelper.getThriftPort(conf));
         TBinaryProtocol binaryProtocol = new TBinaryProtocol(socket, false, false);
         Cassandra.Client client = new Cassandra.Client(binaryProtocol);
         try
@@ -207,7 +206,7 @@ public class ColumnFamilyInputFormat ext
         List<TokenRange> map;
         try
         {
-            map = client.describe_ring(keyspace);
+            map = client.describe_ring(ConfigHelper.getKeyspace(conf));
         }
         catch (TException e)
         {
@@ -220,7 +219,6 @@ public class ColumnFamilyInputFormat ext
         return map;
     }
 
-    @Override
     public RecordReader<String, SortedMap<byte[], IColumn>> createRecordReader(InputSplit inputSplit, TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException
     {
         return new ColumnFamilyRecordReader();

Modified: cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordReader.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordReader.java?rev=964217&r1=964216&r2=964217&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordReader.java (original)
+++ cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordReader.java Wed Jul 14 21:58:16 2010
@@ -29,7 +29,6 @@ import java.util.SortedMap;
 import java.util.TreeMap;
 
 import com.google.common.collect.AbstractIterator;
-import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.db.*;
 import org.apache.cassandra.db.marshal.AbstractType;
 import org.apache.cassandra.dht.IPartitioner;
@@ -55,6 +54,7 @@ public class ColumnFamilyRecordReader ex
     private int batchRowCount; // fetch this many per batch
     private String cfName;
     private String keyspace;
+    private Configuration conf;
 
     public void close() 
     {
@@ -81,7 +81,7 @@ public class ColumnFamilyRecordReader ex
     public void initialize(InputSplit split, TaskAttemptContext context) throws IOException
     {
         this.split = (ColumnFamilySplit) split;
-        Configuration conf = context.getConfiguration();
+        conf = context.getConfiguration();
         predicate = ConfigHelper.getSlicePredicate(conf);
         totalRowCount = ConfigHelper.getInputSplitSize(conf);
         batchRowCount = ConfigHelper.getRangeBatchSize(conf);
@@ -100,12 +100,13 @@ public class ColumnFamilyRecordReader ex
 
     private class RowIterator extends AbstractIterator<Pair<String, SortedMap<byte[], IColumn>>>
     {
-
         private List<KeySlice> rows;
         private String startToken;
         private int totalRead = 0;
         private int i = 0;
-        private AbstractType comparator = DatabaseDescriptor.getComparator(keyspace, cfName);
+        private AbstractType comparator = ConfigHelper.getComparator(conf);
+        private AbstractType subComparator = ConfigHelper.getSubComparator(conf);
+        private IPartitioner partitioner = ConfigHelper.getPartitioner(conf);
         private TSocket socket;
 
         private void maybeInit()
@@ -120,8 +121,7 @@ public class ColumnFamilyRecordReader ex
             // close previous connection if one is open
             close();
             
-            socket = new TSocket(getLocation(),
-                                         DatabaseDescriptor.getThriftPort());
+            socket = new TSocket(getLocation(), ConfigHelper.getThriftPort(conf));
             TBinaryProtocol binaryProtocol = new TBinaryProtocol(socket, false, false);
             Cassandra.Client client = new Cassandra.Client(binaryProtocol);
             try
@@ -166,8 +166,7 @@ public class ColumnFamilyRecordReader ex
                 
                 // prepare for the next slice to be read
                 KeySlice lastRow = rows.get(rows.size() - 1);
-                IPartitioner p = DatabaseDescriptor.getPartitioner();
-                startToken = p.getTokenFactory().toString(p.getToken(lastRow.getKey()));
+                startToken = partitioner.getTokenFactory().toString(partitioner.getToken(lastRow.getKey()));
             }
             catch (Exception e)
             {
@@ -243,28 +242,27 @@ public class ColumnFamilyRecordReader ex
                 socket.close();
             }
         }
-    }
 
-    private IColumn unthriftify(ColumnOrSuperColumn cosc)
-    {
-        if (cosc.column == null)
-            return unthriftifySuper(cosc.super_column);
-        return unthriftifySimple(cosc.column);
-    }
+        private IColumn unthriftify(ColumnOrSuperColumn cosc)
+        {
+            if (cosc.column == null)
+                return unthriftifySuper(cosc.super_column);
+            return unthriftifySimple(cosc.column);
+        }
 
-    private IColumn unthriftifySuper(SuperColumn super_column)
-    {
-        AbstractType subComparator = DatabaseDescriptor.getSubComparator(keyspace, cfName);
-        org.apache.cassandra.db.SuperColumn sc = new org.apache.cassandra.db.SuperColumn(super_column.name, subComparator);
-        for (Column column : super_column.columns)
+        private IColumn unthriftifySuper(SuperColumn super_column)
         {
-            sc.addColumn(unthriftifySimple(column));
+            org.apache.cassandra.db.SuperColumn sc = new org.apache.cassandra.db.SuperColumn(super_column.name, subComparator);
+            for (Column column : super_column.columns)
+            {
+                sc.addColumn(unthriftifySimple(column));
+            }
+            return sc;
         }
-        return sc;
-    }
 
-    private IColumn unthriftifySimple(Column column)
-    {
-        return new org.apache.cassandra.db.Column(column.name, column.value, column.timestamp);
+        private IColumn unthriftifySimple(Column column)
+        {
+            return new org.apache.cassandra.db.Column(column.name, column.value, column.timestamp);
+        }
     }
 }

Modified: cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/hadoop/ConfigHelper.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/hadoop/ConfigHelper.java?rev=964217&r1=964216&r2=964217&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/hadoop/ConfigHelper.java (original)
+++ cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/hadoop/ConfigHelper.java Wed Jul 14 21:58:16 2010
@@ -21,6 +21,10 @@ package org.apache.cassandra.hadoop;
  */
 
 
+import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.db.marshal.AbstractType;
+import org.apache.cassandra.dht.IPartitioner;
 import org.apache.cassandra.thrift.InvalidRequestException;
 import org.apache.cassandra.thrift.SlicePredicate;
 import org.apache.cassandra.thrift.ThriftValidation;
@@ -37,14 +41,36 @@ public class ConfigHelper
     private static final String COLUMNFAMILY_CONFIG = "cassandra.input.columnfamily";
     private static final String PREDICATE_CONFIG = "cassandra.input.predicate";
     private static final String INPUT_SPLIT_SIZE_CONFIG = "cassandra.input.split.size";
-    private static final int DEFAULT_SPLIT_SIZE = 64*1024;
+    private static final int DEFAULT_SPLIT_SIZE = 64 * 1024;
     private static final String RANGE_BATCH_SIZE_CONFIG = "cassandra.range.batch.size";
     private static final int DEFAULT_RANGE_BATCH_SIZE = 4096;
+    private static final String THRIFT_PORT = "cassandra.thrift.port";
+    private static final String INITIAL_THRIFT_ADDRESS = "cassandra.thrift.address";
+    private static final String COMPARATOR = "cassandra.input.comparator";
+    private static final String SUB_COMPARATOR = "cassandra.input.subcomparator";
+    private static final String PARTITIONER = "cassandra.partitioner";
+
+    /**
+     * Set the keyspace, column family, column comparator, and row partitioner for this job.
+     *
+     * @param conf         Job configuration you are about to run
+     * @param keyspace
+     * @param columnFamily
+     * @param comparator
+     * @param partitioner
+     */
+    public static void setColumnFamily(Configuration conf, String keyspace, String columnFamily, String comparator, String partitioner)
+    {
+        setColumnFamily(conf, keyspace, columnFamily);
+        conf.set(COMPARATOR, comparator);
+        conf.set(PARTITIONER, partitioner);
+    }
 
     /**
      * Set the keyspace and column family for this job.
+     * Comparator and Partitioner types will be read from storage-conf.xml.
      *
-     * @param conf Job configuration you are about to run
+     * @param conf         Job configuration you are about to run
      * @param keyspace
      * @param columnFamily
      */
@@ -71,12 +97,39 @@ public class ConfigHelper
     }
 
     /**
+     * Set the subcomparator to use in the configured ColumnFamily [of SuperColumns].
+     * Optional when storage-conf.xml is provided.
+     *
+     * @param conf
+     * @param subComparator
+     */
+    public static void setSubComparator(Configuration conf, String subComparator)
+    {
+        conf.set(SUB_COMPARATOR, subComparator);
+    }
+
+    /**
+     * The address and port of a Cassandra node that Hadoop can contact over Thrift
+     * to learn more about the Cassandra cluster.  Optional when storage-conf.xml
+     * is provided.
+     *
+     * @param conf
+     * @param address
+     * @param port
+     */
+    public static void setThriftContact(Configuration conf, String address, int port)
+    {
+        conf.set(THRIFT_PORT, String.valueOf(port));
+        conf.set(INITIAL_THRIFT_ADDRESS, address);
+    }
+
+    /**
      * The number of rows to request with each get range slices request.
      * Too big and you can either get timeouts when it takes Cassandra too
      * long to fetch all the data. Too small and the performance
-     * will be eaten up by the overhead of each request. 
+     * will be eaten up by the overhead of each request.
      *
-     * @param conf Job configuration you are about to run
+     * @param conf      Job configuration you are about to run
      * @param batchsize Number of rows to request each time
      */
     public static void setRangeBatchSize(Configuration conf, int batchsize)
@@ -88,7 +141,7 @@ public class ConfigHelper
      * The number of rows to request with each get range slices request.
      * Too big and you can either get timeouts when it takes Cassandra too
      * long to fetch all the data. Too small and the performance
-     * will be eaten up by the overhead of each request. 
+     * will be eaten up by the overhead of each request.
      *
      * @param conf Job configuration you are about to run
      * @return Number of rows to request each time
@@ -97,13 +150,13 @@ public class ConfigHelper
     {
         return conf.getInt(RANGE_BATCH_SIZE_CONFIG, DEFAULT_RANGE_BATCH_SIZE);
     }
-    
+
     /**
      * Set the size of the input split.
      * This affects the number of maps created, if the number is too small
      * the overhead of each map will take up the bulk of the job time.
      *
-     * @param conf Job configuration you are about to run
+     * @param conf      Job configuration you are about to run
      * @param splitsize Size of the input split
      */
     public static void setInputSplitSize(Configuration conf, int splitsize)
@@ -119,7 +172,7 @@ public class ConfigHelper
     /**
      * Set the predicate that determines what columns will be selected from each row.
      *
-     * @param conf Job configuration you are about to run
+     * @param conf      Job configuration you are about to run
      * @param predicate
      */
     public static void setSlicePredicate(Configuration conf, SlicePredicate predicate)
@@ -172,4 +225,38 @@ public class ConfigHelper
     {
         return conf.get(COLUMNFAMILY_CONFIG);
     }
+
+    public static int getThriftPort(Configuration conf)
+    {
+        String v = conf.get(THRIFT_PORT);
+        return v == null ? DatabaseDescriptor.getThriftPort() : Integer.valueOf(v);
+    }
+
+    public static String getInitialAddress(Configuration conf)
+    {
+        String v = conf.get(INITIAL_THRIFT_ADDRESS);
+        return v == null ? DatabaseDescriptor.getSeeds().iterator().next().getHostAddress() : v;
+    }
+
+    public static AbstractType getComparator(Configuration conf)
+    {
+        String v = conf.get(COMPARATOR);
+        return v == null
+               ? DatabaseDescriptor.getComparator(getKeyspace(conf), getColumnFamily(conf))
+               : DatabaseDescriptor.getComparator(v);
+    }
+
+    public static AbstractType getSubComparator(Configuration conf)
+    {
+        String v = conf.get(SUB_COMPARATOR);
+        return v == null
+               ? DatabaseDescriptor.getSubComparator(getKeyspace(conf), getColumnFamily(conf))
+               : DatabaseDescriptor.getComparator(v);
+    }
+
+    public static IPartitioner getPartitioner(Configuration conf)
+    {
+        String v = conf.get(PARTITIONER);
+        return v == null ? DatabaseDescriptor.getPartitioner() : DatabaseDescriptor.newPartitioner(v);
+    }
 }