You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by br...@apache.org on 2011/03/15 18:22:46 UTC
svn commit: r1081867 - in /cassandra/branches/cassandra-0.7: ./
src/java/org/apache/cassandra/hadoop/
Author: brandonwilliams
Date: Tue Mar 15 17:22:45 2011
New Revision: 1081867
URL: http://svn.apache.org/viewvc?rev=1081867&view=rev
Log:
Allow Hadoop jobs to set the consistency level.
Patch by Eldon Stegall, reviewed by brandonwilliams for CASSANDRA-2331
Modified:
cassandra/branches/cassandra-0.7/CHANGES.txt
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordReader.java
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordWriter.java
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/hadoop/ConfigHelper.java
Modified: cassandra/branches/cassandra-0.7/CHANGES.txt
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/CHANGES.txt?rev=1081867&r1=1081866&r2=1081867&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.7/CHANGES.txt (original)
+++ cassandra/branches/cassandra-0.7/CHANGES.txt Tue Mar 15 17:22:45 2011
@@ -3,6 +3,7 @@
* fix tombstone handling in repair and sstable2json (CASSANDRA-2279)
* clear Built flag in system table when dropping an index (CASSANDRA-2320)
* validate index names (CASSANDRA-1761)
+ * allow job configuration to set the CL used in Hadoop jobs (CASSANDRA-2331)
0.7.4
Modified: cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordReader.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordReader.java?rev=1081867&r1=1081866&r2=1081867&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordReader.java (original)
+++ cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordReader.java Tue Mar 15 17:22:45 2011
@@ -57,6 +57,7 @@ public class ColumnFamilyRecordReader ex
private String keyspace;
private TSocket socket;
private Cassandra.Client client;
+ private ConsistencyLevel consistencyLevel;
public void close()
{
@@ -92,6 +93,9 @@ public class ColumnFamilyRecordReader ex
totalRowCount = ConfigHelper.getInputSplitSize(conf);
batchRowCount = ConfigHelper.getRangeBatchSize(conf);
cfName = ConfigHelper.getInputColumnFamily(conf);
+ consistencyLevel = ConsistencyLevel.valueOf(ConfigHelper.getReadConsistencyLevel(conf));
+
+
keyspace = ConfigHelper.getInputKeyspace(conf);
try
@@ -238,7 +242,7 @@ public class ColumnFamilyRecordReader ex
rows = client.get_range_slices(new ColumnParent(cfName),
predicate,
keyRange,
- ConsistencyLevel.ONE);
+ consistencyLevel);
// nothing new? reached the end
if (rows.isEmpty())
Modified: cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordWriter.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordWriter.java?rev=1081867&r1=1081866&r2=1081867&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordWriter.java (original)
+++ cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordWriter.java Tue Mar 15 17:22:45 2011
@@ -78,6 +78,9 @@ implements org.apache.hadoop.mapred.Reco
// handles for clients for each range running in the threadpool
private final Map<Range,RangeClient> clients;
private final long batchThreshold;
+
+ private final ConsistencyLevel consistencyLevel;
+
/**
* Upon construction, obtain the map that this writer will use to collect
@@ -101,6 +104,7 @@ implements org.apache.hadoop.mapred.Reco
this.queueSize = conf.getInt(ColumnFamilyOutputFormat.QUEUE_SIZE, 32 * Runtime.getRuntime().availableProcessors());
this.clients = new HashMap<Range,RangeClient>();
batchThreshold = conf.getLong(ColumnFamilyOutputFormat.BATCH_THRESHOLD, 32);
+ consistencyLevel = ConsistencyLevel.valueOf(ConfigHelper.getWriteConsistencyLevel(conf));
}
/**
@@ -347,7 +351,7 @@ implements org.apache.hadoop.mapred.Reco
// send the mutation to the last-used endpoint. first time through, this will NPE harmlessly.
try
{
- thriftClient.batch_mutate(batch, ConsistencyLevel.ONE);
+ thriftClient.batch_mutate(batch, consistencyLevel);
break;
}
catch (Exception e)
Modified: cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/hadoop/ConfigHelper.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/hadoop/ConfigHelper.java?rev=1081867&r1=1081866&r2=1081867&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/hadoop/ConfigHelper.java (original)
+++ cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/hadoop/ConfigHelper.java Tue Mar 15 17:22:45 2011
@@ -49,6 +49,8 @@ public class ConfigHelper
private static final int DEFAULT_RANGE_BATCH_SIZE = 4096;
private static final String THRIFT_PORT = "cassandra.thrift.port";
private static final String INITIAL_THRIFT_ADDRESS = "cassandra.thrift.address";
+ private static final String READ_CONSISTENCY_LEVEL = "cassandra.consistencylevel.read";
+ private static final String WRITE_CONSISTENCY_LEVEL = "cassandra.consistencylevel.write";
/**
* Set the keyspace and column family for the input of this job.
@@ -222,12 +224,22 @@ public class ConfigHelper
{
return conf.get(INPUT_COLUMNFAMILY_CONFIG);
}
-
+
public static String getOutputColumnFamily(Configuration conf)
{
return conf.get(OUTPUT_COLUMNFAMILY_CONFIG);
}
+ public static String getReadConsistencyLevel(Configuration conf)
+ {
+ return conf.get(READ_CONSISTENCY_LEVEL, "ONE");
+ }
+
+ public static String getWriteConsistencyLevel(Configuration conf)
+ {
+ return conf.get(WRITE_CONSISTENCY_LEVEL, "ONE");
+ }
+
public static int getRpcPort(Configuration conf)
{
return Integer.parseInt(conf.get(THRIFT_PORT));