You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by jb...@apache.org on 2010/02/27 05:59:28 UTC
svn commit: r916910 - in /incubator/cassandra/branches/cassandra-0.6:
contrib/bmt_example/CassandraBulkLoader.java
src/java/org/apache/cassandra/db/RowMutation.java
Author: jbellis
Date: Sat Feb 27 04:59:27 2010
New Revision: 916910
URL: http://svn.apache.org/viewvc?rev=916910&view=rev
Log:
fix bulk loader build. (untested, but compiling is an improvement over not compiling). patch by jbellis
Modified:
incubator/cassandra/branches/cassandra-0.6/contrib/bmt_example/CassandraBulkLoader.java
incubator/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/db/RowMutation.java
Modified: incubator/cassandra/branches/cassandra-0.6/contrib/bmt_example/CassandraBulkLoader.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/branches/cassandra-0.6/contrib/bmt_example/CassandraBulkLoader.java?rev=916910&r1=916909&r2=916910&view=diff
==============================================================================
--- incubator/cassandra/branches/cassandra-0.6/contrib/bmt_example/CassandraBulkLoader.java (original)
+++ incubator/cassandra/branches/cassandra-0.6/contrib/bmt_example/CassandraBulkLoader.java Sat Feb 27 04:59:27 2010
@@ -63,7 +63,6 @@
import org.apache.cassandra.net.Message;
import org.apache.cassandra.net.MessagingService;
-import org.apache.cassandra.net.SelectorManager;
import org.apache.cassandra.service.StorageService;
import org.apache.hadoop.filecache.DistributedCache;
import org.apache.hadoop.fs.Path;
@@ -102,7 +101,14 @@
System.setProperty("storage-config",cassConfig);
- StorageService.instance().startClient();
+ try
+ {
+ StorageService.instance.initClient();
+ }
+ catch (IOException e)
+ {
+ throw new RuntimeException(e);
+ }
try
{
Thread.sleep(10*1000);
@@ -137,21 +143,21 @@
{
throw new RuntimeException(e);
}
- StorageService.instance().stopClient();
+ StorageService.instance.stopClient();
}
public void reduce(Text key, Iterator<Text> values, OutputCollector<Text, Text> output, Reporter reporter) throws IOException
{
ColumnFamily columnFamily;
- String Keyspace = "Keyspace1";
- String CFName = "Super1";
+ String keyspace = "Keyspace1";
+ String cfName = "Super1";
Message message;
List<ColumnFamily> columnFamilies;
columnFamilies = new LinkedList<ColumnFamily>();
String line;
/* Create a column family */
- columnFamily = ColumnFamily.create(Keyspace, CFName);
+ columnFamily = ColumnFamily.create(keyspace, cfName);
while (values.hasNext()) {
// Split the value (line based on your own delimiter)
line = values.next().toString();
@@ -160,17 +166,17 @@
String ColumnName = fields[2];
String ColumnValue = fields[3];
int timestamp = 0;
- columnFamily.addColumn(new QueryPath(CFName, SuperColumnName.getBytes("UTF-8"), ColumnName.getBytes("UTF-8")), ColumnValue.getBytes(), timestamp);
+ columnFamily.addColumn(new QueryPath(cfName, SuperColumnName.getBytes("UTF-8"), ColumnName.getBytes("UTF-8")), ColumnValue.getBytes(), timestamp);
}
columnFamilies.add(columnFamily);
/* Get serialized message to send to cluster */
- message = createMessage(Keyspace, key.toString(), CFName, columnFamilies);
- for (InetAddress endpoint: StorageService.instance().getNaturalEndPoints(key.toString()))
+ message = createMessage(keyspace, key.toString(), cfName, columnFamilies);
+ for (InetAddress endpoint: StorageService.instance.getNaturalEndpoints(keyspace, key.toString()))
{
/* Send message to end point */
- MessagingService.instance().sendOneWay(message, endpoint);
+ MessagingService.instance.sendOneWay(message, endpoint);
}
output.collect(key, new Text(" inserted into Cassandra node(s)"));
@@ -248,7 +254,7 @@
try
{
/* Make message */
- message = rm.makeRowMutationMessage(StorageService.binaryVerbHandler_);
+ message = rm.makeRowMutationMessage(StorageService.Verb.BINARY);
}
catch (IOException e)
{
Modified: incubator/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/db/RowMutation.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/db/RowMutation.java?rev=916910&r1=916909&r2=916910&view=diff
==============================================================================
--- incubator/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/db/RowMutation.java (original)
+++ incubator/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/db/RowMutation.java Sat Feb 27 04:59:27 2010
@@ -211,10 +211,15 @@
public Message makeRowMutationMessage() throws IOException
{
+ return makeRowMutationMessage(StorageService.Verb.MUTATION);
+ }
+
+ public Message makeRowMutationMessage(StorageService.Verb verb) throws IOException
+ {
ByteArrayOutputStream bos = new ByteArrayOutputStream();
DataOutputStream dos = new DataOutputStream(bos);
serializer().serialize(this, dos);
- return new Message(FBUtilities.getLocalAddress(), StageManager.MUTATION_STAGE, StorageService.Verb.MUTATION, bos.toByteArray());
+ return new Message(FBUtilities.getLocalAddress(), StageManager.MUTATION_STAGE, verb, bos.toByteArray());
}
public static RowMutation getRowMutationFromMutations(String keyspace, String key, Map<String, List<Mutation>> cfmap)