You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by jb...@apache.org on 2009/09/15 17:47:37 UTC

svn commit: r815373 - in /incubator/cassandra/trunk: bin/ src/java/org/apache/cassandra/net/ src/java/org/apache/cassandra/service/ src/java/org/apache/cassandra/tools/

Author: jbellis
Date: Tue Sep 15 15:47:36 2009
New Revision: 815373

URL: http://svn.apache.org/viewvc?rev=815373&view=rev
Log:
Fix deserialization bug in TokenUpdateVerbHandler; change TokenUpdater to update one node at a time, and add bin/tokenupdater as a convenience.  Patch by Sammy Yu; reviewed for CASSANDRA-363 by jbellis

Added:
    incubator/cassandra/trunk/bin/tokenupdater
Removed:
    incubator/cassandra/trunk/src/java/org/apache/cassandra/tools/TokenUpdateVerbHandler.java
Modified:
    incubator/cassandra/trunk/src/java/org/apache/cassandra/net/MessagingService.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/service/TokenUpdateVerbHandler.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/tools/TokenUpdater.java

Added: incubator/cassandra/trunk/bin/tokenupdater
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/bin/tokenupdater?rev=815373&view=auto
==============================================================================
--- incubator/cassandra/trunk/bin/tokenupdater (added)
+++ incubator/cassandra/trunk/bin/tokenupdater Tue Sep 15 15:47:36 2009
@@ -0,0 +1,49 @@
+#!/bin/sh
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+if [ "x$CASSANDRA_INCLUDE" = "x" ]; then
+    for include in /usr/share/cassandra/cassandra.in.sh \
+                   /usr/local/share/cassandra/cassandra.in.sh \
+                   /opt/cassandra/cassandra.in.sh \
+                   `dirname $0`/cassandra.in.sh; do
+        if [ -r $include ]; then
+            . $include
+            break
+        fi
+    done
+elif [ -r $CASSANDRA_INCLUDE ]; then
+    . $CASSANDRA_INCLUDE
+fi
+
+if [ -z $CASSANDRA_CONF -o -z $CLASSPATH ]; then
+    echo "You must set the CASSANDRA_CONF and CLASSPATH vars" >&2
+    exit 1
+fi
+
+# Special-case path variables.
+case "`uname`" in
+    CYGWIN*) 
+        CLASSPATH=`cygpath -p -w "$CLASSPATH"`
+        CASSANDRA_CONF=`cygpath -p -w "$CASSANDRA_CONF"`
+    ;;
+esac
+
+java -cp $CLASSPATH -Dstorage-config=$CASSANDRA_CONF \
+        org.apache.cassandra.tools.TokenUpdater $@
+
+# vi:ai sw=4 ts=4 tw=0 et

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/net/MessagingService.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/net/MessagingService.java?rev=815373&r1=815372&r2=815373&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/net/MessagingService.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/net/MessagingService.java Tue Sep 15 15:47:36 2009
@@ -514,6 +514,8 @@
             messageDeserializerExecutor_.shutdownNow();
             streamExecutor_.shutdownNow();
 
+            StageManager.shutdown();
+            
             /* shut down the cachetables */
             taskCompletionMap_.shutdown();
             callbackMap_.shutdown();

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/service/TokenUpdateVerbHandler.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/service/TokenUpdateVerbHandler.java?rev=815373&r1=815372&r2=815373&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/service/TokenUpdateVerbHandler.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/service/TokenUpdateVerbHandler.java Tue Sep 15 15:47:36 2009
@@ -23,9 +23,9 @@
 import org.apache.log4j.Logger;
 
 import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.io.DataInputBuffer;
 import org.apache.cassandra.net.IVerbHandler;
 import org.apache.cassandra.net.Message;
