You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by br...@apache.org on 2011/03/15 18:22:46 UTC

svn commit: r1081867 - in /cassandra/branches/cassandra-0.7: ./ src/java/org/apache/cassandra/hadoop/

Author: brandonwilliams
Date: Tue Mar 15 17:22:45 2011
New Revision: 1081867

URL: http://svn.apache.org/viewvc?rev=1081867&view=rev
Log:
Allow Hadoop jobs to set the consistency level.
Patch by Eldon Stegall, reviewed by brandonwilliams for CASSANDRA-2331

Modified:
    cassandra/branches/cassandra-0.7/CHANGES.txt
    cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordReader.java
    cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordWriter.java
    cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/hadoop/ConfigHelper.java

Modified: cassandra/branches/cassandra-0.7/CHANGES.txt
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/CHANGES.txt?rev=1081867&r1=1081866&r2=1081867&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.7/CHANGES.txt (original)
+++ cassandra/branches/cassandra-0.7/CHANGES.txt Tue Mar 15 17:22:45 2011
@@ -3,6 +3,7 @@
  * fix tombstone handling in repair and sstable2json (CASSANDRA-2279)
  * clear Built flag in system table when dropping an index (CASSANDRA-2320)
  * validate index names (CASSANDRA-1761)
+ * allow job configuration to set the CL used in Hadoop jobs (CASSANDRA-2331)
 
 
 0.7.4

Modified: cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordReader.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordReader.java?rev=1081867&r1=1081866&r2=1081867&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordReader.java (original)
+++ cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordReader.java Tue Mar 15 17:22:45 2011
@@ -57,6 +57,7 @@ public class ColumnFamilyRecordReader ex
     private String keyspace;
     private TSocket socket;
     private Cassandra.Client client;
+    private ConsistencyLevel consistencyLevel;
 
     public void close() 
     {
@@ -92,6 +93,9 @@ public class ColumnFamilyRecordReader ex
         totalRowCount = ConfigHelper.getInputSplitSize(conf);
         batchRowCount = ConfigHelper.getRangeBatchSize(conf);
         cfName = ConfigHelper.getInputColumnFamily(conf);
+        consistencyLevel = ConsistencyLevel.valueOf(ConfigHelper.getReadConsistencyLevel(conf));
+        
+        
         keyspace = ConfigHelper.getInputKeyspace(conf);
         
         try
@@ -238,7 +242,7 @@ public class ColumnFamilyRecordReader ex
                 rows = client.get_range_slices(new ColumnParent(cfName),
                                                predicate,
                                                keyRange,
-                                               ConsistencyLevel.ONE);
+                                               consistencyLevel);
                   
                 // nothing new? reached the end
                 if (rows.isEmpty())

Modified: cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordWriter.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordWriter.java?rev=1081867&r1=1081866&r2=1081867&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordWriter.java (original)
+++ cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordWriter.java Tue Mar 15 17:22:45 2011
@@ -78,6 +78,9 @@ implements org.apache.hadoop.mapred.Reco
     // handles for clients for each range running in the threadpool
     private final Map<Range,RangeClient> clients;
     private final long batchThreshold;
+    
+    private final ConsistencyLevel consistencyLevel;
+
 
     /**
      * Upon construction, obtain the map that this writer will use to collect
@@ -101,6 +104,7 @@ implements org.apache.hadoop.mapred.Reco
         this.queueSize = conf.getInt(ColumnFamilyOutputFormat.QUEUE_SIZE, 32 * Runtime.getRuntime().availableProcessors());
         this.clients = new HashMap<Range,RangeClient>();
         batchThreshold = conf.getLong(ColumnFamilyOutputFormat.BATCH_THRESHOLD, 32);
+        consistencyLevel = ConsistencyLevel.valueOf(ConfigHelper.getWriteConsistencyLevel(conf));
     }
 
     /**
@@ -347,7 +351,7 @@ implements org.apache.hadoop.mapred.Reco
                     // send the mutation to the last-used endpoint.  first time through, this will NPE harmlessly.
                     try
                     {
-                        thriftClient.batch_mutate(batch, ConsistencyLevel.ONE);
+                        thriftClient.batch_mutate(batch, consistencyLevel);
                         break;
                     }
                     catch (Exception e)

Modified: cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/hadoop/ConfigHelper.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/hadoop/ConfigHelper.java?rev=1081867&r1=1081866&r2=1081867&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/hadoop/ConfigHelper.java (original)
+++ cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/hadoop/ConfigHelper.java Tue Mar 15 17:22:45 2011
@@ -49,6 +49,8 @@ public class ConfigHelper
     private static final int DEFAULT_RANGE_BATCH_SIZE = 4096;
     private static final String THRIFT_PORT = "cassandra.thrift.port";
     private static final String INITIAL_THRIFT_ADDRESS = "cassandra.thrift.address";
+    private static final String READ_CONSISTENCY_LEVEL = "cassandra.consistencylevel.read";
+    private static final String WRITE_CONSISTENCY_LEVEL = "cassandra.consistencylevel.write";
 
     /**
      * Set the keyspace and column family for the input of this job.
@@ -222,12 +224,22 @@ public class ConfigHelper
     {
         return conf.get(INPUT_COLUMNFAMILY_CONFIG);
     }
-
+    
     public static String getOutputColumnFamily(Configuration conf)
     {
         return conf.get(OUTPUT_COLUMNFAMILY_CONFIG);
     }
 
+    public static String getReadConsistencyLevel(Configuration conf)
+    {
+        return conf.get(READ_CONSISTENCY_LEVEL, "ONE");
+    }
+
+    public static String getWriteConsistencyLevel(Configuration conf)
+    {
+        return conf.get(WRITE_CONSISTENCY_LEVEL, "ONE");
+    }
+
     public static int getRpcPort(Configuration conf)
     {
         return Integer.parseInt(conf.get(THRIFT_PORT));