You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by jb...@apache.org on 2010/02/27 05:59:28 UTC

svn commit: r916910 - in /incubator/cassandra/branches/cassandra-0.6: contrib/bmt_example/CassandraBulkLoader.java src/java/org/apache/cassandra/db/RowMutation.java

Author: jbellis
Date: Sat Feb 27 04:59:27 2010
New Revision: 916910

URL: http://svn.apache.org/viewvc?rev=916910&view=rev
Log:
Fix bulk loader build (untested, but compiling is an improvement over not compiling). Patch by jbellis.

Modified:
    incubator/cassandra/branches/cassandra-0.6/contrib/bmt_example/CassandraBulkLoader.java
    incubator/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/db/RowMutation.java

Modified: incubator/cassandra/branches/cassandra-0.6/contrib/bmt_example/CassandraBulkLoader.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/branches/cassandra-0.6/contrib/bmt_example/CassandraBulkLoader.java?rev=916910&r1=916909&r2=916910&view=diff
==============================================================================
--- incubator/cassandra/branches/cassandra-0.6/contrib/bmt_example/CassandraBulkLoader.java (original)
+++ incubator/cassandra/branches/cassandra-0.6/contrib/bmt_example/CassandraBulkLoader.java Sat Feb 27 04:59:27 2010
@@ -63,7 +63,6 @@
 
 import org.apache.cassandra.net.Message;
 import org.apache.cassandra.net.MessagingService;
-import org.apache.cassandra.net.SelectorManager;
 import org.apache.cassandra.service.StorageService;
 import org.apache.hadoop.filecache.DistributedCache;
 import org.apache.hadoop.fs.Path;
@@ -102,7 +101,14 @@
 
             System.setProperty("storage-config",cassConfig);
 
-            StorageService.instance().startClient();
+            try
+            {
+                StorageService.instance.initClient();
+            }
+            catch (IOException e)
+            {
+                throw new RuntimeException(e);
+            }
             try
             {
                 Thread.sleep(10*1000);
@@ -137,21 +143,21 @@
             {
                 throw new RuntimeException(e);
             }
-            StorageService.instance().stopClient();
+            StorageService.instance.stopClient();
         }
 
         public void reduce(Text key, Iterator<Text> values, OutputCollector<Text, Text> output, Reporter reporter) throws IOException
         {
             ColumnFamily columnFamily;
-            String Keyspace = "Keyspace1";
-            String CFName = "Super1";
+            String keyspace = "Keyspace1";
+            String cfName = "Super1";
             Message message;
             List<ColumnFamily> columnFamilies;
             columnFamilies = new LinkedList<ColumnFamily>();
             String line;
 
             /* Create a column family */
-            columnFamily = ColumnFamily.create(Keyspace, CFName);
+            columnFamily = ColumnFamily.create(keyspace, cfName);
             while (values.hasNext()) {
                 // Split the value (line based on your own delimiter)
                 line = values.next().toString();
@@ -160,17 +166,17 @@
                 String ColumnName = fields[2];
                 String ColumnValue = fields[3];
                 int timestamp = 0;
-                columnFamily.addColumn(new QueryPath(CFName, SuperColumnName.getBytes("UTF-8"), ColumnName.getBytes("UTF-8")), ColumnValue.getBytes(), timestamp);
+                columnFamily.addColumn(new QueryPath(cfName, SuperColumnName.getBytes("UTF-8"), ColumnName.getBytes("UTF-8")), ColumnValue.getBytes(), timestamp);
             }
 
             columnFamilies.add(columnFamily);
 
             /* Get serialized message to send to cluster */
-            message = createMessage(Keyspace, key.toString(), CFName, columnFamilies);
-            for (InetAddress endpoint: StorageService.instance().getNaturalEndPoints(key.toString()))
+            message = createMessage(keyspace, key.toString(), cfName, columnFamilies);
+            for (InetAddress endpoint: StorageService.instance.getNaturalEndpoints(keyspace, key.toString()))
             {
                 /* Send message to end point */
-                MessagingService.instance().sendOneWay(message, endpoint);
+                MessagingService.instance.sendOneWay(message, endpoint);
             }
             
             output.collect(key, new Text(" inserted into Cassandra node(s)"));
@@ -248,7 +254,7 @@
         try
         {
             /* Make message */
-            message = rm.makeRowMutationMessage(StorageService.binaryVerbHandler_);
+            message = rm.makeRowMutationMessage(StorageService.Verb.BINARY);
         }
         catch (IOException e)
         {

Modified: incubator/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/db/RowMutation.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/db/RowMutation.java?rev=916910&r1=916909&r2=916910&view=diff
==============================================================================
--- incubator/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/db/RowMutation.java (original)
+++ incubator/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/db/RowMutation.java Sat Feb 27 04:59:27 2010
@@ -211,10 +211,15 @@
 
     public Message makeRowMutationMessage() throws IOException
     {
+        return makeRowMutationMessage(StorageService.Verb.MUTATION);
+    }
+
+    public Message makeRowMutationMessage(StorageService.Verb verb) throws IOException
+    {
         ByteArrayOutputStream bos = new ByteArrayOutputStream();
         DataOutputStream dos = new DataOutputStream(bos);
         serializer().serialize(this, dos);
-        return new Message(FBUtilities.getLocalAddress(), StageManager.MUTATION_STAGE, StorageService.Verb.MUTATION, bos.toByteArray());
+        return new Message(FBUtilities.getLocalAddress(), StageManager.MUTATION_STAGE, verb, bos.toByteArray());
     }
 
     public static RowMutation getRowMutationFromMutations(String keyspace, String key, Map<String, List<Mutation>> cfmap)