You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by jb...@apache.org on 2009/07/30 17:30:27 UTC

svn commit: r799331 [27/29] - in /incubator/cassandra/trunk: ./ src/java/org/apache/cassandra/concurrent/ src/java/org/apache/cassandra/config/ src/java/org/apache/cassandra/db/ src/java/org/apache/cassandra/dht/ src/java/org/apache/cassandra/gms/ src/...

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageServiceMBean.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageServiceMBean.java?rev=799331&r1=799330&r2=799331&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageServiceMBean.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageServiceMBean.java Thu Jul 30 15:30:21 2009
@@ -1,89 +1,89 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.cassandra.service;
-
-import java.io.IOException;
-import java.util.List;
-import java.util.Map;
-import org.apache.cassandra.dht.Range;
-import org.apache.cassandra.net.EndPoint;
-
-
-/**
- * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
- */
-
-public interface StorageServiceMBean
-{    
-    public String getLiveNodes();
-    public String getUnreachableNodes();
-    public String getToken();
-    public Map<Range, List<EndPoint>> getRangeToEndPointMap();
-    public String getLoadInfo();
-    public int getCurrentGenerationNumber();
-    public void forceTableCompaction() throws IOException;
-    
-    /**
-     * This method will cause the local node initiate
-     * the bootstrap process for all the nodes specified
-     * in the string parameter passed in. This local node
-     * will calculate who gives what ranges to the nodes
-     * and then instructs the nodes to do so.
-     * 
-     * @param nodes colon delimited list of endpoints that need
-     *              to be bootstrapped
-    */
-    public void loadAll(String nodes);
-    
-    /**
-     * 
-     */
-    public void forceTableCleanup() throws IOException;
-
-    /**
-     * Stream the files in the bootstrap directory over to the
-     * node being bootstrapped. This is used in case of normal
-     * bootstrap failure. Use a tool to re-calculate the cardinality
-     * at a later point at the destination.
-     * @param directories colon separated list of directories from where 
-     *                files need to be picked up.
-     * @param target endpoint receiving data.
-    */
-    public void forceHandoff(List<String> directories, String target) throws IOException;
-
-    /**
-     * Takes the snapshot for a given table.
-     * 
-     * @param tableName the name of the table.
-     * @param tag       the tag given to the snapshot (null is permissible)
-     */
-    public void takeSnapshot(String tableName, String tag) throws IOException;
-
-    /**
-     * Takes a snapshot for every table.
-     * 
-     * @param tag the tag given to the snapshot (null is permissible)
-     */
-    public void takeAllSnapshot(String tag) throws IOException;
-
-    /**
-     * Remove all the existing snapshots.
-     */
-    public void clearSnapshot() throws IOException;
-}
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.service;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+import org.apache.cassandra.dht.Range;
+import org.apache.cassandra.net.EndPoint;
+
+
+/**
+ * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
+ */
+
+public interface StorageServiceMBean
+{    
+    public String getLiveNodes();
+    public String getUnreachableNodes();
+    public String getToken();
+    public Map<Range, List<EndPoint>> getRangeToEndPointMap();
+    public String getLoadInfo();
+    public int getCurrentGenerationNumber();
+    public void forceTableCompaction() throws IOException;
+    
+    /**
+     * This method will cause the local node initiate
+     * the bootstrap process for all the nodes specified
+     * in the string parameter passed in. This local node
+     * will calculate who gives what ranges to the nodes
+     * and then instructs the nodes to do so.
+     * 
+     * @param nodes colon delimited list of endpoints that need
+     *              to be bootstrapped
+    */
+    public void loadAll(String nodes);
+    
+    /**
+     * 
+     */
+    public void forceTableCleanup() throws IOException;
+
+    /**
+     * Stream the files in the bootstrap directory over to the
+     * node being bootstrapped. This is used in case of normal
+     * bootstrap failure. Use a tool to re-calculate the cardinality
+     * at a later point at the destination.
+     * @param directories colon separated list of directories from where 
+     *                files need to be picked up.
+     * @param target endpoint receiving data.
+    */
+    public void forceHandoff(List<String> directories, String target) throws IOException;
+
+    /**
+     * Takes the snapshot for a given table.
+     * 
+     * @param tableName the name of the table.
+     * @param tag       the tag given to the snapshot (null is permissible)
+     */
+    public void takeSnapshot(String tableName, String tag) throws IOException;
+
+    /**
+     * Takes a snapshot for every table.
+     * 
+     * @param tag the tag given to the snapshot (null is permissible)
+     */
+    public void takeAllSnapshot(String tag) throws IOException;
+
+    /**
+     * Remove all the existing snapshots.
+     */
+    public void clearSnapshot() throws IOException;
+}

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StreamManager.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StreamManager.java?rev=799331&r1=799330&r2=799331&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StreamManager.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StreamManager.java Thu Jul 30 15:30:21 2009
@@ -1,164 +1,164 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.cassandra.service;
-
-import java.io.File;
-import java.io.IOException;
-import java.util.*;
-
-import org.apache.cassandra.config.DatabaseDescriptor;
-import org.apache.cassandra.io.DataInputBuffer;
-import org.apache.cassandra.net.EndPoint;
-import org.apache.cassandra.net.IVerbHandler;
-import org.apache.cassandra.net.Message;
-import org.apache.cassandra.net.MessagingService;
-import org.apache.cassandra.net.io.StreamContextManager;
-import org.apache.cassandra.service.StorageService.BootstrapInitiateDoneVerbHandler;
-import org.apache.cassandra.utils.FileUtils;
-import org.apache.cassandra.utils.LogUtil;
-import org.apache.log4j.Logger;
-
-/*
- * This class manages the streaming of multiple files 
- * one after the other. 
-*/
-public final class StreamManager
-{   
-    private static Logger logger_ = Logger.getLogger( StreamManager.class );
-    
-    public static class BootstrapTerminateVerbHandler implements IVerbHandler
-    {
-        private static Logger logger_ = Logger.getLogger( BootstrapInitiateDoneVerbHandler.class );
-
-        public void doVerb(Message message)
-        {
-            byte[] body = message.getMessageBody();
-            DataInputBuffer bufIn = new DataInputBuffer();
-            bufIn.reset(body, body.length);
-
-            try
-            {
-                StreamContextManager.StreamStatusMessage streamStatusMessage = StreamContextManager.StreamStatusMessage.serializer().deserialize(bufIn);
-                StreamContextManager.StreamStatus streamStatus = streamStatusMessage.getStreamStatus();
-                                               
-                switch( streamStatus.getAction() )
-                {
-                    case DELETE:                              
-                        StreamManager.instance(message.getFrom()).finish(streamStatus.getFile());
-                        break;
-
-                    case STREAM:
-                        if (logger_.isDebugEnabled())
-                          logger_.debug("Need to re-stream file " + streamStatus.getFile());
-                        StreamManager.instance(message.getFrom()).repeat();
-                        break;
-
-                    default:
-                        break;
-                }
-            }
-            catch ( IOException ex )
-            {
-                logger_.info(LogUtil.throwableToString(ex));
-            }
-        }
-    }
-    
-    private static Map<EndPoint, StreamManager> streamManagers_ = new HashMap<EndPoint, StreamManager>();
-    
-    public static StreamManager instance(EndPoint to)
-    {
-        StreamManager streamManager = streamManagers_.get(to);
-        if ( streamManager == null )
-        {
-            streamManager = new StreamManager(to);
-            streamManagers_.put(to, streamManager);
-        }
-        return streamManager;
-    }
-    
-    private List<File> filesToStream_ = new ArrayList<File>();
-    private EndPoint to_;
-    private long totalBytesToStream_ = 0L;
-    
-    private StreamManager(EndPoint to)
-    {
-        to_ = to;
-    }
-    
-    public void addFilesToStream(StreamContextManager.StreamContext[] streamContexts)
-    {
-        for ( StreamContextManager.StreamContext streamContext : streamContexts )
-        {
-            if (logger_.isDebugEnabled())
-              logger_.debug("Adding file " + streamContext.getTargetFile() + " to be streamed.");
-            filesToStream_.add( new File( streamContext.getTargetFile() ) );
-            totalBytesToStream_ += streamContext.getExpectedBytes();
-        }
-    }
-    
-    void start()
-    {
-        if ( filesToStream_.size() > 0 )
-        {
-            File file = filesToStream_.get(0);
-            if (logger_.isDebugEnabled())
-              logger_.debug("Streaming file " + file + " ...");
-            MessagingService.getMessagingInstance().stream(file.getAbsolutePath(), 0L, file.length(), StorageService.getLocalStorageEndPoint(), to_);
-        }
-    }
-    
-    void repeat()
-    {
-        if ( filesToStream_.size() > 0 )
-            start();
-    }
-    
-    void finish(String file) throws IOException
-    {
-        File f = new File(file);
-        if (logger_.isDebugEnabled())
-          logger_.debug("Deleting file " + file + " after streaming " + f.length() + "/" + totalBytesToStream_ + " bytes.");
-        FileUtils.delete(file);
-        filesToStream_.remove(0);
-        if ( filesToStream_.size() > 0 )
-            start();
-        else
-        {
-            synchronized(this)
-            {
-                if (logger_.isDebugEnabled())
-                  logger_.debug("Signalling that streaming is done for " + to_);
-                notifyAll();
-            }
-        }
-    }
-    
-    public synchronized void waitForStreamCompletion()
-    {
-        try
-        {
-            wait();
-        }
-        catch(InterruptedException ex)
-        {
-            logger_.warn(LogUtil.throwableToString(ex));
-        }
-    }
-}
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.service;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.*;
+
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.io.DataInputBuffer;
+import org.apache.cassandra.net.EndPoint;
+import org.apache.cassandra.net.IVerbHandler;
+import org.apache.cassandra.net.Message;
+import org.apache.cassandra.net.MessagingService;
+import org.apache.cassandra.net.io.StreamContextManager;
+import org.apache.cassandra.service.StorageService.BootstrapInitiateDoneVerbHandler;
+import org.apache.cassandra.utils.FileUtils;
+import org.apache.cassandra.utils.LogUtil;
+import org.apache.log4j.Logger;
+
+/*
+ * This class manages the streaming of multiple files 
+ * one after the other. 
+*/
+public final class StreamManager
+{   
+    private static Logger logger_ = Logger.getLogger( StreamManager.class );
+    
+    public static class BootstrapTerminateVerbHandler implements IVerbHandler
+    {
+        private static Logger logger_ = Logger.getLogger( BootstrapInitiateDoneVerbHandler.class );
+
+        public void doVerb(Message message)
+        {
+            byte[] body = message.getMessageBody();
+            DataInputBuffer bufIn = new DataInputBuffer();
+            bufIn.reset(body, body.length);
+
+            try
+            {
+                StreamContextManager.StreamStatusMessage streamStatusMessage = StreamContextManager.StreamStatusMessage.serializer().deserialize(bufIn);
+                StreamContextManager.StreamStatus streamStatus = streamStatusMessage.getStreamStatus();
+                                               
+                switch( streamStatus.getAction() )
+                {
+                    case DELETE:                              
+                        StreamManager.instance(message.getFrom()).finish(streamStatus.getFile());
+                        break;
+
+                    case STREAM:
+                        if (logger_.isDebugEnabled())
+                          logger_.debug("Need to re-stream file " + streamStatus.getFile());
+                        StreamManager.instance(message.getFrom()).repeat();
+                        break;
+
+                    default:
+                        break;
+                }
+            }
+            catch ( IOException ex )
+            {
+                logger_.info(LogUtil.throwableToString(ex));
+            }
+        }
+    }
+    
+    private static Map<EndPoint, StreamManager> streamManagers_ = new HashMap<EndPoint, StreamManager>();
+    
+    public static StreamManager instance(EndPoint to)
+    {
+        StreamManager streamManager = streamManagers_.get(to);
+        if ( streamManager == null )
+        {
+            streamManager = new StreamManager(to);
+            streamManagers_.put(to, streamManager);
+        }
+        return streamManager;
+    }
+    
+    private List<File> filesToStream_ = new ArrayList<File>();
+    private EndPoint to_;
+    private long totalBytesToStream_ = 0L;
+    
+    private StreamManager(EndPoint to)
+    {
+        to_ = to;
+    }
+    
+    public void addFilesToStream(StreamContextManager.StreamContext[] streamContexts)
+    {
+        for ( StreamContextManager.StreamContext streamContext : streamContexts )
+        {
+            if (logger_.isDebugEnabled())
+              logger_.debug("Adding file " + streamContext.getTargetFile() + " to be streamed.");
+            filesToStream_.add( new File( streamContext.getTargetFile() ) );
+            totalBytesToStream_ += streamContext.getExpectedBytes();
+        }
+    }
+    
+    void start()
+    {
+        if ( filesToStream_.size() > 0 )
+        {
+            File file = filesToStream_.get(0);
+            if (logger_.isDebugEnabled())
+              logger_.debug("Streaming file " + file + " ...");
+            MessagingService.getMessagingInstance().stream(file.getAbsolutePath(), 0L, file.length(), StorageService.getLocalStorageEndPoint(), to_);
+        }
+    }
+    
+    void repeat()
+    {
+        if ( filesToStream_.size() > 0 )
+            start();
+    }
+    
+    void finish(String file) throws IOException
+    {
+        File f = new File(file);
+        if (logger_.isDebugEnabled())
+          logger_.debug("Deleting file " + file + " after streaming " + f.length() + "/" + totalBytesToStream_ + " bytes.");
+        FileUtils.delete(file);
+        filesToStream_.remove(0);
+        if ( filesToStream_.size() > 0 )
+            start();
+        else
+        {
+            synchronized(this)
+            {
+                if (logger_.isDebugEnabled())
+                  logger_.debug("Signalling that streaming is done for " + to_);
+                notifyAll();
+            }
+        }
+    }
+    
+    public synchronized void waitForStreamCompletion()
+    {
+        try
+        {
+            wait();
+        }
+        catch(InterruptedException ex)
+        {
+            logger_.warn(LogUtil.throwableToString(ex));
+        }
+    }
+}

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/service/TokenUpdateVerbHandler.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/service/TokenUpdateVerbHandler.java?rev=799331&r1=799330&r2=799331&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/service/TokenUpdateVerbHandler.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/service/TokenUpdateVerbHandler.java Thu Jul 30 15:30:21 2009
@@ -1,54 +1,54 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.cassandra.service;
-
-import java.io.IOException;
-
-import org.apache.log4j.Logger;
-
-import org.apache.cassandra.dht.Token;
-import org.apache.cassandra.net.IVerbHandler;
-import org.apache.cassandra.net.Message;
-import org.apache.cassandra.utils.LogUtil;
-
-/**
- * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
- */
-
-public class TokenUpdateVerbHandler implements IVerbHandler
-{
-    private static Logger logger_ = Logger.getLogger(TokenUpdateVerbHandler.class);
-
-    public void doVerb(Message message)
-    {
-    	byte[] body = message.getMessageBody();
-        Token token = StorageService.getPartitioner().getTokenFactory().fromByteArray(body);
-        try
-        {
-        	logger_.info("Updating the token to [" + token + "]");
-        	StorageService.instance().updateToken(token);
-        }
-    	catch( IOException ex )
-    	{
-    		if (logger_.isDebugEnabled())
-    		  logger_.debug(LogUtil.throwableToString(ex));
-    	}
-    }
-
-}
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.service;
+
+import java.io.IOException;
+
+import org.apache.log4j.Logger;
+
+import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.net.IVerbHandler;
+import org.apache.cassandra.net.Message;
+import org.apache.cassandra.utils.LogUtil;
+
+/**
+ * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
+ */
+
+public class TokenUpdateVerbHandler implements IVerbHandler
+{
+    private static Logger logger_ = Logger.getLogger(TokenUpdateVerbHandler.class);
+
+    public void doVerb(Message message)
+    {
+    	byte[] body = message.getMessageBody();
+        Token token = StorageService.getPartitioner().getTokenFactory().fromByteArray(body);
+        try
+        {
+        	logger_.info("Updating the token to [" + token + "]");
+        	StorageService.instance().updateToken(token);
+        }
+    	catch( IOException ex )
+    	{
+    		if (logger_.isDebugEnabled())
+    		  logger_.debug(LogUtil.throwableToString(ex));
+    	}
+    }
+
+}

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/service/WriteResponseResolver.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/service/WriteResponseResolver.java?rev=799331&r1=799330&r2=799331&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/service/WriteResponseResolver.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/service/WriteResponseResolver.java Thu Jul 30 15:30:21 2009
@@ -1,77 +1,77 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.cassandra.service;
-
-import java.util.List;
-import java.io.DataInputStream;
-import java.io.ByteArrayInputStream;
-import java.io.IOException;
-
-import org.apache.cassandra.db.WriteResponse;
-import org.apache.cassandra.net.Message;
-import org.apache.log4j.Logger;
-
-/**
- * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
- */
-
-public class WriteResponseResolver implements IResponseResolver<Boolean> {
-
-	private static Logger logger_ = Logger.getLogger(WriteResponseResolver.class);
-
-	/*
-	 * The resolve function for the Write looks at all the responses if all the
-	 * responses returned are false then we have a problem since that means the
-	 * key was not written to any of the servers we want to notify the client of
-	 * this so in that case we should return a false saying that the write
-	 * failed.
-	 * 
-	 */
-	public Boolean resolve(List<Message> responses) throws DigestMismatchException 
-	{
-		// TODO: We need to log error responses here for example
-		// if a write fails for a key log that the key could not be replicated
-		boolean returnValue = false;
-		for (Message response : responses) {
-            WriteResponse writeResponseMessage = null;
-            try
-            {
-                writeResponseMessage = WriteResponse.serializer().deserialize(new DataInputStream(new ByteArrayInputStream(response.getMessageBody())));
-            }
-            catch (IOException e)
-            {
-                throw new RuntimeException(e);
-            }
-            boolean result = writeResponseMessage.isSuccess();
-            if (!result) {
-				if (logger_.isDebugEnabled())
-                    logger_.debug("Write at " + response.getFrom()
-						+ " may have failed for the key " + writeResponseMessage.key());
-			}
-			returnValue |= result;
-		}
-		return returnValue;
-	}
-
-	public boolean isDataPresent(List<Message> responses)
-	{
-		return true;
-	}
-	
-}
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.service;
+
+import java.util.List;
+import java.io.DataInputStream;
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+
+import org.apache.cassandra.db.WriteResponse;
+import org.apache.cassandra.net.Message;
+import org.apache.log4j.Logger;
+
+/**
+ * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
+ */
+
+public class WriteResponseResolver implements IResponseResolver<Boolean> {
+
+	private static Logger logger_ = Logger.getLogger(WriteResponseResolver.class);
+
+	/*
+	 * The resolve function for the Write looks at all the responses if all the
+	 * responses returned are false then we have a problem since that means the
+	 * key was not written to any of the servers we want to notify the client of
+	 * this so in that case we should return a false saying that the write
+	 * failed.
+	 * 
+	 */
+	public Boolean resolve(List<Message> responses) throws DigestMismatchException 
+	{
+		// TODO: We need to log error responses here for example
+		// if a write fails for a key log that the key could not be replicated
+		boolean returnValue = false;
+		for (Message response : responses) {
+            WriteResponse writeResponseMessage = null;
+            try
+            {
+                writeResponseMessage = WriteResponse.serializer().deserialize(new DataInputStream(new ByteArrayInputStream(response.getMessageBody())));
+            }
+            catch (IOException e)
+            {
+                throw new RuntimeException(e);
+            }
+            boolean result = writeResponseMessage.isSuccess();
+            if (!result) {
+				if (logger_.isDebugEnabled())
+                    logger_.debug("Write at " + response.getFrom()
+						+ " may have failed for the key " + writeResponseMessage.key());
+			}
+			returnValue |= result;
+		}
+		return returnValue;
+	}
+
+	public boolean isDataPresent(List<Message> responses)
+	{
+		return true;
+	}
+	
+}

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/tools/KeyChecker.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/tools/KeyChecker.java?rev=799331&r1=799330&r2=799331&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/tools/KeyChecker.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/tools/KeyChecker.java Thu Jul 30 15:30:21 2009
@@ -1,99 +1,99 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.cassandra.tools;
-
-import java.io.BufferedReader;
-import java.io.FileInputStream;
-import java.io.InputStreamReader;
-import java.io.RandomAccessFile;
-
-import org.apache.cassandra.db.Row;
-import org.apache.cassandra.db.Table;
-import org.apache.cassandra.net.EndPoint;
-import org.apache.cassandra.service.StorageService;
-import org.apache.cassandra.utils.FBUtilities;
-import org.apache.cassandra.utils.LogUtil;
-
-
-public class KeyChecker
-{
-    private static final int bufSize_ = 128*1024*1024;
-    /*
-     * This function checks if the local storage endpoint 
-     * is responsible for storing this key .
-     */
-    private static boolean checkIfProcessKey(String key)
-    {
-        EndPoint[] endPoints = StorageService.instance().getNStorageEndPoint(key);
-        EndPoint localEndPoint = StorageService.getLocalStorageEndPoint();
-        for(EndPoint endPoint : endPoints)
-        {
-            if(endPoint.equals(localEndPoint))
-                return true;
-        }
-        return false;
-    }
-    
-    public static void main(String[] args) throws Throwable
-    {
-        if ( args.length != 1 )
-        {
-            System.out.println("Usage : java com.facebook.infrastructure.tools.KeyChecker <file containing all keys>");
-            System.exit(1);
-        }
-        
-        LogUtil.init();
-        StorageService s = StorageService.instance();
-        s.start();
-        
-        /* Sleep for proper discovery */
-        Thread.sleep(240000);
-        /* Create the file for the missing keys */
-        RandomAccessFile raf = new RandomAccessFile( "Missing-" + FBUtilities.getHostAddress() + ".dat", "rw");
-        
-        /* Start reading the file that contains the keys */
-        BufferedReader bufReader = new BufferedReader( new InputStreamReader( new FileInputStream(args[0]) ), KeyChecker.bufSize_ );
-        String key = null;
-        boolean bStarted = false;
-        
-        while ( ( key = bufReader.readLine() ) != null )
-        {            
-            if ( !bStarted )
-            {
-                bStarted = true;
-                System.out.println("Started the processing of the file ...");
-            }
-            
-            key = key.trim();
-            if ( StorageService.instance().isPrimary(key) )
-            {
-                System.out.println("Processing key " + key);
-                Row row = Table.open("Mailbox").getRow(key, "MailboxMailList0");
-                if ( row.isEmpty() )
-                {
-                    System.out.println("MISSING KEY : " + key);
-                    raf.write(key.getBytes());
-                    raf.write(System.getProperty("line.separator").getBytes());
-                }
-            }
-        }
-        System.out.println("DONE checking keys ...");
-        raf.close();
-    }
-}
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.tools;
+
+import java.io.BufferedReader;
+import java.io.FileInputStream;
+import java.io.InputStreamReader;
+import java.io.RandomAccessFile;
+
+import org.apache.cassandra.db.Row;
+import org.apache.cassandra.db.Table;
+import org.apache.cassandra.net.EndPoint;
+import org.apache.cassandra.service.StorageService;
+import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.utils.LogUtil;
+
+
+public class KeyChecker
+{
+    private static final int bufSize_ = 128*1024*1024;
+    /*
+     * This function checks if the local storage endpoint 
+     * is responsible for storing this key .
+     */
+    private static boolean checkIfProcessKey(String key)
+    {
+        EndPoint[] endPoints = StorageService.instance().getNStorageEndPoint(key);
+        EndPoint localEndPoint = StorageService.getLocalStorageEndPoint();
+        for(EndPoint endPoint : endPoints)
+        {
+            if(endPoint.equals(localEndPoint))
+                return true;
+        }
+        return false;
+    }
+    
+    public static void main(String[] args) throws Throwable
+    {
+        if ( args.length != 1 )
+        {
+            System.out.println("Usage : java com.facebook.infrastructure.tools.KeyChecker <file containing all keys>");
+            System.exit(1);
+        }
+        
+        LogUtil.init();
+        StorageService s = StorageService.instance();
+        s.start();
+        
+        /* Sleep for proper discovery */
+        Thread.sleep(240000);
+        /* Create the file for the missing keys */
+        RandomAccessFile raf = new RandomAccessFile( "Missing-" + FBUtilities.getHostAddress() + ".dat", "rw");
+        
+        /* Start reading the file that contains the keys */
+        BufferedReader bufReader = new BufferedReader( new InputStreamReader( new FileInputStream(args[0]) ), KeyChecker.bufSize_ );
+        String key = null;
+        boolean bStarted = false;
+        
+        while ( ( key = bufReader.readLine() ) != null )
+        {            
+            if ( !bStarted )
+            {
+                bStarted = true;
+                System.out.println("Started the processing of the file ...");
+            }
+            
+            key = key.trim();
+            if ( StorageService.instance().isPrimary(key) )
+            {
+                System.out.println("Processing key " + key);
+                Row row = Table.open("Mailbox").getRow(key, "MailboxMailList0");
+                if ( row.isEmpty() )
+                {
+                    System.out.println("MISSING KEY : " + key);
+                    raf.write(key.getBytes());
+                    raf.write(System.getProperty("line.separator").getBytes());
+                }
+            }
+        }
+        System.out.println("DONE checking keys ...");
+        raf.close();
+    }
+}

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/tools/MembershipCleaner.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/tools/MembershipCleaner.java?rev=799331&r1=799330&r2=799331&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/tools/MembershipCleaner.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/tools/MembershipCleaner.java Thu Jul 30 15:30:21 2009
@@ -1,122 +1,122 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.cassandra.tools;
-
-import java.io.BufferedReader;
-import java.io.ByteArrayOutputStream;
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
-import java.io.FileInputStream;
-import java.io.IOException;
-import java.io.InputStreamReader;
-import java.io.Serializable;
-import java.util.concurrent.atomic.AtomicInteger;
-
-import org.apache.cassandra.io.ICompactSerializer;
-import org.apache.cassandra.net.EndPoint;
-import org.apache.cassandra.net.Message;
-import org.apache.cassandra.net.MessagingService;
-import org.apache.cassandra.service.StorageService;
-import org.apache.cassandra.utils.FBUtilities;
-
-public class MembershipCleaner
-{
-    private static final int port_ = 7000;
-    private static final long waitTime_ = 10000;
-    
-    public static void main(String[] args) throws Throwable
-    {
-        if ( args.length != 3 )
-        {
-            System.out.println("Usage : java com.facebook.infrastructure.tools.MembershipCleaner " +
-                    "<ip:port to send the message> " +
-                    "<node which needs to be removed> " +
-                    "<file containing all nodes in the cluster>");
-            System.exit(1);
-        }
-        
-        String ipPort = args[0];
-        String node = args[1];
-        String file = args[2];
-        
-        String[] ipPortPair = ipPort.split(":");
-        EndPoint target = new EndPoint(ipPortPair[0], Integer.valueOf(ipPortPair[1]));
-        MembershipCleanerMessage mcMessage = new MembershipCleanerMessage(node);
-        
-        ByteArrayOutputStream bos = new ByteArrayOutputStream();
-        DataOutputStream dos = new DataOutputStream(bos);
-        MembershipCleanerMessage.serializer().serialize(mcMessage, dos);
-        /* Construct the token update message to be sent */
-        Message mbrshipCleanerMessage = new Message( new EndPoint(FBUtilities.getHostAddress(), port_), "", StorageService.mbrshipCleanerVerbHandler_, bos.toByteArray() );
-        
-        BufferedReader bufReader = new BufferedReader( new InputStreamReader( new FileInputStream(file) ) );
-        String line = null;
-       
-        while ( ( line = bufReader.readLine() ) != null )
-        {            
-            mbrshipCleanerMessage.addHeader(line, line.getBytes());
-        }
-        
-        System.out.println("Sending a membership clean message to " + target);
-        MessagingService.getMessagingInstance().sendOneWay(mbrshipCleanerMessage, target);
-        Thread.sleep(MembershipCleaner.waitTime_);
-        System.out.println("Done sending the update message");
-    }
-    
-    public static class MembershipCleanerMessage implements Serializable
-    {
-        private static ICompactSerializer<MembershipCleanerMessage> serializer_;
-        private static AtomicInteger idGen_ = new AtomicInteger(0);
-        
-        static
-        {
-            serializer_ = new MembershipCleanerMessageSerializer();            
-        }
-        
-        static ICompactSerializer<MembershipCleanerMessage> serializer()
-        {
-            return serializer_;
-        }
-
-        private String target_;
-        
-        MembershipCleanerMessage(String target)
-        {
-            target_ = target;        
-        }
-        
-        String getTarget()
-        {
-            return target_;
-        }
-    }
-    
-    public static class MembershipCleanerMessageSerializer implements ICompactSerializer<MembershipCleanerMessage>
-    {
-        public void serialize(MembershipCleanerMessage mcMessage, DataOutputStream dos) throws IOException
-        {            
-            dos.writeUTF(mcMessage.getTarget() );                      
-        }
-        
-        public MembershipCleanerMessage deserialize(DataInputStream dis) throws IOException
-        {            
-            return new MembershipCleanerMessage(dis.readUTF());
-        }
-    }
-}
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.tools;
+
+import java.io.BufferedReader;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.io.Serializable;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.cassandra.io.ICompactSerializer;
+import org.apache.cassandra.net.EndPoint;
+import org.apache.cassandra.net.Message;
+import org.apache.cassandra.net.MessagingService;
+import org.apache.cassandra.service.StorageService;
+import org.apache.cassandra.utils.FBUtilities;
+
+public class MembershipCleaner
+{
+    private static final int port_ = 7000;
+    private static final long waitTime_ = 10000;
+    
+    public static void main(String[] args) throws Throwable
+    {
+        if ( args.length != 3 )
+        {
+            System.out.println("Usage : java com.facebook.infrastructure.tools.MembershipCleaner " +
+                    "<ip:port to send the message> " +
+                    "<node which needs to be removed> " +
+                    "<file containing all nodes in the cluster>");
+            System.exit(1);
+        }
+        
+        String ipPort = args[0];
+        String node = args[1];
+        String file = args[2];
+        
+        String[] ipPortPair = ipPort.split(":");
+        EndPoint target = new EndPoint(ipPortPair[0], Integer.valueOf(ipPortPair[1]));
+        MembershipCleanerMessage mcMessage = new MembershipCleanerMessage(node);
+        
+        ByteArrayOutputStream bos = new ByteArrayOutputStream();
+        DataOutputStream dos = new DataOutputStream(bos);
+        MembershipCleanerMessage.serializer().serialize(mcMessage, dos);
+        /* Construct the token update message to be sent */
+        Message mbrshipCleanerMessage = new Message( new EndPoint(FBUtilities.getHostAddress(), port_), "", StorageService.mbrshipCleanerVerbHandler_, bos.toByteArray() );
+        
+        BufferedReader bufReader = new BufferedReader( new InputStreamReader( new FileInputStream(file) ) );
+        String line = null;
+       
+        while ( ( line = bufReader.readLine() ) != null )
+        {            
+            mbrshipCleanerMessage.addHeader(line, line.getBytes());
+        }
+        
+        System.out.println("Sending a membership clean message to " + target);
+        MessagingService.getMessagingInstance().sendOneWay(mbrshipCleanerMessage, target);
+        Thread.sleep(MembershipCleaner.waitTime_);
+        System.out.println("Done sending the update message");
+    }
+    
+    public static class MembershipCleanerMessage implements Serializable
+    {
+        private static ICompactSerializer<MembershipCleanerMessage> serializer_;
+        private static AtomicInteger idGen_ = new AtomicInteger(0);
+        
+        static
+        {
+            serializer_ = new MembershipCleanerMessageSerializer();            
+        }
+        
+        static ICompactSerializer<MembershipCleanerMessage> serializer()
+        {
+            return serializer_;
+        }
+
+        private String target_;
+        
+        MembershipCleanerMessage(String target)
+        {
+            target_ = target;        
+        }
+        
+        String getTarget()
+        {
+            return target_;
+        }
+    }
+    
+    public static class MembershipCleanerMessageSerializer implements ICompactSerializer<MembershipCleanerMessage>
+    {
+        public void serialize(MembershipCleanerMessage mcMessage, DataOutputStream dos) throws IOException
+        {            
+            dos.writeUTF(mcMessage.getTarget() );                      
+        }
+        
+        public MembershipCleanerMessage deserialize(DataInputStream dis) throws IOException
+        {            
+            return new MembershipCleanerMessage(dis.readUTF());
+        }
+    }
+}

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/tools/MembershipCleanerVerbHandler.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/tools/MembershipCleanerVerbHandler.java?rev=799331&r1=799330&r2=799331&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/tools/MembershipCleanerVerbHandler.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/tools/MembershipCleanerVerbHandler.java Thu Jul 30 15:30:21 2009
@@ -1,89 +1,89 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.cassandra.tools;
-
-import java.io.IOException;
-import java.util.Map;
-import java.util.Set;
-
-import org.apache.log4j.Logger;
-
-import org.apache.cassandra.config.DatabaseDescriptor;
-import org.apache.cassandra.io.DataInputBuffer;
-import org.apache.cassandra.net.EndPoint;
-import org.apache.cassandra.net.IVerbHandler;
-import org.apache.cassandra.net.Message;
-import org.apache.cassandra.net.MessagingService;
-import org.apache.cassandra.service.StorageService;
-import org.apache.cassandra.utils.LogUtil;
-
-/**
- * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
- */
-
-public class MembershipCleanerVerbHandler implements IVerbHandler
-{
-    private static Logger logger_ = Logger.getLogger(MembershipCleanerVerbHandler.class);
-
-    public void doVerb(Message message)
-    {
-        byte[] body = message.getMessageBody();
-        
-        try
-        {
-            DataInputBuffer bufIn = new DataInputBuffer();
-            bufIn.reset(body, body.length);
-            /* Deserialize to get the token for this endpoint. */
-            MembershipCleaner.MembershipCleanerMessage mcMessage = MembershipCleaner.MembershipCleanerMessage.serializer().deserialize(bufIn);
-            
-            String target = mcMessage.getTarget();
-            logger_.info("Removing the node [" + target + "] from membership");
-            EndPoint targetEndPoint = new EndPoint(target, DatabaseDescriptor.getControlPort());
-            /* Remove the token related information for this endpoint */
-            StorageService.instance().removeTokenState(targetEndPoint);
-            
-            /* Get the headers for this message */
-            Map<String, byte[]> headers = message.getHeaders();
-            headers.remove( StorageService.getLocalStorageEndPoint().getHost() );
-            if (logger_.isDebugEnabled())
-              logger_.debug("Number of nodes in the header " + headers.size());
-            Set<String> nodes = headers.keySet();
-            
-            for ( String node : nodes )
-            {            
-                if (logger_.isDebugEnabled())
-                  logger_.debug("Processing node " + node);
-                byte[] bytes = headers.remove(node);
-                /* Send a message to this node to alter its membership state. */
-                EndPoint targetNode = new EndPoint(node, DatabaseDescriptor.getStoragePort());                
-                
-                if (logger_.isDebugEnabled())
-                  logger_.debug("Sending a membership clean message to " + targetNode);
-                MessagingService.getMessagingInstance().sendOneWay(message, targetNode);
-                break;
-            }                        
-        }
-        catch( IOException ex )
-        {
-            if (logger_.isDebugEnabled())
-              logger_.debug(LogUtil.throwableToString(ex));
-        }
-    }
-
-}
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.tools;
+
+import java.io.IOException;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.log4j.Logger;
+
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.io.DataInputBuffer;
+import org.apache.cassandra.net.EndPoint;
+import org.apache.cassandra.net.IVerbHandler;
+import org.apache.cassandra.net.Message;
+import org.apache.cassandra.net.MessagingService;
+import org.apache.cassandra.service.StorageService;
+import org.apache.cassandra.utils.LogUtil;
+
+/**
+ * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
+ */
+
+public class MembershipCleanerVerbHandler implements IVerbHandler
+{
+    private static Logger logger_ = Logger.getLogger(MembershipCleanerVerbHandler.class);
+
+    public void doVerb(Message message)
+    {
+        byte[] body = message.getMessageBody();
+        
+        try
+        {
+            DataInputBuffer bufIn = new DataInputBuffer();
+            bufIn.reset(body, body.length);
+            /* Deserialize to get the token for this endpoint. */
+            MembershipCleaner.MembershipCleanerMessage mcMessage = MembershipCleaner.MembershipCleanerMessage.serializer().deserialize(bufIn);
+            
+            String target = mcMessage.getTarget();
+            logger_.info("Removing the node [" + target + "] from membership");
+            EndPoint targetEndPoint = new EndPoint(target, DatabaseDescriptor.getControlPort());
+            /* Remove the token related information for this endpoint */
+            StorageService.instance().removeTokenState(targetEndPoint);
+            
+            /* Get the headers for this message */
+            Map<String, byte[]> headers = message.getHeaders();
+            headers.remove( StorageService.getLocalStorageEndPoint().getHost() );
+            if (logger_.isDebugEnabled())
+              logger_.debug("Number of nodes in the header " + headers.size());
+            Set<String> nodes = headers.keySet();
+            
+            for ( String node : nodes )
+            {            
+                if (logger_.isDebugEnabled())
+                  logger_.debug("Processing node " + node);
+                byte[] bytes = headers.remove(node);
+                /* Send a message to this node to alter its membership state. */
+                EndPoint targetNode = new EndPoint(node, DatabaseDescriptor.getStoragePort());                
+                
+                if (logger_.isDebugEnabled())
+                  logger_.debug("Sending a membership clean message to " + targetNode);
+                MessagingService.getMessagingInstance().sendOneWay(message, targetNode);
+                break;
+            }                        
+        }
+        catch( IOException ex )
+        {
+            if (logger_.isDebugEnabled())
+              logger_.debug(LogUtil.throwableToString(ex));
+        }
+    }
+
+}

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/tools/ThreadListBuilder.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/tools/ThreadListBuilder.java?rev=799331&r1=799330&r2=799331&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/tools/ThreadListBuilder.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/tools/ThreadListBuilder.java Thu Jul 30 15:30:21 2009
@@ -1,94 +1,94 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.cassandra.tools;
-
-import java.io.BufferedReader;
-import java.io.File;
-import java.io.FileInputStream;
-import java.io.InputStreamReader;
-import java.io.RandomAccessFile;
-import java.util.ArrayList;
-import java.util.List;
-
-import org.apache.cassandra.io.DataOutputBuffer;
-import org.apache.cassandra.utils.BloomFilter;
-
-
-public class ThreadListBuilder
-{
-    private final static int bufSize_ = 64*1024*1024;
-    private final static int count_ = 128*1024*1024;
-    
-    public static void main(String[] args) throws Throwable
-    {
-        if ( args.length != 2 )
-        {
-            System.out.println("Usage : java org.apache.cassandra.tools.ThreadListBuilder <directory containing files to be processed> <directory to dump the bloom filter in.>");
-            System.exit(1);
-        }
-        
-        File directory = new File(args[0]);
-        File[] files = directory.listFiles();
-        List<DataOutputBuffer> buffers = new ArrayList<DataOutputBuffer>();    
-        BloomFilter bf = new BloomFilter(count_, 8);        
-        int keyCount = 0;
-        
-        /* Process the list of files. */
-        for ( File file : files )
-        {
-            System.out.println("Processing file " + file);
-            BufferedReader bufReader = new BufferedReader( new InputStreamReader( new FileInputStream(file) ), ThreadListBuilder.bufSize_ );
-            String line = null;
-            
-            while ( (line = bufReader.readLine()) != null )
-            {
-                /* After accumulating count_ keys reset the bloom filter. */
-                if ( keyCount > 0 && keyCount % count_ == 0 )
-                {                       
-                    DataOutputBuffer bufOut = new DataOutputBuffer();
-                    BloomFilter.serializer().serialize(bf, bufOut);
-                    System.out.println("Finished serializing the bloom filter");
-                    buffers.add(bufOut);
-                    bf = new BloomFilter(count_, 8);
-                }
-                line = line.trim();                
-                bf.add(line);
-                ++keyCount;
-            }
-        }
-        
-        /* Add the bloom filter assuming the last one was left out */
-        DataOutputBuffer bufOut = new DataOutputBuffer();
-        BloomFilter.serializer().serialize(bf, bufOut);
-        buffers.add(bufOut);
-        
-        
-        int size = buffers.size();
-        for ( int i = 0; i < size; ++i )
-        {
-            DataOutputBuffer buffer = buffers.get(i);
-            String file = args[1] + File.separator + "Bloom-Filter-" + i + ".dat";
-            RandomAccessFile raf = new RandomAccessFile(file, "rw");
-            raf.write(buffer.getData(), 0, buffer.getLength());
-            raf.close();
-            buffer.close();
-        }
-        System.out.println("Done writing the bloom filter to disk");
-    }
-}
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.tools;
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.InputStreamReader;
+import java.io.RandomAccessFile;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.cassandra.io.DataOutputBuffer;
+import org.apache.cassandra.utils.BloomFilter;
+
+
+public class ThreadListBuilder
+{
+    private final static int bufSize_ = 64*1024*1024;
+    private final static int count_ = 128*1024*1024;
+    
+    public static void main(String[] args) throws Throwable
+    {
+        if ( args.length != 2 )
+        {
+            System.out.println("Usage : java org.apache.cassandra.tools.ThreadListBuilder <directory containing files to be processed> <directory to dump the bloom filter in.>");
+            System.exit(1);
+        }
+        
+        File directory = new File(args[0]);
+        File[] files = directory.listFiles();
+        List<DataOutputBuffer> buffers = new ArrayList<DataOutputBuffer>();    
+        BloomFilter bf = new BloomFilter(count_, 8);        
+        int keyCount = 0;
+        
+        /* Process the list of files. */
+        for ( File file : files )
+        {
+            System.out.println("Processing file " + file);
+            BufferedReader bufReader = new BufferedReader( new InputStreamReader( new FileInputStream(file) ), ThreadListBuilder.bufSize_ );
+            String line = null;
+            
+            while ( (line = bufReader.readLine()) != null )
+            {
+                /* After accumulating count_ keys reset the bloom filter. */
+                if ( keyCount > 0 && keyCount % count_ == 0 )
+                {                       
+                    DataOutputBuffer bufOut = new DataOutputBuffer();
+                    BloomFilter.serializer().serialize(bf, bufOut);
+                    System.out.println("Finished serializing the bloom filter");
+                    buffers.add(bufOut);
+                    bf = new BloomFilter(count_, 8);
+                }
+                line = line.trim();                
+                bf.add(line);
+                ++keyCount;
+            }
+        }
+        
+        /* Add the bloom filter assuming the last one was left out */
+        DataOutputBuffer bufOut = new DataOutputBuffer();
+        BloomFilter.serializer().serialize(bf, bufOut);
+        buffers.add(bufOut);
+        
+        
+        int size = buffers.size();
+        for ( int i = 0; i < size; ++i )
+        {
+            DataOutputBuffer buffer = buffers.get(i);
+            String file = args[1] + File.separator + "Bloom-Filter-" + i + ".dat";
+            RandomAccessFile raf = new RandomAccessFile(file, "rw");
+            raf.write(buffer.getData(), 0, buffer.getLength());
+            raf.close();
+            buffer.close();
+        }
+        System.out.println("Done writing the bloom filter to disk");
+    }
+}

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/tools/TokenUpdateVerbHandler.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/tools/TokenUpdateVerbHandler.java?rev=799331&r1=799330&r2=799331&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/tools/TokenUpdateVerbHandler.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/tools/TokenUpdateVerbHandler.java Thu Jul 30 15:30:21 2009
@@ -1,98 +1,98 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.cassandra.tools;
-
-import java.io.ByteArrayOutputStream;
-import java.io.DataOutputStream;
-import java.io.IOException;
-import java.util.Map;
-import java.util.Set;
-
-import org.apache.log4j.Logger;
-
-import org.apache.cassandra.config.DatabaseDescriptor;
-import org.apache.cassandra.dht.IPartitioner;
-import org.apache.cassandra.dht.Token;
-import org.apache.cassandra.io.DataInputBuffer;
-import org.apache.cassandra.net.EndPoint;
-import org.apache.cassandra.net.IVerbHandler;
-import org.apache.cassandra.net.Message;
-import org.apache.cassandra.net.MessagingService;
-import org.apache.cassandra.service.StorageService;
-import org.apache.cassandra.utils.LogUtil;
-
-/**
- * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
- */
-
-public class TokenUpdateVerbHandler implements IVerbHandler
-{
-    private static Logger logger_ = Logger.getLogger(TokenUpdateVerbHandler.class);
-
-    public void doVerb(Message message)
-    {
-    	byte[] body = message.getMessageBody();
-        
-        try
-        {
-            DataInputBuffer bufIn = new DataInputBuffer();
-            bufIn.reset(body, body.length);
-            /* Deserialize to get the token for this endpoint. */
-            Token token = Token.serializer().deserialize(bufIn);
-
-            logger_.info("Updating the token to [" + token + "]");
-            StorageService.instance().updateToken(token);
-            
-            /* Get the headers for this message */
-            Map<String, byte[]> headers = message.getHeaders();
-            headers.remove( StorageService.getLocalStorageEndPoint().getHost() );
-            if (logger_.isDebugEnabled())
-              logger_.debug("Number of nodes in the header " + headers.size());
-            Set<String> nodes = headers.keySet();
-            
-            IPartitioner p = StorageService.getPartitioner();
-            for ( String node : nodes )
-            {            
-                if (logger_.isDebugEnabled())
-                  logger_.debug("Processing node " + node);
-                byte[] bytes = headers.remove(node);
-                /* Send a message to this node to update its token to the one retrieved. */
-                EndPoint target = new EndPoint(node, DatabaseDescriptor.getStoragePort());
-                token = p.getTokenFactory().fromByteArray(bytes);
-                
-                /* Reset the new Message */
-                ByteArrayOutputStream bos = new ByteArrayOutputStream();
-                DataOutputStream dos = new DataOutputStream(bos);
-                Token.serializer().serialize(token, dos);
-                message.setMessageBody(bos.toByteArray());
-                
-                if (logger_.isDebugEnabled())
-                  logger_.debug("Sending a token update message to " + target + " to update it to " + token);
-                MessagingService.getMessagingInstance().sendOneWay(message, target);
-                break;
-            }                        
-        }
-    	catch( IOException ex )
-    	{
-    		if (logger_.isDebugEnabled())
-    		  logger_.debug(LogUtil.throwableToString(ex));
-    	}
-    }
-
-}
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.tools;
+
+import java.io.ByteArrayOutputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.log4j.Logger;
+
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.dht.IPartitioner;
+import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.io.DataInputBuffer;
+import org.apache.cassandra.net.EndPoint;
+import org.apache.cassandra.net.IVerbHandler;
+import org.apache.cassandra.net.Message;
+import org.apache.cassandra.net.MessagingService;
+import org.apache.cassandra.service.StorageService;
+import org.apache.cassandra.utils.LogUtil;
+
+/**
+ * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
+ */
+
+public class TokenUpdateVerbHandler implements IVerbHandler
+{
+    private static Logger logger_ = Logger.getLogger(TokenUpdateVerbHandler.class);
+
+    public void doVerb(Message message)
+    {
+    	byte[] body = message.getMessageBody();
+        
+        try
+        {
+            DataInputBuffer bufIn = new DataInputBuffer();
+            bufIn.reset(body, body.length);
+            /* Deserialize to get the token for this endpoint. */
+            Token token = Token.serializer().deserialize(bufIn);
+
+            logger_.info("Updating the token to [" + token + "]");
+            StorageService.instance().updateToken(token);
+            
+            /* Get the headers for this message */
+            Map<String, byte[]> headers = message.getHeaders();
+            headers.remove( StorageService.getLocalStorageEndPoint().getHost() );
+            if (logger_.isDebugEnabled())
+              logger_.debug("Number of nodes in the header " + headers.size());
+            Set<String> nodes = headers.keySet();
+            
+            IPartitioner p = StorageService.getPartitioner();
+            for ( String node : nodes )
+            {            
+                if (logger_.isDebugEnabled())
+                  logger_.debug("Processing node " + node);
+                byte[] bytes = headers.remove(node);
+                /* Send a message to this node to update its token to the one retrieved. */
+                EndPoint target = new EndPoint(node, DatabaseDescriptor.getStoragePort());
+                token = p.getTokenFactory().fromByteArray(bytes);
+                
+                /* Reset the new Message */
+                ByteArrayOutputStream bos = new ByteArrayOutputStream();
+                DataOutputStream dos = new DataOutputStream(bos);
+                Token.serializer().serialize(token, dos);
+                message.setMessageBody(bos.toByteArray());
+                
+                if (logger_.isDebugEnabled())
+                  logger_.debug("Sending a token update message to " + target + " to update it to " + token);
+                MessagingService.getMessagingInstance().sendOneWay(message, target);
+                break;
+            }                        
+        }
+    	catch( IOException ex )
+    	{
+    		if (logger_.isDebugEnabled())
+    		  logger_.debug(LogUtil.throwableToString(ex));
+    	}
+    }
+
+}

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/tools/TokenUpdater.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/tools/TokenUpdater.java?rev=799331&r1=799330&r2=799331&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/tools/TokenUpdater.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/tools/TokenUpdater.java Thu Jul 30 15:30:21 2009
@@ -1,80 +1,80 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.cassandra.tools;
-
-import java.io.BufferedReader;
-import java.io.ByteArrayOutputStream;
-import java.io.DataOutputStream;
-import java.io.FileInputStream;
-import java.io.InputStreamReader;
-
-import org.apache.cassandra.dht.IPartitioner;
-import org.apache.cassandra.dht.Token;
-import org.apache.cassandra.net.EndPoint;
-import org.apache.cassandra.net.Message;
-import org.apache.cassandra.net.MessagingService;
-import org.apache.cassandra.service.StorageService;
-import org.apache.cassandra.utils.FBUtilities;
-
-public class TokenUpdater
-{
-    private static final int port_ = 7000;
-    private static final long waitTime_ = 10000;
-    
-    public static void main(String[] args) throws Throwable
-    {
-        if ( args.length != 3 )
-        {
-            System.out.println("Usage : java org.apache.cassandra.tools.TokenUpdater <ip:port> <token> <file containing node token info>");
-            System.exit(1);
-        }
-        
-        String ipPort = args[0];
-        IPartitioner p = StorageService.getPartitioner();
-        Token token = p.getTokenFactory().fromString(args[1]);
-        String file = args[2];
-        
-        String[] ipPortPair = ipPort.split(":");
-        EndPoint target = new EndPoint(ipPortPair[0], Integer.valueOf(ipPortPair[1]));
-
-        ByteArrayOutputStream bos = new ByteArrayOutputStream();
-        DataOutputStream dos = new DataOutputStream(bos);
-        Token.serializer().serialize(token, dos);
-
-        /* Construct the token update message to be sent */
-        Message tokenUpdateMessage = new Message( new EndPoint(FBUtilities.getHostAddress(), port_), "", StorageService.tokenVerbHandler_, bos.toByteArray() );
-        
-        BufferedReader bufReader = new BufferedReader( new InputStreamReader( new FileInputStream(file) ) );
-        String line = null;
-       
-        while ( ( line = bufReader.readLine() ) != null )
-        {
-            String[] nodeTokenPair = line.split(" ");
-            /* Add the node and the token pair into the header of this message. */
-            Token nodeToken = p.getTokenFactory().fromString(nodeTokenPair[1]);
-            tokenUpdateMessage.addHeader(nodeTokenPair[0], p.getTokenFactory().toByteArray(nodeToken));
-        }
-        
-        System.out.println("Sending a token update message to " + target);
-        MessagingService.getMessagingInstance().sendOneWay(tokenUpdateMessage, target);
-        Thread.sleep(TokenUpdater.waitTime_);
-        System.out.println("Done sending the update message");
-    }
-
-}
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.tools;
+
+import java.io.BufferedReader;
+import java.io.ByteArrayOutputStream;
+import java.io.DataOutputStream;
+import java.io.FileInputStream;
+import java.io.InputStreamReader;
+
+import org.apache.cassandra.dht.IPartitioner;
+import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.net.EndPoint;
+import org.apache.cassandra.net.Message;
+import org.apache.cassandra.net.MessagingService;
+import org.apache.cassandra.service.StorageService;
+import org.apache.cassandra.utils.FBUtilities;
+
+public class TokenUpdater
+{
+    private static final int port_ = 7000;
+    private static final long waitTime_ = 10000;
+    
+    public static void main(String[] args) throws Throwable
+    {
+        if ( args.length != 3 )
+        {
+            System.out.println("Usage : java org.apache.cassandra.tools.TokenUpdater <ip:port> <token> <file containing node token info>");
+            System.exit(1);
+        }
+        
+        String ipPort = args[0];
+        IPartitioner p = StorageService.getPartitioner();
+        Token token = p.getTokenFactory().fromString(args[1]);
+        String file = args[2];
+        
+        String[] ipPortPair = ipPort.split(":");
+        EndPoint target = new EndPoint(ipPortPair[0], Integer.valueOf(ipPortPair[1]));
+
+        ByteArrayOutputStream bos = new ByteArrayOutputStream();
+        DataOutputStream dos = new DataOutputStream(bos);
+        Token.serializer().serialize(token, dos);
+
+        /* Construct the token update message to be sent */
+        Message tokenUpdateMessage = new Message( new EndPoint(FBUtilities.getHostAddress(), port_), "", StorageService.tokenVerbHandler_, bos.toByteArray() );
+        
+        BufferedReader bufReader = new BufferedReader( new InputStreamReader( new FileInputStream(file) ) );
+        String line = null;
+       
+        while ( ( line = bufReader.readLine() ) != null )
+        {
+            String[] nodeTokenPair = line.split(" ");
+            /* Add the node and the token pair into the header of this message. */
+            Token nodeToken = p.getTokenFactory().fromString(nodeTokenPair[1]);
+            tokenUpdateMessage.addHeader(nodeTokenPair[0], p.getTokenFactory().toByteArray(nodeToken));
+        }
+        
+        System.out.println("Sending a token update message to " + target);
+        MessagingService.getMessagingInstance().sendOneWay(tokenUpdateMessage, target);
+        Thread.sleep(TokenUpdater.waitTime_);
+        System.out.println("Done sending the update message");
+    }
+
+}

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/utils/BasicUtilities.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/utils/BasicUtilities.java?rev=799331&r1=799330&r2=799331&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/utils/BasicUtilities.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/utils/BasicUtilities.java Thu Jul 30 15:30:21 2009
@@ -1,66 +1,66 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.cassandra.utils;
-
-import java.nio.ByteBuffer;
-
-/**
- * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
- */
-
-
-public class BasicUtilities
-{        
-	public static byte[] longToByteArray(long arg)
-	{      
-        byte[] retVal = new byte[8];
-        ByteBuffer.wrap(retVal).putLong(arg);
-        return retVal; 
-	 }
-	
-	public static long byteArrayToLong(byte[] arg)
-	{
-        return ByteBuffer.wrap(arg).getLong();
-	}
-	
-	public static byte[] intToByteArray(int arg)
-	{      
-        byte[] retVal = new byte[4];
-        ByteBuffer.wrap(retVal).putInt(arg);
-        return retVal; 
-	 }
-	
-	public static int byteArrayToInt(byte[] arg)
-	{
-        return ByteBuffer.wrap(arg).getInt();
-	}
-	
-	public static byte[] shortToByteArray(short arg)
-	{      
-        byte[] retVal = new byte[2];
-        ByteBuffer bb= ByteBuffer.wrap(retVal);
-        bb.putShort(arg);
-        return retVal; 
-	 }
-	
-	public static short byteArrayToShort(byte[] arg)
-	{
-        return ByteBuffer.wrap(arg).getShort();
-    }
-}
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.utils;
+
+import java.nio.ByteBuffer;
+
+/**
+ * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
+ */
+
+
+public class BasicUtilities
+{        
+	public static byte[] longToByteArray(long arg)
+	{      
+        byte[] retVal = new byte[8];
+        ByteBuffer.wrap(retVal).putLong(arg);
+        return retVal; 
+	 }
+	
+	public static long byteArrayToLong(byte[] arg)
+	{
+        return ByteBuffer.wrap(arg).getLong();
+	}
+	
+	public static byte[] intToByteArray(int arg)
+	{      
+        byte[] retVal = new byte[4];
+        ByteBuffer.wrap(retVal).putInt(arg);
+        return retVal; 
+	 }
+	
+	public static int byteArrayToInt(byte[] arg)
+	{
+        return ByteBuffer.wrap(arg).getInt();
+	}
+	
+	public static byte[] shortToByteArray(short arg)
+	{      
+        byte[] retVal = new byte[2];
+        ByteBuffer bb= ByteBuffer.wrap(retVal);
+        bb.putShort(arg);
+        return retVal; 
+	 }
+	
+	public static short byteArrayToShort(byte[] arg)
+	{
+        return ByteBuffer.wrap(arg).getShort();
+    }
+}