You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by jb...@apache.org on 2009/09/15 17:47:37 UTC
svn commit: r815373 - in /incubator/cassandra/trunk: bin/
src/java/org/apache/cassandra/net/ src/java/org/apache/cassandra/service/
src/java/org/apache/cassandra/tools/
Author: jbellis
Date: Tue Sep 15 15:47:36 2009
New Revision: 815373
URL: http://svn.apache.org/viewvc?rev=815373&view=rev
Log:
Fix deserialization bug in TokenUpdateVerbHandler; change TokenUpdater to update one node at a time, and add bin/tokenupdater as a convenience. Patch by Sammy Yu; reviewed for CASSANDRA-363 by jbellis
Added:
incubator/cassandra/trunk/bin/tokenupdater
Removed:
incubator/cassandra/trunk/src/java/org/apache/cassandra/tools/TokenUpdateVerbHandler.java
Modified:
incubator/cassandra/trunk/src/java/org/apache/cassandra/net/MessagingService.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/service/TokenUpdateVerbHandler.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/tools/TokenUpdater.java
Added: incubator/cassandra/trunk/bin/tokenupdater
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/bin/tokenupdater?rev=815373&view=auto
==============================================================================
--- incubator/cassandra/trunk/bin/tokenupdater (added)
+++ incubator/cassandra/trunk/bin/tokenupdater Tue Sep 15 15:47:36 2009
@@ -0,0 +1,49 @@
+#!/bin/sh
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+if [ "x$CASSANDRA_INCLUDE" = "x" ]; then
+ for include in /usr/share/cassandra/cassandra.in.sh \
+ /usr/local/share/cassandra/cassandra.in.sh \
+ /opt/cassandra/cassandra.in.sh \
+ `dirname $0`/cassandra.in.sh; do
+ if [ -r $include ]; then
+ . $include
+ break
+ fi
+ done
+elif [ -r $CASSANDRA_INCLUDE ]; then
+ . $CASSANDRA_INCLUDE
+fi
+
+if [ -z $CASSANDRA_CONF -o -z $CLASSPATH ]; then
+ echo "You must set the CASSANDRA_CONF and CLASSPATH vars" >&2
+ exit 1
+fi
+
+# Special-case path variables.
+case "`uname`" in
+ CYGWIN*)
+ CLASSPATH=`cygpath -p -w "$CLASSPATH"`
+ CASSANDRA_CONF=`cygpath -p -w "$CASSANDRA_CONF"`
+ ;;
+esac
+
+java -cp $CLASSPATH -Dstorage-config=$CASSANDRA_CONF \
+ org.apache.cassandra.tools.TokenUpdater $@
+
+# vi:ai sw=4 ts=4 tw=0 et
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/net/MessagingService.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/net/MessagingService.java?rev=815373&r1=815372&r2=815373&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/net/MessagingService.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/net/MessagingService.java Tue Sep 15 15:47:36 2009
@@ -514,6 +514,8 @@
messageDeserializerExecutor_.shutdownNow();
streamExecutor_.shutdownNow();
+ StageManager.shutdown();
+
/* shut down the cachetables */
taskCompletionMap_.shutdown();
callbackMap_.shutdown();
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/service/TokenUpdateVerbHandler.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/service/TokenUpdateVerbHandler.java?rev=815373&r1=815372&r2=815373&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/service/TokenUpdateVerbHandler.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/service/TokenUpdateVerbHandler.java Tue Sep 15 15:47:36 2009
@@ -23,9 +23,9 @@
import org.apache.log4j.Logger;
import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.io.DataInputBuffer;
import org.apache.cassandra.net.IVerbHandler;
import org.apache.cassandra.net.Message;
-import org.apache.cassandra.utils.LogUtil;
public class TokenUpdateVerbHandler implements IVerbHandler
{
@@ -33,18 +33,20 @@
public void doVerb(Message message)
{
- byte[] body = message.getMessageBody();
- Token token = StorageService.getPartitioner().getTokenFactory().fromByteArray(body);
+ byte[] body = message.getMessageBody();
+ DataInputBuffer bufIn = new DataInputBuffer();
+ bufIn.reset(body, body.length);
try
{
- logger_.info("Updating the token to [" + token + "]");
- StorageService.instance().updateToken(token);
+ /* Deserialize to get the token for this endpoint. */
+ Token token = Token.serializer().deserialize(bufIn);
+ logger_.info("Updating the token to [" + token + "]");
+ StorageService.instance().updateToken(token);
+ }
+ catch (IOException ex)
+ {
+ throw new RuntimeException(ex);
}
- catch( IOException ex )
- {
- if (logger_.isDebugEnabled())
- logger_.debug(LogUtil.throwableToString(ex));
- }
}
}
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/tools/TokenUpdater.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/tools/TokenUpdater.java?rev=815373&r1=815372&r2=815373&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/tools/TokenUpdater.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/tools/TokenUpdater.java Tue Sep 15 15:47:36 2009
@@ -23,58 +23,67 @@
import java.io.DataOutputStream;
import java.io.FileInputStream;
import java.io.InputStreamReader;
+import java.io.ByteArrayInputStream;
+import java.io.DataInputStream;
import org.apache.cassandra.dht.IPartitioner;
import org.apache.cassandra.dht.Token;
import org.apache.cassandra.net.EndPoint;
import org.apache.cassandra.net.Message;
import org.apache.cassandra.net.MessagingService;
+import org.apache.cassandra.net.SelectorManager;
import org.apache.cassandra.service.StorageService;
import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.utils.FileUtils;
public class TokenUpdater
{
private static final int port_ = 7000;
private static final long waitTime_ = 10000;
-
+
public static void main(String[] args) throws Throwable
{
- if ( args.length != 3 )
+ if (args.length < 2)
{
- System.out.println("Usage : java org.apache.cassandra.tools.TokenUpdater <ip:port> <token> <file containing node token info>");
+ System.out.println("Usage : java org.apache.cassandra.tools.TokenUpdater <ip:port> <token>");
System.exit(1);
}
-
+
+ Thread selectorThread = SelectorManager.getSelectorManager();
+ selectorThread.setDaemon(true);
+ selectorThread.start();
+
String ipPort = args[0];
IPartitioner p = StorageService.getPartitioner();
Token token = p.getTokenFactory().fromString(args[1]);
- String file = args[2];
-
+ System.out.println("Partitioner is " + p.getClass() + ", token is: " + token);
+ System.out.println(p.getTokenFactory().getClass());
+
String[] ipPortPair = ipPort.split(":");
- EndPoint target = new EndPoint(ipPortPair[0], Integer.valueOf(ipPortPair[1]));
+ int port = 7000;
+ if (ipPortPair.length > 1)
+ {
+ port = Integer.valueOf(ipPortPair[1]);
+ }
+
+ EndPoint target = new EndPoint(ipPortPair[0], port);
ByteArrayOutputStream bos = new ByteArrayOutputStream();
DataOutputStream dos = new DataOutputStream(bos);
Token.serializer().serialize(token, dos);
/* Construct the token update message to be sent */
- Message tokenUpdateMessage = new Message( new EndPoint(FBUtilities.getHostAddress(), port_), "", StorageService.tokenVerbHandler_, bos.toByteArray() );
-
- BufferedReader bufReader = new BufferedReader( new InputStreamReader( new FileInputStream(file) ) );
- String line = null;
-
- while ( ( line = bufReader.readLine() ) != null )
- {
- String[] nodeTokenPair = line.split(" ");
- /* Add the node and the token pair into the header of this message. */
- Token nodeToken = p.getTokenFactory().fromString(nodeTokenPair[1]);
- tokenUpdateMessage.addHeader(nodeTokenPair[0], p.getTokenFactory().toByteArray(nodeToken));
- }
-
+ Message tokenUpdateMessage = new Message(new EndPoint(FBUtilities.getHostAddress(), port_),
+ "",
+ StorageService.tokenVerbHandler_,
+ bos.toByteArray());
+
System.out.println("Sending a token update message to " + target);
MessagingService.getMessagingInstance().sendOneWay(tokenUpdateMessage, target);
Thread.sleep(TokenUpdater.waitTime_);
System.out.println("Done sending the update message");
- }
+ MessagingService.shutdown();
+ FileUtils.shutdown();
+ }
}