-import org.apache.cassandra.utils.LogUtil;
 
 public class TokenUpdateVerbHandler implements IVerbHandler
 {
@@ -33,18 +33,20 @@
 
     public void doVerb(Message message)
     {
-    	byte[] body = message.getMessageBody();
-        Token token = StorageService.getPartitioner().getTokenFactory().fromByteArray(body);
+        byte[] body = message.getMessageBody();
+        DataInputBuffer bufIn = new DataInputBuffer();
+        bufIn.reset(body, body.length);
         try
         {
-        	logger_.info("Updating the token to [" + token + "]");
-        	StorageService.instance().updateToken(token);
+            /* Deserialize to get the token for this endpoint. */
+            Token token = Token.serializer().deserialize(bufIn);
+            logger_.info("Updating the token to [" + token + "]");
+            StorageService.instance().updateToken(token);
+        }
+        catch (IOException ex)
+        {
+            throw new RuntimeException(ex);
         }
-    	catch( IOException ex )
-    	{
-    		if (logger_.isDebugEnabled())
-    		  logger_.debug(LogUtil.throwableToString(ex));
-    	}
     }
 
 }

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/tools/TokenUpdater.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/tools/TokenUpdater.java?rev=815373&r1=815372&r2=815373&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/tools/TokenUpdater.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/tools/TokenUpdater.java Tue Sep 15 15:47:36 2009
@@ -23,58 +23,67 @@
 import java.io.DataOutputStream;
 import java.io.FileInputStream;
 import java.io.InputStreamReader;
+import java.io.ByteArrayInputStream;
+import java.io.DataInputStream;
 
 import org.apache.cassandra.dht.IPartitioner;
 import org.apache.cassandra.dht.Token;
 import org.apache.cassandra.net.EndPoint;
 import org.apache.cassandra.net.Message;
 import org.apache.cassandra.net.MessagingService;
+import org.apache.cassandra.net.SelectorManager;
 import org.apache.cassandra.service.StorageService;
 import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.utils.FileUtils;
 
 public class TokenUpdater
 {
     private static final int port_ = 7000;
     private static final long waitTime_ = 10000;
-    
+
     public static void main(String[] args) throws Throwable
     {
-        if ( args.length != 3 )
+        if (args.length < 2)
         {
-            System.out.println("Usage : java org.apache.cassandra.tools.TokenUpdater <ip:port> <token> <file containing node token info>");
+            System.out.println("Usage : java org.apache.cassandra.tools.TokenUpdater <ip:port> <token>");
             System.exit(1);
         }
-        
+
+        Thread selectorThread = SelectorManager.getSelectorManager();
+        selectorThread.setDaemon(true);
+        selectorThread.start();
+
         String ipPort = args[0];
         IPartitioner p = StorageService.getPartitioner();
         Token token = p.getTokenFactory().fromString(args[1]);
-        String file = args[2];
-        
+        System.out.println("Partitioner is " + p.getClass() + ", token is: " + token);
+        System.out.println(p.getTokenFactory().getClass());
+
         String[] ipPortPair = ipPort.split(":");
-        EndPoint target = new EndPoint(ipPortPair[0], Integer.valueOf(ipPortPair[1]));
+        int port = 7000;
+        if (ipPortPair.length > 1)
+        {
+            port = Integer.valueOf(ipPortPair[1]);
+        }
+
+        EndPoint target = new EndPoint(ipPortPair[0], port);
 
         ByteArrayOutputStream bos = new ByteArrayOutputStream();
         DataOutputStream dos = new DataOutputStream(bos);
         Token.serializer().serialize(token, dos);
 
         /* Construct the token update message to be sent */
-        Message tokenUpdateMessage = new Message( new EndPoint(FBUtilities.getHostAddress(), port_), "", StorageService.tokenVerbHandler_, bos.toByteArray() );
-        
-        BufferedReader bufReader = new BufferedReader( new InputStreamReader( new FileInputStream(file) ) );
-        String line = null;
-       
-        while ( ( line = bufReader.readLine() ) != null )
-        {
-            String[] nodeTokenPair = line.split(" ");
-            /* Add the node and the token pair into the header of this message. */
-            Token nodeToken = p.getTokenFactory().fromString(nodeTokenPair[1]);
-            tokenUpdateMessage.addHeader(nodeTokenPair[0], p.getTokenFactory().toByteArray(nodeToken));
-        }
-        
+        Message tokenUpdateMessage = new Message(new EndPoint(FBUtilities.getHostAddress(), port_),
+                                                 "",
+                                                 StorageService.tokenVerbHandler_,
+                                                 bos.toByteArray());
+
         System.out.println("Sending a token update message to " + target);
         MessagingService.getMessagingInstance().sendOneWay(tokenUpdateMessage, target);
         Thread.sleep(TokenUpdater.waitTime_);
         System.out.println("Done sending the update message");
-    }
 
+        MessagingService.shutdown();
+        FileUtils.shutdown();
+    }
 }