You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by jb...@apache.org on 2009/12/02 19:55:55 UTC
svn commit: r886256 - in /incubator/cassandra/trunk:
contrib/bmt_example/CassandraBulkLoader.java
src/java/org/apache/cassandra/service/StorageService.java
Author: jbellis
Date: Wed Dec 2 18:55:51 2009
New Revision: 886256
URL: http://svn.apache.org/viewvc?rev=886256&view=rev
Log:
Update bmt_example to use the new client mode.
patch by jbellis; reviewed by gdusbabek for CASSANDRA-535
Modified:
incubator/cassandra/trunk/contrib/bmt_example/CassandraBulkLoader.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java
Modified: incubator/cassandra/trunk/contrib/bmt_example/CassandraBulkLoader.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/contrib/bmt_example/CassandraBulkLoader.java?rev=886256&r1=886255&r2=886256&view=diff
==============================================================================
--- incubator/cassandra/trunk/contrib/bmt_example/CassandraBulkLoader.java (original)
+++ incubator/cassandra/trunk/contrib/bmt_example/CassandraBulkLoader.java Wed Dec 2 18:55:51 2009
@@ -34,10 +34,9 @@
*
* You should construct your data you want to import as rows delimited by a new line. You end up grouping by <Key>
* in the mapper, so that the end result generates the data set into a column oriented subset. Once you get to the
- * reduce aspect, you can generate the ColumnFamilies you want inserted, and send it to your nodes. You need to
- * know your tokens and ip addresses ahead of time.
+ * reduce aspect, you can generate the ColumnFamilies you want inserted, and send it to your nodes.
*
- * Author : Chris Goffinet (goffinet@digg.com)
+ * THIS CANNOT RUN ON THE SAME IP ADDRESS AS A CASSANDRA INSTANCE.
*/
package org.apache.cassandra.bulkloader;
@@ -80,6 +79,7 @@
output.collect(key, value);
}
}
+
public static class Reduce extends MapReduceBase implements Reducer<Text, Text, Text, Text> {
private Path[] localFiles;
private ArrayList<String> tokens = new ArrayList<String>();
@@ -102,31 +102,17 @@
System.setProperty("storage-config",cassConfig);
- startMessagingService();
- /*
- Populate tokens
-
- Specify your tokens and ips below.
-
- tokens.add("0:192.168.0.1")
- tokens.add("14178431955039102644307275309657008810:192.168.0.2")
- */
-
- for (String token : this.tokens)
- {
- String[] values = token.split(":");
- InetAddress address;
- try
- {
- address = InetAddress.getByName(values[1]);
- }
- catch (UnknownHostException e)
- {
- throw new RuntimeException(e);
- }
- StorageService.instance().updateForeignTokenUnsafe(new BigIntegerToken(new BigInteger(values[0])), address);
+ StorageService.instance().startClient();
+ try
+ {
+ Thread.sleep(10*1000);
+ }
+ catch (InterruptedException e)
+ {
+ throw new RuntimeException(e);
}
}
+
public void close()
{
try
@@ -142,8 +128,18 @@
{
throw new RuntimeException(e);
}
- shutdownMessagingService();
+ try
+ {
+ // Sleep just in case the number of keys we send over is small
+ Thread.sleep(3*1000);
+ }
+ catch (InterruptedException e)
+ {
+ throw new RuntimeException(e);
+ }
+ StorageService.instance().stopClient();
}
+
public void reduce(Text key, Iterator<Text> values, OutputCollector<Text, Text> output, Reporter reporter) throws IOException
{
ColumnFamily columnFamily;
@@ -218,6 +214,7 @@
throw new RuntimeException(e);
}
}
+
public static Message createMessage(String Keyspace, String Key, String CFName, List<ColumnFamily> ColumnFamiles)
{
ColumnFamily baseColumnFamily;
@@ -260,23 +257,6 @@
return message;
}
- public static void startMessagingService()
- {
- SelectorManager.getSelectorManager().start();
- }
- public static void shutdownMessagingService()
- {
- try
- {
- // Sleep just in case the number of keys we send over is small
- Thread.sleep(3*1000);
- }
- catch (InterruptedException e)
- {
- throw new RuntimeException(e);
- }
- MessagingService.flushAndshutdown();
- }
public static void main(String[] args) throws Exception
{
runJob(args);
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java?rev=886256&r1=886255&r2=886256&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java Wed Dec 2 18:55:51 2009
@@ -181,22 +181,6 @@
logger_.info("Bootstrap completed! Now serving reads.");
}
- private void updateForeignToken(Token token, InetAddress endpoint)
- {
- tokenMetadata_.update(token, endpoint);
- SystemTable.updateToken(endpoint, token);
- }
-
- /**
- * Intended for operation in client-only (non-storage mode). E.g.: for bulk loading clients
- * to be able to use tokenmetadata/messagingservice without fully starting storageservice / systemtable,
- * or java clients that wish to bypase Thrift entirely.
- */
- public void updateForeignTokenUnsafe(Token token, InetAddress endpoint)
- {
- tokenMetadata_.update(token, endpoint);
- }
-
/** This method updates the local token on disk */
public void setToken(Token token)
{
@@ -434,9 +418,15 @@
if (logger_.isDebugEnabled())
logger_.debug(endpoint + " state normal, token " + token);
if (isClientMode)
- updateForeignTokenUnsafe(token, endpoint);
+ {
+ tokenMetadata_.update(token, endpoint);
+ // do NOT update systemtable in client mode
+ }
else
- updateForeignToken(token, endpoint);
+ {
+ tokenMetadata_.update(token, endpoint);
+ SystemTable.updateToken(endpoint, token);
+ }
replicationStrategy_.removeObsoletePendingRanges();
}
else if (STATE_LEAVING.equals(stateName))