You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by jb...@apache.org on 2009/12/02 19:55:55 UTC

svn commit: r886256 - in /incubator/cassandra/trunk: contrib/bmt_example/CassandraBulkLoader.java src/java/org/apache/cassandra/service/StorageService.java

Author: jbellis
Date: Wed Dec  2 18:55:51 2009
New Revision: 886256

URL: http://svn.apache.org/viewvc?rev=886256&view=rev
Log:
update bmt_example to use new client mode.
patch by jbellis; reviewed by gdusbabek for CASSANDRA-535

Modified:
    incubator/cassandra/trunk/contrib/bmt_example/CassandraBulkLoader.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java

Modified: incubator/cassandra/trunk/contrib/bmt_example/CassandraBulkLoader.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/contrib/bmt_example/CassandraBulkLoader.java?rev=886256&r1=886255&r2=886256&view=diff
==============================================================================
--- incubator/cassandra/trunk/contrib/bmt_example/CassandraBulkLoader.java (original)
+++ incubator/cassandra/trunk/contrib/bmt_example/CassandraBulkLoader.java Wed Dec  2 18:55:51 2009
@@ -34,10 +34,9 @@
   *
   * You should construct your data you want to import as rows delimited by a new line. You end up grouping by <Key>
   * in the mapper, so that the end result generates the data set into a column oriented subset. Once you get to the
-  * reduce aspect, you can generate the ColumnFamilies you want inserted, and send it to your nodes. You need to
-  * know your tokens and ip addresses ahead of time.
+  * reduce aspect, you can generate the ColumnFamilies you want inserted, and send it to your nodes.
   *
-  * Author : Chris Goffinet (goffinet@digg.com)
+  * THIS CANNOT RUN ON THE SAME IP ADDRESS AS A CASSANDRA INSTANCE.
   */
   
 package org.apache.cassandra.bulkloader;
@@ -80,6 +79,7 @@
             output.collect(key, value);
         }
     }
+
     public static class Reduce extends MapReduceBase implements Reducer<Text, Text, Text, Text> {
         private Path[] localFiles;
         private ArrayList<String> tokens = new ArrayList<String>();
@@ -102,31 +102,17 @@
 
             System.setProperty("storage-config",cassConfig);
 
-            startMessagingService();
-            /* 
-              Populate tokens 
-              
-              Specify your tokens and ips below. 
-              
-              tokens.add("0:192.168.0.1")
-              tokens.add("14178431955039102644307275309657008810:192.168.0.2")
-            */
-
-            for (String token : this.tokens)
-            {
-                String[] values = token.split(":");
-                InetAddress address;
-                try
-                {
-                    address = InetAddress.getByName(values[1]);
-                }
-                catch (UnknownHostException e)
-                {
-                    throw new RuntimeException(e);
-                }
-                StorageService.instance().updateForeignTokenUnsafe(new BigIntegerToken(new BigInteger(values[0])), address);
+            StorageService.instance().startClient();
+            try
+            {
+                Thread.sleep(10*1000);
+            }
+            catch (InterruptedException e)
+            {
+                throw new RuntimeException(e);
             }
         }
+
         public void close()
         {
             try
@@ -142,8 +128,18 @@
             {
                 throw new RuntimeException(e);
             }
-            shutdownMessagingService();
+            try
+            {
+                // Sleep just in case the number of keys we send over is small
+                Thread.sleep(3*1000);
+            }
+            catch (InterruptedException e)
+            {
+                throw new RuntimeException(e);
+            }
+            StorageService.instance().stopClient();
         }
+
         public void reduce(Text key, Iterator<Text> values, OutputCollector<Text, Text> output, Reporter reporter) throws IOException
         {
             ColumnFamily columnFamily;
@@ -218,6 +214,7 @@
             throw new RuntimeException(e);
         }
     }
+
     public static Message createMessage(String Keyspace, String Key, String CFName, List<ColumnFamily> ColumnFamiles)
     {
         ColumnFamily baseColumnFamily;
@@ -260,23 +257,6 @@
 
         return message;
     }
-    public static void startMessagingService()
-    {
-        SelectorManager.getSelectorManager().start();
-    }
-    public static void shutdownMessagingService()
-    {
-        try
-        {
-            // Sleep just in case the number of keys we send over is small
-            Thread.sleep(3*1000);
-        }
-        catch (InterruptedException e)
-        {
-            throw new RuntimeException(e);
-        }
-        MessagingService.flushAndshutdown();
-    }
     public static void main(String[] args) throws Exception
     {
         runJob(args);

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java?rev=886256&r1=886255&r2=886256&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java Wed Dec  2 18:55:51 2009
@@ -181,22 +181,6 @@
         logger_.info("Bootstrap completed! Now serving reads.");
     }
 
-    private void updateForeignToken(Token token, InetAddress endpoint)
-    {
-        tokenMetadata_.update(token, endpoint);
-        SystemTable.updateToken(endpoint, token);
-    }
-
-    /**
-     * Intended for operation in client-only (non-storage mode). E.g.: for bulk loading clients
-     * to be able to use tokenmetadata/messagingservice without fully starting storageservice / systemtable,
-     * or java clients that wish to bypase Thrift entirely.
-     */
-    public void updateForeignTokenUnsafe(Token token, InetAddress endpoint)
-    {
-        tokenMetadata_.update(token, endpoint);
-    }
-
     /** This method updates the local token on disk  */
     public void setToken(Token token)
     {
@@ -434,9 +418,15 @@
             if (logger_.isDebugEnabled())
                 logger_.debug(endpoint + " state normal, token " + token);
             if (isClientMode)
-                updateForeignTokenUnsafe(token, endpoint);
+            {
+                tokenMetadata_.update(token, endpoint);
+                // do NOT update systemtable in client mode
+            }
             else
-                updateForeignToken(token, endpoint);
+            {
+                tokenMetadata_.update(token, endpoint);
+                SystemTable.updateToken(endpoint, token);
+            }
             replicationStrategy_.removeObsoletePendingRanges();
         }
         else if (STATE_LEAVING.equals(stateName))