You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by al...@apache.org on 2009/03/02 07:13:17 UTC
svn commit: r749207 [3/12] - in
/incubator/cassandra/src/org/apache/cassandra: loader/ locator/ net/
net/http/ net/io/ net/sink/ procedures/ service/ test/ tools/
Added: incubator/cassandra/src/org/apache/cassandra/net/MessagingService.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/src/org/apache/cassandra/net/MessagingService.java?rev=749207&view=auto
==============================================================================
--- incubator/cassandra/src/org/apache/cassandra/net/MessagingService.java (added)
+++ incubator/cassandra/src/org/apache/cassandra/net/MessagingService.java Mon Mar 2 06:13:14 2009
@@ -0,0 +1,743 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.net;
+
+import java.io.*;
+import java.lang.management.ManagementFactory;
+import java.net.*;
+import java.security.MessageDigest;
+import java.util.*;
+import java.nio.ByteBuffer;
+import java.util.concurrent.*;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.locks.ReentrantLock;
+import java.nio.channels.*;
+import org.apache.cassandra.concurrent.*;
+import org.apache.cassandra.net.io.*;
+import org.apache.cassandra.utils.*;
+import javax.management.MBeanServer;
+import javax.management.ObjectName;
+import javax.xml.bind.*;
+import org.apache.cassandra.concurrent.DebuggableThreadPoolExecutor;
+import org.apache.cassandra.concurrent.IStage;
+import org.apache.cassandra.concurrent.MultiThreadedStage;
+import org.apache.cassandra.concurrent.StageManager;
+import org.apache.cassandra.concurrent.ThreadFactoryImpl;
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.net.http.HttpConnectionHandler;
+import org.apache.cassandra.net.io.SerializerType;
+import org.apache.cassandra.net.sink.SinkManager;
+import org.apache.cassandra.utils.Cachetable;
+import org.apache.cassandra.utils.GuidGenerator;
+import org.apache.cassandra.utils.HashingSchemes;
+import org.apache.cassandra.utils.ICachetable;
+import org.apache.cassandra.utils.LogUtil;
+import org.apache.log4j.Logger;
+
+/**
+ * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
+ */
+
+public class MessagingService implements IMessagingService, MessagingServiceMBean
+{
+ /* Debug flag; read and written only through isDebugOn() and debugOn(boolean). */
+ private static boolean debugOn_ = false;
+
+ /* Protocol version; packIt() and constructStreamHeader() OR it into the header shifted left by 8 bits. */
+ private static int version_ = 1;
+ //TODO: make this parameter dynamic somehow. Not sure if config is appropriate.
+ /* Serializer in use; its ordinal() is OR-ed into the low bits of the protocol header. */
+ private static SerializerType serializerType_ = SerializerType.BINARY;
+
+ /* Protocol identifier written at the front of every packed buffer. The constructor replaces
+ this zeroed placeholder with the MD5 digest of "FB-MESSAGING". */
+ private static byte[] protocol_ = new byte[16];
+ /* Verb Handler for the Response */
+ public static final String responseVerbHandler_ = "RESPONSE";
+ /* Stage for responses. */
+ public static final String responseStage_ = "RESPONSE-STAGE";
+ /* Verbs reserved by the messaging layer itself. */
+ private enum ReservedVerbs_ {RESPONSE};
+
+ /* Names of the reserved verbs (name mapped to itself); filled in by the constructor and
+ consulted by checkForReservedVerb(). */
+ private static Map<String, String> reservedVerbs_ = new Hashtable<String, String>();
+ /* Indicate if we are currently streaming data to another node or receiving streaming data */
+ private static AtomicBoolean isStreaming_ = new AtomicBoolean(false);
+
+ /* Callbacks registered by the sendRR(..., IAsyncCallback) overloads, keyed by message (or group) id. */
+ private static ICachetable<String, IAsyncCallback> callbackMap_;
+ /* Pending results handed out by sendRR(Message, EndPoint), keyed by message id. */
+ private static ICachetable<String, IAsyncResult> taskCompletionMap_;
+
+ /* Endpoints added by listen() and listenUDP(); queried by isLocalEndPoint(). */
+ private static Set<EndPoint> endPoints_;
+
+ /* Selection key of each listening server channel, keyed by the endpoint passed to listen(). */
+ private static Map<EndPoint, SelectionKey> listenSockets_ = new HashMap<EndPoint, SelectionKey>();
+
+ /* Lookup table for registering message handlers based on the verb. */
+ private static Map<String, IVerbHandler> verbHandlers_;
+
+ /* NOTE(review): not referenced anywhere in the visible part of this class. */
+ private static Map<String, MulticastSocket> mCastMembership_ = new HashMap<String, MulticastSocket>();
+
+ /* Pool named "MESSAGING-SERVICE-POOL"; handed out by getReadExecutor(). */
+ private static ExecutorService messageDeserializationExecutor_;
+
+ /* Pool named "MESSAGE-SERIALIZER-POOL"; runs the MessageSerializationTask created by sendOneWay()
+ and is the fallback executor in enqueueRunnable(). Handed out by getWriteExecutor(). */
+ private static ExecutorService messageSerializerExecutor_;
+
+ /* Thread pool to handle deserialization of messages read from the socket. */
+ private static ExecutorService messageDeserializerExecutor_;
+
+ /* Single-threaded pool named "MESSAGE-STREAMING-POOL"; runs the FileStreamTask created by stream(). */
+ private static ExecutorService streamExecutor_;
+
+ /* Taken by getMessagingInstance() and getConnectionPool() around their lazy creation steps. */
+ private final static ReentrantLock lock_ = new ReentrantLock();
+ /* Connection pools keyed by the string from + ":" + to. */
+ private static Map<String, TcpConnectionManager> poolTable_ = new Hashtable<String, TcpConnectionManager>();
+
+ /* Set to true at the end of shutdown(); getMessagingInstance() builds a new instance when it sees it. */
+ private static boolean bShutdown_ = false;
+
+ private static Logger logger_ = Logger.getLogger(MessagingService.class);
+
+ /* The shared instance returned by getMessagingInstance(). */
+ private static IMessagingService messagingService_ = new MessagingService();
+
+ /**
+ * @return the current value of the class-wide debug flag.
+ */
+ public static boolean isDebugOn()
+ {
+ return debugOn_;
+ }
+
+ /**
+ * Sets the class-wide debug flag.
+ *
+ * @param on new value of the flag.
+ */
+ public static void debugOn(boolean on)
+ {
+ debugOn_ = on;
+ }
+
+ /**
+ * @return the serializer type currently OR-ed into outgoing protocol headers.
+ */
+ public static SerializerType getSerializerType()
+ {
+ return serializerType_;
+ }
+
+ public synchronized static void serializerType(String type)
+ {
+ if ( type.equalsIgnoreCase("binary") )
+ {
+ serializerType_ = SerializerType.BINARY;
+ }
+ else if ( type.equalsIgnoreCase("java") )
+ {
+ serializerType_ = SerializerType.JAVA;
+ }
+ else if ( type.equalsIgnoreCase("xml") )
+ {
+ serializerType_ = SerializerType.XML;
+ }
+ }
+
+ /**
+ * @return the protocol version that is OR-ed into outgoing headers.
+ */
+ public static int getVersion()
+ {
+ return version_;
+ }
+
+ /**
+ * Replaces the protocol version. The value is stored as given; no range check is applied here.
+ *
+ * @param version new protocol version.
+ */
+ public static void setVersion(int version)
+ {
+ version_ = version;
+ }
+
+ /**
+  * Returns the shared messaging service, building a fresh one if
+  * shutdown() has been called since the last instance was created.
+  *
+  * NOTE(review): bShutdown_ is a plain (non-volatile) field and shutdown()
+  * sets it while holding the MessagingService.class monitor, not lock_;
+  * confirm that this is the intended synchronisation.
+  *
+  * @return the current messaging service instance.
+  */
+ public static IMessagingService getMessagingInstance()
+ {
+     if ( !bShutdown_ )
+     {
+         return messagingService_;
+     }
+
+     lock_.lock();
+     try
+     {
+         // Re-check under the lock so only one caller rebuilds the instance.
+         if ( bShutdown_ )
+         {
+             messagingService_ = new MessagingService();
+             bShutdown_ = false;
+         }
+     }
+     finally
+     {
+         lock_.unlock();
+     }
+     return messagingService_;
+ }
+
+ /**
+  * Always refuses, so that the shared instance cannot be duplicated.
+  *
+  * @throws CloneNotSupportedException on every call.
+  */
+ @Override
+ public Object clone() throws CloneNotSupportedException
+ {
+     throw new CloneNotSupportedException();
+ }
+
+ /**
+  * Initialises the class-wide state of the messaging layer: the reserved
+  * verb table, the handler and endpoint tables, the two cachetables, the
+  * four thread pools, the protocol identifier, and finally the RESPONSE
+  * verb handler and its stage.
+  */
+ protected MessagingService()
+ {
+     for ( ReservedVerbs_ reserved : ReservedVerbs_.values() )
+     {
+         reservedVerbs_.put(reserved.toString(), reserved.toString());
+     }
+     verbHandlers_ = new HashMap<String, IVerbHandler>();
+     endPoints_ = new HashSet<EndPoint>();
+
+     int maxSize = MessagingConfig.getMessagingThreadCount();
+     /*
+      * Both cachetables are built with twice the RPC timeout so that a
+      * callback is still present when a late response for it arrives.
+      */
+     callbackMap_ = new Cachetable<String, IAsyncCallback>( 2 * DatabaseDescriptor.getRpcTimeout() );
+     taskCompletionMap_ = new Cachetable<String, IAsyncResult>( 2 * DatabaseDescriptor.getRpcTimeout() );
+
+     messageDeserializationExecutor_ = newFixedPool(maxSize, "MESSAGING-SERVICE-POOL");
+     messageSerializerExecutor_ = newFixedPool(maxSize, "MESSAGE-SERIALIZER-POOL");
+     messageDeserializerExecutor_ = newFixedPool(maxSize, "MESSAGE-DESERIALIZER-POOL");
+     /* Streaming is deliberately limited to a single worker thread. */
+     streamExecutor_ = newFixedPool(1, "MESSAGE-STREAMING-POOL");
+
+     protocol_ = hash(HashingSchemes.MD5, "FB-MESSAGING".getBytes());
+     /* The response verb and its stage are registered by the service itself. */
+     registerVerbHandlers(MessagingService.responseVerbHandler_, new ResponseVerbHandler());
+     StageManager.registerStage(MessagingService.responseStage_, new MultiThreadedStage("RESPONSE-STAGE", maxSize) );
+ }
+
+ /* Builds a pool with core size == max size, an unbounded queue and an Integer.MAX_VALUE-second keep-alive. */
+ private static ExecutorService newFixedPool(int threads, String poolName)
+ {
+     return new DebuggableThreadPoolExecutor( threads,
+                                              threads,
+                                              Integer.MAX_VALUE,
+                                              TimeUnit.SECONDS,
+                                              new LinkedBlockingQueue<Runnable>(),
+                                              new ThreadFactoryImpl(poolName)
+                                              );
+ }
+
+ /**
+  * Computes a message digest of the given bytes.
+  *
+  * @param type digest algorithm name passed to MessageDigest.getInstance().
+  * @param data bytes to digest.
+  * @return the digest, or null if anything goes wrong (the failure is
+  *         only logged at debug level).
+  */
+ public byte[] hash(String type, byte data[])
+ {
+     try
+     {
+         return MessageDigest.getInstance(type).digest(data);
+     }
+     catch(Exception e)
+     {
+         LogUtil.getLogger(MessagingService.class.getName()).debug(LogUtil.throwableToString(e));
+         return null;
+     }
+ }
+
+ /**
+  * @return tasks submitted to the serializer pool minus tasks it has completed.
+  */
+ public long getMessagingSerializerTaskCount()
+ {
+     return outstandingTasks(messageSerializerExecutor_);
+ }
+
+ /**
+  * @return tasks submitted to the "MESSAGING-SERVICE-POOL" executor minus tasks it has completed.
+  */
+ public long getMessagingReceiverTaskCount()
+ {
+     return outstandingTasks(messageDeserializationExecutor_);
+ }
+
+ /* Difference between the submitted and completed task counters of one of this class's pools. */
+ private static long outstandingTasks(ExecutorService executor)
+ {
+     DebuggableThreadPoolExecutor pool = (DebuggableThreadPoolExecutor)executor;
+     return pool.getTaskCount() - pool.getCompletedTaskCount();
+ }
+
+ /**
+  * Opens a non-blocking server channel bound to the given endpoint and
+  * registers it with the selector manager for OP_ACCEPT. On success the
+  * endpoint is recorded as local and its selection key is remembered so
+  * that shutdown() can cancel it.
+  *
+  * If binding, configuring or registering fails, the freshly opened
+  * channel is closed before the failure propagates, so no socket is
+  * leaked.
+  *
+  * @param localEp endpoint to listen on.
+  * @param isHttp  true to accept with an HttpConnectionHandler, false
+  *                for a TcpConnectionHandler.
+  * @throws IOException if the channel cannot be opened, bound or configured.
+  */
+ public void listen(EndPoint localEp, boolean isHttp) throws IOException
+ {
+     ServerSocketChannel serverChannel = ServerSocketChannel.open();
+     boolean registered = false;
+     try
+     {
+         ServerSocket ss = serverChannel.socket();
+         ss.bind(localEp.getInetAddress());
+         serverChannel.configureBlocking(false);
+
+         SelectionKeyHandler handler = null;
+         if ( isHttp )
+         {
+             handler = new HttpConnectionHandler();
+         }
+         else
+         {
+             handler = new TcpConnectionHandler(localEp);
+         }
+
+         SelectionKey key = SelectorManager.getSelectorManager().register(serverChannel, handler, SelectionKey.OP_ACCEPT);
+         endPoints_.add(localEp);
+         listenSockets_.put(localEp, key);
+         registered = true;
+     }
+     finally
+     {
+         if ( !registered )
+         {
+             try
+             {
+                 serverChannel.close();
+             }
+             catch ( IOException closeFailure )
+             {
+                 // Keep the original failure as the one the caller sees.
+                 logger_.warn(LogUtil.throwableToString(closeFailure));
+             }
+         }
+     }
+ }
+
+ /**
+  * Starts a UDP connection on the port of the given endpoint and records
+  * the endpoint as local. An IOException from init() is logged as a
+  * warning and not rethrown; the endpoint is not recorded in that case.
+  *
+  * NOTE(review): the connection is not closed when init() fails, whereas
+  * sendUdpOneWay() always closes its connection; confirm whether that is
+  * intended.
+  *
+  * @param localEp endpoint whose port is listened on.
+  */
+ public void listenUDP(EndPoint localEp)
+ {
+     UdpConnection udp = new UdpConnection();
+     logger_.debug("Starting to listen on " + localEp);
+     try
+     {
+         udp.init(localEp.getPort());
+         endPoints_.add(localEp);
+     }
+     catch ( IOException initFailure )
+     {
+         logger_.warn(LogUtil.throwableToString(initFailure));
+     }
+ }
+
+ /**
+  * Returns the connection pool for the (from, to) pair, creating and
+  * caching it on first use. Pools are keyed by from + ":" + to.
+  *
+  * @param from local endpoint.
+  * @param to   remote endpoint.
+  * @return the pool for this pair.
+  */
+ public static TcpConnectionManager getConnectionPool(EndPoint from, EndPoint to)
+ {
+     String key = from + ":" + to;
+     TcpConnectionManager pool = poolTable_.get(key);
+     if ( pool != null )
+     {
+         return pool;
+     }
+
+     lock_.lock();
+     try
+     {
+         // Look again under the lock; another caller may have created it meanwhile.
+         pool = poolTable_.get(key);
+         if ( pool == null )
+         {
+             pool = new TcpConnectionManager(MessagingConfig.getConnectionPoolInitialSize(),
+                                             MessagingConfig.getConnectionPoolGrowthFactor(),
+                                             MessagingConfig.getConnectionPoolMaxSize(), from, to);
+             poolTable_.put(key, pool);
+         }
+         return pool;
+     }
+     finally
+     {
+         lock_.unlock();
+     }
+ }
+
+ /**
+  * Takes a snapshot of every cached connection pool.
+  *
+  * @return one ConnectionStatistics per pool (local endpoint, remote
+  *         endpoint, pool size, connections in use), collected through a
+  *         HashSet.
+  */
+ public static ConnectionStatistics[] getPoolStatistics()
+ {
+     Set<ConnectionStatistics> stats = new HashSet<ConnectionStatistics>();
+     for ( TcpConnectionManager pool : poolTable_.values() )
+     {
+         stats.add( new ConnectionStatistics(pool.getLocalEndPoint(),
+                                             pool.getRemoteEndPoint(),
+                                             pool.getPoolSize(),
+                                             pool.getConnectionsInUse()) );
+     }
+     return stats.toArray(new ConnectionStatistics[0]);
+ }
+
+ /**
+  * Obtains a connection from the pool for the (from, to) pair, creating
+  * the pool first if necessary.
+  *
+  * @param from local endpoint.
+  * @param to   remote endpoint.
+  * @return a connection handed out by the pool.
+  * @throws IOException if the pool cannot provide a connection.
+  */
+ public static TcpConnection getConnection(EndPoint from, EndPoint to) throws IOException
+ {
+     TcpConnectionManager pool = getConnectionPool(from, to);
+     return pool.getConnection();
+ }
+
+ /**
+  * Rejects an attempt to re-register a reserved verb. A reserved verb may
+  * be registered while it has no handler yet; once a handler exists any
+  * further registration is refused.
+  *
+  * @param type verb being registered.
+  * @throws IllegalArgumentException if the verb is reserved and already has a handler.
+  */
+ private void checkForReservedVerb(String type)
+ {
+     if ( reservedVerbs_.get(type) == null )
+     {
+         return;
+     }
+     if ( verbHandlers_.get(type) == null )
+     {
+         return;
+     }
+     throw new IllegalArgumentException( type + " is a reserved verb handler. Scram!");
+ }
+
+ /**
+  * Registers (or replaces) the handler for a verb, after checking that
+  * the verb is not a reserved one that is already taken.
+  *
+  * @param type        verb name.
+  * @param verbHandler handler to invoke for that verb.
+  */
+ public void registerVerbHandlers(String type, IVerbHandler verbHandler)
+ {
+     checkForReservedVerb(type);
+     verbHandlers_.put(type, verbHandler);
+ }
+
+ /**
+  * Removes every verb handler whose verb name contains the string form
+  * of the given endpoint. Endpoint-specific handlers are recognised
+  * purely by that substring match.
+  *
+  * @param localEndPoint endpoint whose handlers are dropped.
+  */
+ public void deregisterAllVerbHandlers(EndPoint localEndPoint)
+ {
+     for ( Iterator<String> verbs = verbHandlers_.keySet().iterator(); verbs.hasNext(); )
+     {
+         String verb = verbs.next();
+         if ( verb.contains(localEndPoint.toString()) )
+         {
+             // Remove through the iterator so the key set can be modified mid-iteration.
+             verbs.remove();
+         }
+     }
+ }
+
+ /**
+  * Drops the handler registered for the given verb, if any.
+  *
+  * @param type verb name.
+  */
+ public void deregisterVerbHandlers(String type)
+ {
+     verbHandlers_.remove(type);
+ }
+
+ /**
+  * Looks up the handler registered for a verb.
+  *
+  * @param type verb name.
+  * @return the handler, or null when none is registered.
+  */
+ public IVerbHandler getVerbHandler(String type)
+ {
+     return verbHandlers_.get(type);
+ }
+
+ /**
+  * Sends one message to several endpoints and registers a callback for
+  * the responses under the message's own id.
+  *
+  * @param message message to send.
+  * @param to      destination endpoints.
+  * @param cb      callback stored under the message id.
+  * @return the message id the callback was registered under.
+  */
+ public String sendRR(Message message, EndPoint[] to, IAsyncCallback cb)
+ {
+     String messageId = message.getMessageId();
+     callbackMap_.put(messageId, cb);
+     for ( EndPoint destination : to )
+     {
+         sendOneWay(message, destination);
+     }
+     return messageId;
+ }
+
+ /**
+  * Sends one message to one endpoint and registers a callback for the
+  * response under the message's own id.
+  *
+  * @param message message to send.
+  * @param to      destination endpoint.
+  * @param cb      callback stored under the message id.
+  * @return the message id the callback was registered under.
+  */
+ public String sendRR(Message message, EndPoint to, IAsyncCallback cb)
+ {
+     String messageId = message.getMessageId();
+     callbackMap_.put(messageId, cb);
+     sendOneWay(message, to);
+     return messageId;
+ }
+
+ /**
+  * Sends messages[i] to to[i] for every i. All messages are given the
+  * same freshly generated group id, and the callback is registered under
+  * that id.
+  *
+  * @param messages messages to send; their ids are overwritten.
+  * @param to       destinations, one per message.
+  * @param cb       callback stored under the group id.
+  * @return the generated group id.
+  * @throws IllegalArgumentException if the two arrays differ in length.
+  */
+ public String sendRR(Message[] messages, EndPoint[] to, IAsyncCallback cb)
+ {
+     if ( messages.length != to.length )
+     {
+         throw new IllegalArgumentException("Number of messages and the number of endpoints need to be same.");
+     }
+     String groupId = GuidGenerator.guid();
+     callbackMap_.put(groupId, cb);
+     for ( int i = 0; i < messages.length; ++i )
+     {
+         Message current = messages[i];
+         current.setMessageId(groupId);
+         sendOneWay(current, to[i]);
+     }
+     return groupId;
+ }
+
+ /**
+  * Fire-and-forget send. A message addressed to its own sender is
+  * delivered locally through receive(); anything else is queued on the
+  * serializer pool as a MessageSerializationTask.
+  *
+  * @param message message to send.
+  * @param to      destination endpoint.
+  */
+ public void sendOneWay(Message message, EndPoint to)
+ {
+     boolean isLocalDelivery = message.getFrom().equals(to);
+     if ( isLocalDelivery )
+     {
+         MessagingService.receive(message);
+     }
+     else
+     {
+         messageSerializerExecutor_.execute(new MessageSerializationTask(message, to));
+     }
+ }
+
+ /**
+  * Sends a message and returns a handle through which the response can
+  * be picked up. The handle is stored under the message id before the
+  * message is sent.
+  *
+  * @param message message to send.
+  * @param to      destination endpoint.
+  * @return the pending result registered for this message.
+  */
+ public IAsyncResult sendRR(Message message, EndPoint to)
+ {
+     IAsyncResult pendingResult = new AsyncResult();
+     String messageId = message.getMessageId();
+     taskCompletionMap_.put(messageId, pendingResult);
+     sendOneWay(message, to);
+     return pendingResult;
+ }
+
+ /**
+  * Fire-and-forget send over UDP. A message addressed to its own sender
+  * is delivered locally through receive(). Otherwise a throw-away
+  * UdpConnection is opened, used for a single write and always closed
+  * again. An IOException is logged as a warning and not rethrown.
+  *
+  * @param message message to send.
+  * @param to      destination endpoint.
+  */
+ public void sendUdpOneWay(Message message, EndPoint to)
+ {
+     // Read the sender once; the original fetched it twice and left the first copy unused.
+     EndPoint from = message.getFrom();
+     if ( from.equals(to) )
+     {
+         MessagingService.receive(message);
+         return;
+     }
+
+     UdpConnection connection = null;
+     try
+     {
+         connection = new UdpConnection();
+         connection.init();
+         connection.write(message, to);
+     }
+     catch ( IOException e )
+     {
+         logger_.warn(LogUtil.throwableToString(e));
+     }
+     finally
+     {
+         if ( connection != null )
+             connection.close();
+     }
+ }
+
+ /**
+  * Streams part of a file to another endpoint asynchronously on the
+  * streaming pool. The streaming flag is raised before the task is
+  * queued; nothing in this method lowers it again.
+  *
+  * @param file          file passed to the FileStreamTask.
+  * @param startPosition start position passed to the FileStreamTask.
+  * @param total         total passed to the FileStreamTask.
+  * @param from          source endpoint.
+  * @param to            destination endpoint.
+  */
+ public void stream(String file, long startPosition, long total, EndPoint from, EndPoint to)
+ {
+     isStreaming_.set(true);
+     streamExecutor_.execute(new FileStreamTask(file, startPosition, total, from, to));
+ }
+
+ /*
+ * Lets the application find out whether we are currently streaming data.
+ * This would imply either streaming to a receiver, receiving streamed
+ * data or both.
+ */
+ public static boolean isStreaming()
+ {
+ return isStreaming_.get();
+ }
+
+ /*
+ * Sets the streaming flag explicitly. stream() only ever raises the flag,
+ * so this is the way visible in this class to lower it again.
+ */
+ public static void setStreamingMode(boolean bVal)
+ {
+ isStreaming_.set(bVal);
+ }
+
+ /*
+ * Tears the messaging layer down: cancels every listening selection key,
+ * stops the four thread pools with shutdownNow(), shuts down both
+ * cachetables, interrupts the selector manager, clears the connection-pool
+ * and verb-handler tables and finally marks the service as shut down so
+ * that getMessagingInstance() builds a new instance on its next call.
+ * The teardown runs while holding the MessagingService.class monitor.
+ */
+ public static void shutdown()
+ {
+ logger_.info("Shutting down ...");
+ synchronized ( MessagingService.class )
+ {
+ /* Stop listening on any socket */
+ for( SelectionKey skey : listenSockets_.values() )
+ {
+ SelectorManager.getSelectorManager().cancel(skey);
+ }
+ listenSockets_.clear();
+
+ /* Stop all four thread pools; shutdownNow() does not wait for running tasks. */
+ messageDeserializationExecutor_.shutdownNow();
+ messageSerializerExecutor_.shutdownNow();
+ messageDeserializerExecutor_.shutdownNow();
+ streamExecutor_.shutdownNow();
+
+ /* shut down the cachetables */
+ taskCompletionMap_.shutdown();
+ callbackMap_.shutdown();
+
+ /* Interrupt the selector manager thread */
+ SelectorManager.getSelectorManager().interrupt();
+
+ poolTable_.clear();
+ verbHandlers_.clear();
+ /* Set last, so the flag only flips once everything above has been torn down. */
+ bShutdown_ = true;
+ }
+ logger_.debug("Shutdown invocation complete.");
+ }
+
+ /**
+  * Hands an incoming (or locally delivered) message to the stage named
+  * after its message type, wrapped in a MessageDeliveryTask.
+  *
+  * @param message message to deliver.
+  */
+ public static void receive(Message message)
+ {
+     String stageName = message.getMessageType();
+     Runnable delivery = new MessageDeliveryTask(message);
+     enqueueRunnable(stageName, delivery);
+ }
+
+ public static boolean isLocalEndPoint(EndPoint ep)
+ {
+ return ( endPoints_.contains(ep) );
+ }
+
+ /**
+  * Runs a task on the named stage, or on the serializer pool when no
+  * stage of that name is registered.
+  *
+  * @param stageName name looked up in the StageManager.
+  * @param runnable  task to run.
+  */
+ private static void enqueueRunnable(String stageName, Runnable runnable)
+ {
+     IStage stage = StageManager.getStage(stageName);
+     if ( stage == null )
+     {
+         logger_.info("Running on default stage - beware");
+         messageSerializerExecutor_.execute(runnable);
+         return;
+     }
+
+     logger_.info("Running on stage " + stage.getName());
+     stage.execute(runnable);
+ }
+
+ /* Returns the callback stored under the given id; the entry stays in the table. */
+ public static IAsyncCallback getRegisteredCallback(String key)
+ {
+ return callbackMap_.get(key);
+ }
+
+ /* Removes the callback stored under the given id. */
+ public static void removeRegisteredCallback(String key)
+ {
+ callbackMap_.remove(key);
+ }
+
+ /*
+ * Returns the pending result stored under the given id AND removes it from the
+ * table: despite the "get" name this calls remove(), so a second call with the
+ * same key does not see the entry this call removed.
+ */
+ public static IAsyncResult getAsyncResult(String key)
+ {
+ return taskCompletionMap_.remove(key);
+ }
+
+ /* Removes the pending result stored under the given id, discarding it. */
+ public static void removeAsyncResult(String key)
+ {
+ taskCompletionMap_.remove(key);
+ }
+
+ /* Returns the protocol identifier. This is the internal array itself, not a copy. */
+ public static byte[] getProtocol()
+ {
+ return protocol_;
+ }
+
+ /* Returns the pool created as "MESSAGING-SERVICE-POOL". */
+ public static ExecutorService getReadExecutor()
+ {
+ return messageDeserializationExecutor_;
+ }
+
+ /* Returns the pool created as "MESSAGE-SERIALIZER-POOL". */
+ public static ExecutorService getWriteExecutor()
+ {
+ return messageSerializerExecutor_;
+ }
+
+ /* Returns the pool created as "MESSAGE-DESERIALIZER-POOL". */
+ public static ExecutorService getDeserilizationExecutor()
+ {
+ return messageDeserializerExecutor_;
+ }
+
+ /* True if the given bytes equal this service's protocol identifier. */
+ public static boolean isProtocolValid(byte[] protocol)
+ {
+ return isEqual(protocol_, protocol);
+ }
+
+ /* Compares two digests by delegating to MessageDigest.isEqual(). */
+ public static boolean isEqual(byte digestA[], byte digestB[])
+ {
+ return MessageDigest.isEqual(digestA, digestB);
+ }
+
+ /**
+  * Encodes an int as 4 bytes, most significant byte first.
+  *
+  * @param i value to encode.
+  * @return a new 4-byte array.
+  */
+ public static byte[] toByteArray(int i)
+ {
+     // A freshly allocated ByteBuffer is big-endian, matching the manual shifts.
+     return ByteBuffer.allocate(4).putInt(i).array();
+ }
+
+ /**
+  * Encodes a short as 2 bytes, most significant byte first.
+  *
+  * @param s value to encode.
+  * @return a new 2-byte array.
+  */
+ public static byte[] toByteArray(short s)
+ {
+     return ByteBuffer.allocate(2).putShort(s).array();
+ }
+
+ /**
+  * Decodes a big-endian short from the first two bytes of the array.
+  *
+  * @param bytes source bytes.
+  * @return the decoded value.
+  * @throws IllegalArgumentException if fewer than 2 bytes are available.
+  */
+ public static short byteArrayToShort(byte bytes[])
+ {
+     return byteArrayToShort(bytes, 0);
+ }
+
+ /**
+  * Decodes a big-endian short from two bytes starting at the offset.
+  *
+  * @param bytes  source bytes.
+  * @param offset index of the most significant byte.
+  * @return the decoded value.
+  * @throws IllegalArgumentException if fewer than 2 bytes remain after the offset.
+  */
+ public static short byteArrayToShort(byte bytes[], int offset)
+ {
+     int available = bytes.length - offset;
+     if ( available < 2 )
+     {
+         throw new IllegalArgumentException("A short must be 2 bytes in size.");
+     }
+     int high = bytes[offset] & 0xff;
+     int low = bytes[offset + 1] & 0xff;
+     return (short)((high << 8) | low);
+ }
+
+ public static int getBits(int x, int p, int n)
+ {
+ return x >>> (p + 1) - n & ~(-1 << n);
+ }
+
+ /**
+  * Decodes a big-endian int from the first four bytes of the array.
+  *
+  * @param bytes source bytes.
+  * @return the decoded value.
+  * @throws IllegalArgumentException if fewer than 4 bytes are available.
+  */
+ public static int byteArrayToInt(byte bytes[])
+ {
+     return byteArrayToInt(bytes, 0);
+ }
+
+ /**
+  * Decodes a big-endian int from four bytes starting at the offset.
+  *
+  * @param bytes  source bytes.
+  * @param offset index of the most significant byte.
+  * @return the decoded value.
+  * @throws IllegalArgumentException if fewer than 4 bytes remain after the offset.
+  */
+ public static int byteArrayToInt(byte bytes[], int offset)
+ {
+     int available = bytes.length - offset;
+     if ( available < 4 )
+     {
+         throw new IllegalArgumentException("An integer must be 4 bytes in size.");
+     }
+     int b0 = bytes[offset] & 0xff;
+     int b1 = bytes[offset + 1] & 0xff;
+     int b2 = bytes[offset + 2] & 0xff;
+     int b3 = bytes[offset + 3] & 0xff;
+     return (b0 << 24) | (b1 << 16) | (b2 << 8) | b3;
+ }
+
+ /**
+  * Frames a serialized message for the wire. The resulting buffer holds,
+  * in order: the protocol identifier, the 4-byte protocol header, the
+  * 4-byte big-endian payload length, and the payload itself. The buffer
+  * is flipped and ready to be written.
+  *
+  * @param bytes     serialized message body.
+  * @param compress  sets the compression bit of the header.
+  * @param stream    sets the streaming bit of the header.
+  * @param listening sets the 5th header bit (value 16); per the original
+  *                  description it signals that the sender is not
+  *                  listening on any well defined port, so the receiver
+  *                  needs to cache the connection using the socket's port.
+  * @return a flipped buffer containing the framed message.
+  */
+ public static ByteBuffer packIt(byte[] bytes, boolean compress, boolean stream, boolean listening)
+ {
+     int payloadLength = bytes.length;
+     int header = protocolHeaderBits(compress, stream, listening);
+
+     // Size the buffer from the identifier itself instead of a hard-coded 16.
+     ByteBuffer buffer = ByteBuffer.allocate(protocol_.length + 4 + 4 + payloadLength);
+     buffer.put(protocol_);
+     // ByteBuffer is big-endian by default, the same byte order toByteArray(int) produces.
+     buffer.putInt(header);
+     buffer.putInt(payloadLength);
+     buffer.put(bytes);
+     buffer.flip();
+     return buffer;
+ }
+
+ /**
+  * Builds the preamble that precedes streamed data: the protocol
+  * identifier followed by the 4-byte protocol header. The listening bit
+  * is never set here. The buffer is flipped and ready to be written.
+  *
+  * @param compress sets the compression bit of the header.
+  * @param stream   sets the streaming bit of the header.
+  * @return a flipped buffer containing identifier and header.
+  */
+ public static ByteBuffer constructStreamHeader(boolean compress, boolean stream)
+ {
+     int header = protocolHeaderBits(compress, stream, false);
+
+     ByteBuffer buffer = ByteBuffer.allocate(protocol_.length + 4);
+     buffer.put(protocol_);
+     buffer.putInt(header);
+     buffer.flip();
+     return buffer;
+ }
+
+ /*
+  * Assembles the 4-byte protocol header as an int. Counting from the least
+  * significant bit:
+  *   bits 0-1   serializer type (ordinal of the current SerializerType)
+  *   bit  2     compression on/off
+  *   bit  3     streaming mode on/off
+  *   bit  4     "listening" flag
+  *   bits 5-7   reserved
+  *   bits 8-15  version number
+  *   bits 16-31 currently unused (16 bits)
+  * The version is OR-ed in unmasked, exactly as before, so a version above
+  * 255 would spill into the unused bits.
+  */
+ private static int protocolHeaderBits(boolean compress, boolean stream, boolean listening)
+ {
+     int n = serializerType_.ordinal();
+     if ( compress )
+         n |= 4;
+     if ( stream )
+         n |= 8;
+     if ( listening )
+         n |= 16;
+     n |= (version_ << 8);
+     return n;
+ }
+}
Added: incubator/cassandra/src/org/apache/cassandra/net/MessagingServiceMBean.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/src/org/apache/cassandra/net/MessagingServiceMBean.java?rev=749207&view=auto
==============================================================================
--- incubator/cassandra/src/org/apache/cassandra/net/MessagingServiceMBean.java (added)
+++ incubator/cassandra/src/org/apache/cassandra/net/MessagingServiceMBean.java Mon Mar 2 06:13:14 2009
@@ -0,0 +1,29 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.net;
+
+/**
+ * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
+ */
+
+/**
+ * Management view of the messaging service: exposes how much work is
+ * still outstanding on its thread pools.
+ */
+public interface MessagingServiceMBean
+{
+    /** @return tasks submitted to the serializer pool that have not completed yet. */
+    long getMessagingSerializerTaskCount();
+
+    /** @return tasks submitted to the receiving pool that have not completed yet. */
+    long getMessagingReceiverTaskCount();
+}
Added: incubator/cassandra/src/org/apache/cassandra/net/ProtocolHeader.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/src/org/apache/cassandra/net/ProtocolHeader.java?rev=749207&view=auto
==============================================================================
--- incubator/cassandra/src/org/apache/cassandra/net/ProtocolHeader.java (added)
+++ incubator/cassandra/src/org/apache/cassandra/net/ProtocolHeader.java Mon Mar 2 06:13:14 2009
@@ -0,0 +1,36 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.net;
+
+/**
+ * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
+ */
+
+/*
+ * Plain holder for the fields of a protocol header. Nothing in this class
+ * reads or writes the fields; they are public so that other code can fill
+ * them in and read them back.
+ */
+public final class ProtocolHeader
+{
+ /* String keys naming three of the header fields. */
+ public static final String SERIALIZER = "SERIALIZER";
+ public static final String COMPRESSION = "COMPRESSION";
+ public static final String VERSION = "VERSION";
+
+ /* presumably the ordinal of the SerializerType used by the sender - TODO confirm against the decoder */
+ public int serializerType_;
+ /* compression flag of the header */
+ public boolean isCompressed_;
+ /* streaming-mode flag of the header */
+ public boolean isStreamingMode_;
+ /* "listening" flag of the header */
+ public boolean isListening_;
+ /* protocol version carried in the header */
+ public int version_;
+}
Added: incubator/cassandra/src/org/apache/cassandra/net/ResponseVerbHandler.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/src/org/apache/cassandra/net/ResponseVerbHandler.java?rev=749207&view=auto
==============================================================================
--- incubator/cassandra/src/org/apache/cassandra/net/ResponseVerbHandler.java (added)
+++ incubator/cassandra/src/org/apache/cassandra/net/ResponseVerbHandler.java Mon Mar 2 06:13:14 2009
@@ -0,0 +1,50 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.net;
+
+import org.apache.log4j.Logger;
+
+/**
+ * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
+ */
+
+/**
+ * Handles RESPONSE messages by routing them to whoever is waiting for
+ * them: a registered callback if there is one for the message id,
+ * otherwise a pending async result.
+ */
+class ResponseVerbHandler implements IVerbHandler
+{
+    private static final Logger logger_ = Logger.getLogger( ResponseVerbHandler.class );
+
+    /**
+     * Delivers a response. A callback registered under the message id
+     * takes precedence; the async-result table is only consulted (and the
+     * entry taken out of it) when no callback exists. A response nobody
+     * is waiting for is dropped silently.
+     *
+     * @param message the response message.
+     */
+    public void doVerb(Message message)
+    {
+        String messageId = message.getMessageId();
+
+        IAsyncCallback callback = MessagingService.getRegisteredCallback(messageId);
+        if ( callback != null )
+        {
+            logger_.info("Processing response on a callback from " + message.getFrom());
+            callback.response(message);
+            return;
+        }
+
+        AsyncResult pending = (AsyncResult)MessagingService.getAsyncResult(messageId);
+        if ( pending == null )
+        {
+            return;
+        }
+        logger_.info("Processing response on an async result from " + message.getFrom());
+        pending.result(message.getMessageBody());
+    }
+}
Added: incubator/cassandra/src/org/apache/cassandra/net/SelectionKeyHandler.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/src/org/apache/cassandra/net/SelectionKeyHandler.java?rev=749207&view=auto
==============================================================================
--- incubator/cassandra/src/org/apache/cassandra/net/SelectionKeyHandler.java (added)
+++ incubator/cassandra/src/org/apache/cassandra/net/SelectionKeyHandler.java Mon Mar 2 06:13:14 2009
@@ -0,0 +1,83 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.net;
+
+import java.nio.channels.*;
+
+/**
+ * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
+ */
+
+/**
+ * Base class for objects that react to selection-key events. Every
+ * operation refuses with an UnsupportedOperationException; subclasses
+ * override the ones they actually support.
+ */
+public class SelectionKeyHandler
+{
+    /* Builds the refusal text for the named operation, including the concrete class name. */
+    private String refusal(String operation)
+    {
+        return operation + "() cannot be called on " + getClass().getName() + "!";
+    }
+
+    /**
+     * Called to modify the key.
+     *
+     * @param key The key to modify.
+     */
+    public void modifyKey(SelectionKey key)
+    {
+        throw new UnsupportedOperationException(refusal("modifyKey"));
+    }
+
+    /**
+     * Called to modify the key for reading.
+     *
+     * @param key The key to modify.
+     */
+    public void modifyKeyForRead(SelectionKey key)
+    {
+        throw new UnsupportedOperationException(refusal("modifyKeyForRead"));
+    }
+
+    /**
+     * Called to modify the key for writing.
+     *
+     * @param key The key to modify.
+     */
+    public void modifyKeyForWrite(SelectionKey key)
+    {
+        throw new UnsupportedOperationException(refusal("modifyKeyForWrite"));
+    }
+
+    /**
+     * Called when the key becomes acceptable.
+     *
+     * @param key The key which is acceptable.
+     */
+    public void accept(SelectionKey key)
+    {
+        throw new UnsupportedOperationException(refusal("accept"));
+    }
+
+    /**
+     * Called when the key becomes connectable.
+     *
+     * @param key The key which is connectable.
+     */
+    public void connect(SelectionKey key)
+    {
+        throw new UnsupportedOperationException(refusal("connect"));
+    }
+
+    /**
+     * Called when the key becomes readable.
+     *
+     * @param key The key which is readable.
+     */
+    public void read(SelectionKey key)
+    {
+        throw new UnsupportedOperationException(refusal("read"));
+    }
+
+    /**
+     * Called when the key becomes writable.
+     *
+     * @param key The key which is writable.
+     */
+    public void write(SelectionKey key)
+    {
+        throw new UnsupportedOperationException(refusal("write"));
+    }
+}
Added: incubator/cassandra/src/org/apache/cassandra/net/SelectorManager.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/src/org/apache/cassandra/net/SelectorManager.java?rev=749207&view=auto
==============================================================================
--- incubator/cassandra/src/org/apache/cassandra/net/SelectorManager.java (added)
+++ incubator/cassandra/src/org/apache/cassandra/net/SelectorManager.java Mon Mar 2 06:13:14 2009
@@ -0,0 +1,366 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.net;
+
+import java.io.IOException;
+import java.nio.channels.SelectableChannel;
+import java.nio.channels.SelectionKey;
+import java.nio.channels.Selector;
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.TreeSet;
+import org.apache.cassandra.utils.LogUtil;
+import org.apache.log4j.Logger;
+import org.apache.cassandra.utils.*;
+/**
+ * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
+ */
+
+public class SelectorManager extends Thread
+{
+ private static final Logger logger_ = Logger.getLogger(SelectorManager.class);
+ // the underlying selector used
+ /**
+ * The selector with which channels are registered and on which the
+ * selector thread waits for activity.
+ */
+ protected Selector selector_;
+ protected HashSet<SelectionKey> modifyKeysForRead_;
+ protected HashSet<SelectionKey> modifyKeysForWrite_;
+
+ // the list of keys waiting to be cancelled
+ protected HashSet<SelectionKey> cancelledKeys_;
+
+ // The static selector manager which is used by all applications
+ private static SelectorManager manager_;
+
+ // The static UDP selector manager which is used by all applications
+ private static SelectorManager udpManager_;
+
+ /**
+ * Constructor, which is private since there is only one selector per JVM.
+ *
+ * @param profile
+ * DESCRIBE THE PARAMETER
+ */
+ protected SelectorManager(String name)
+ {
+ super(name);
+ this.modifyKeysForRead_ = new HashSet<SelectionKey>();
+ this.modifyKeysForWrite_ = new HashSet<SelectionKey>();
+ this.cancelledKeys_ = new HashSet<SelectionKey>();
+
+ // attempt to create selector
+ try
+ {
+ selector_ = Selector.open();
+ }
+ catch (IOException e)
+ {
+ logger_.error("SEVERE ERROR (SelectorManager): Error creating selector "
+ + e);
+ }
+
+ setDaemon(false);
+ start();
+ }
+
+ /**
+ * Queues the given key for cancellation. The selector thread cancels the
+ * queued keys, and closes their channels, after its next select() call.
+ * If register() removes the key from the queue before that happens, the
+ * key is left alone.
+ *
+ * @param key
+ * The key to cancel
+ * @throws NullPointerException
+ * if key is null
+ */
+ public void cancel(SelectionKey key)
+ {
+     if (null == key)
+         throw new NullPointerException();
+
+     synchronized (cancelledKeys_)
+     {
+         cancelledKeys_.add(key);
+     }
+ }
+
+ /**
+ * Registers a channel with the selector and attaches the given
+ * SelectionKeyHandler to the resulting key. The selector thread invokes
+ * the handler's callbacks as the corresponding operations become ready.
+ *
+ * @param channel
+ * The channel to register with the selector
+ * @param handler
+ * The handler to use for the callbacks
+ * @param ops
+ * The initial interest operations
+ * @return The SelectionKey which identifies this registration
+ * @exception IOException
+ * if the channel cannot be registered
+ * @throws NullPointerException
+ * if channel or handler is null
+ */
+ public SelectionKey register(SelectableChannel channel,
+     SelectionKeyHandler handler, int ops) throws IOException
+ {
+     if (channel == null || handler == null)
+         throw new NullPointerException();
+
+     // wake the selector thread before registering
+     selector_.wakeup();
+     final SelectionKey registered = channel.register(selector_, ops, handler);
+
+     // a key that is registered again must not be cancelled
+     synchronized (cancelledKeys_)
+     {
+         cancelledKeys_.remove(registered);
+     }
+
+     selector_.wakeup();
+     return registered;
+ }
+
+ /**
+ * Queues the key so that the selector thread calls its handler's
+ * modifyKeyForRead() on the next pass, then wakes the selector.
+ *
+ * @param key
+ * The key to queue
+ * @throws NullPointerException
+ * if key is null
+ */
+ public void modifyKeyForRead(SelectionKey key)
+ {
+     if (null == key)
+         throw new NullPointerException();
+
+     synchronized (modifyKeysForRead_)
+     {
+         modifyKeysForRead_.add(key);
+     }
+
+     selector_.wakeup();
+ }
+
+ /**
+ * Queues the key so that the selector thread calls its handler's
+ * modifyKeyForWrite() on the next pass, then wakes the selector.
+ *
+ * @param key
+ * The key to queue
+ * @throws NullPointerException
+ * if key is null
+ */
+ public void modifyKeyForWrite(SelectionKey key)
+ {
+     if (null == key)
+         throw new NullPointerException();
+
+     synchronized (modifyKeysForWrite_)
+     {
+         modifyKeysForWrite_.add(key);
+     }
+
+     selector_.wakeup();
+ }
+
+ /**
+ * Selector loop, executed when this thread is started. Each pass applies
+ * the queued interest changes and dispatches the selected keys
+ * (doProcess()), waits up to one second in select(), and then cancels the
+ * keys queued through cancel() and closes their channels. The loop ends
+ * when the thread is interrupted. Any Throwable escaping the loop is
+ * logged and terminates the JVM.
+ */
+ public void run()
+ {
+     try
+     {
+         // loop while waiting for activity; Thread.interrupted() also clears the flag
+         while (!Thread.interrupted())
+         {
+             try
+             {
+                 doProcess();
+                 selector_.select(1000);
+                 synchronized (cancelledKeys_)
+                 {
+                     if (cancelledKeys_.size() > 0)
+                     {
+                         SelectionKey[] keys = cancelledKeys_.toArray(new SelectionKey[0]);
+
+                         for (SelectionKey key : keys)
+                         {
+                             key.cancel();
+                             key.channel().close();
+                         }
+                         cancelledKeys_.clear();
+                     }
+                 }
+             }
+             catch (IOException e)
+             {
+                 logger_.warn(LogUtil.throwableToString(e));
+             }
+         }
+
+         // Clear only the static reference that points at this instance, so
+         // that a terminating UDP selector thread does not discard the TCP
+         // manager (and vice versa). The accessors write these fields under
+         // the class lock, so the same lock is taken here.
+         synchronized (SelectorManager.class)
+         {
+             if (manager_ == this)
+             {
+                 manager_ = null;
+             }
+             if (udpManager_ == this)
+             {
+                 udpManager_ = null;
+             }
+         }
+     }
+     catch (Throwable t)
+     {
+         logger_.error("ERROR (SelectorManager.run): " + t);
+         logger_.error(LogUtil.throwableToString(t));
+         System.exit(-1);
+     }
+ }
+
+ protected void doProcess() throws IOException
+ {
+ doInvocationsForRead();
+ doInvocationsForWrite();
+ doSelections();
+ }
+
+ /**
+ * Dispatches every currently selected key to its attached
+ * SelectionKeyHandler. For each key the accept, connect, read and write
+ * callbacks are invoked, in that order, for the operations that are ready
+ * while the key is still valid. A key without a handler has its channel
+ * closed and is cancelled.
+ *
+ * @exception IOException
+ * if closing the channel of a handler-less key fails
+ */
+ protected void doSelections() throws IOException
+ {
+     for (SelectionKey selected : selectedKeys())
+     {
+         selector_.selectedKeys().remove(selected);
+
+         synchronized (selected)
+         {
+             SelectionKeyHandler handler = (SelectionKeyHandler) selected.attachment();
+
+             if (handler == null)
+             {
+                 selected.channel().close();
+                 selected.cancel();
+                 continue;
+             }
+
+             // accept
+             if (selected.isValid() && selected.isAcceptable())
+                 handler.accept(selected);
+
+             // connect
+             if (selected.isValid() && selected.isConnectable())
+                 handler.connect(selected);
+
+             // read
+             if (selected.isValid() && selected.isReadable())
+                 handler.read(selected);
+
+             // write
+             if (selected.isValid() && selected.isWritable())
+                 handler.write(selected);
+         }
+     }
+ }
+
+ /**
+ * Takes a snapshot of the keys queued by modifyKeyForRead(), empties the
+ * queue, and calls modifyKeyForRead() on the handler of every key that is
+ * still valid and has a handler attached.
+ */
+ private void doInvocationsForRead()
+ {
+     ArrayList<SelectionKey> queued;
+     synchronized (modifyKeysForRead_)
+     {
+         queued = new ArrayList<SelectionKey>(modifyKeysForRead_);
+         modifyKeysForRead_.clear();
+     }
+
+     for (SelectionKey key : queued)
+     {
+         if (!key.isValid())
+             continue;
+
+         Object handler = key.attachment();
+         if (handler == null)
+             continue;
+
+         ((SelectionKeyHandler) handler).modifyKeyForRead(key);
+     }
+ }
+
+ /**
+ * Takes a snapshot of the keys queued by modifyKeyForWrite(), empties the
+ * queue, and calls modifyKeyForWrite() on the handler of every key that is
+ * still valid and has a handler attached.
+ */
+ private void doInvocationsForWrite()
+ {
+     ArrayList<SelectionKey> queued;
+     synchronized (modifyKeysForWrite_)
+     {
+         queued = new ArrayList<SelectionKey>(modifyKeysForWrite_);
+         modifyKeysForWrite_.clear();
+     }
+
+     for (SelectionKey key : queued)
+     {
+         if (!key.isValid())
+             continue;
+
+         Object handler = key.attachment();
+         if (handler == null)
+             continue;
+
+         ((SelectionKeyHandler) handler).modifyKeyForWrite(key);
+     }
+ }
+
+ /**
+ * Copies the selector's currently selected keys into a new array.
+ *
+ * @return The array of selected keys
+ * @exception IOException
+ * declared for subclasses; not raised by this implementation
+ */
+ protected SelectionKey[] selectedKeys() throws IOException
+ {
+     SelectionKey[] snapshot = selector_.selectedKeys().toArray(new SelectionKey[0]);
+     return snapshot;
+ }
+
+ /**
+ * Returns the SelectorManager applications should use for TCP, creating
+ * (and thereby starting) it on first use.
+ *
+ * @return The SelectorManager which applications should use
+ */
+ public static SelectorManager getSelectorManager()
+ {
+     synchronized (SelectorManager.class)
+     {
+         if (manager_ == null)
+         {
+             manager_ = new SelectorManager("TCP Selector Manager");
+         }
+         // Return while still holding the lock: run() resets manager_ to
+         // null when the selector thread ends, so re-reading the field
+         // after the lock is released could hand null to the caller.
+         return manager_;
+     }
+ }
+
+ /**
+ * Returns the SelectorManager applications should use for UDP, creating
+ * (and thereby starting) it on first use.
+ *
+ * @return The UDP SelectorManager
+ */
+ public static SelectorManager getUdpSelectorManager()
+ {
+     synchronized (SelectorManager.class)
+     {
+         if (udpManager_ != null)
+             return udpManager_;
+
+         udpManager_ = new SelectorManager("UDP Selector Manager");
+         return udpManager_;
+     }
+ }
+
+ /**
+ * Returns whether or not the calling thread is the TCP selector thread
+ * (manager_). The UDP selector manager thread is not checked here.
+ *
+ * @return Whether or not the calling thread is the TCP selector thread
+ */
+ public static boolean isSelectorThread()
+ {
+ return (Thread.currentThread() == manager_);
+ }
+}
Added: incubator/cassandra/src/org/apache/cassandra/net/TcpConnection.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/src/org/apache/cassandra/net/TcpConnection.java?rev=749207&view=auto
==============================================================================
--- incubator/cassandra/src/org/apache/cassandra/net/TcpConnection.java (added)
+++ incubator/cassandra/src/org/apache/cassandra/net/TcpConnection.java Mon Mar 2 06:13:14 2009
@@ -0,0 +1,570 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.net;
+
+import java.io.*;
+import java.net.InetSocketAddress;
+import java.nio.ByteBuffer;
+import java.nio.channels.FileChannel;
+import java.nio.channels.SelectionKey;
+import java.nio.channels.SocketChannel;
+import java.util.*;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+
+import org.apache.cassandra.concurrent.DebuggableThreadPoolExecutor;
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.net.io.FastSerializer;
+import org.apache.cassandra.net.io.ISerializer;
+import org.apache.cassandra.net.io.ProtocolState;
+import org.apache.cassandra.net.io.StartState;
+import org.apache.cassandra.net.io.TcpReader;
+import org.apache.cassandra.net.io.TcpReader.TcpReaderState;
+import org.apache.cassandra.utils.LogUtil;
+import org.apache.log4j.Logger;
+import org.apache.cassandra.net.io.*;
+import org.apache.cassandra.net.sink.*;
+
+/**
+ * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
+ */
+
+public class TcpConnection extends SelectionKeyHandler implements Comparable
+{
+ // logging and profiling.
+ private static Logger logger_ = Logger.getLogger(TcpConnection.class);
+ private static ISerializer serializer_ = new FastSerializer();
+ private SocketChannel socketChannel_;
+ private SelectionKey key_;
+ private TcpConnectionManager pool_;
+ private boolean isIncoming_ = false;
+ private TcpReader tcpReader_;
+ private ReadWorkItem readWork_ = new ReadWorkItem();
+ private List<ByteBuffer> pendingWrites_ = new Vector<ByteBuffer>();
+ private AtomicBoolean connected_ = new AtomicBoolean(false);
+ private EndPoint localEp_;
+ private EndPoint remoteEp_;
+ boolean inUse_ = false;
+
+ /*
+ * Added for streaming support. We need the boolean
+ * to indicate that this connection is used for
+ * streaming. The Condition and the Lock are used
+ * to signal the stream() that it can continue
+ * streaming when the socket becomes writable.
+ */
+ private boolean bStream_ = false;
+ private Lock lock_;
+ private Condition condition_;
+
+ // used from getConnection - outgoing
+ /**
+ * Creates a pooled outgoing connection and starts a non-blocking connect
+ * to the remote end point.
+ *
+ * @param pool the connection manager this connection belongs to
+ * @param from the local end point
+ * @param to the remote end point to connect to
+ * @throws IOException if the channel cannot be opened, connected or registered
+ */
+ TcpConnection(TcpConnectionManager pool, EndPoint from, EndPoint to) throws IOException
+ {
+     pool_ = pool;
+     localEp_ = from;
+     remoteEp_ = to;
+     openAndRegister();
+ }
+
+ /*
+ * Used for streaming purposes has no pooling semantics.
+ */
+ /**
+ * Creates an outgoing connection that is used for streaming only.
+ *
+ * @param from the local end point
+ * @param to the remote end point to connect to
+ * @throws IOException if the channel cannot be opened, connected or registered
+ */
+ TcpConnection(EndPoint from, EndPoint to) throws IOException
+ {
+     localEp_ = from;
+     remoteEp_ = to;
+
+     // Create the lock and condition before the streaming flag is set and
+     // before the channel is registered: once registered, the selector
+     // thread may call connect(), and resumeStreaming() uses lock_ as soon
+     // as bStream_ is true.
+     lock_ = new ReentrantLock();
+     condition_ = lock_.newCondition();
+     bStream_ = true;
+
+     openAndRegister();
+ }
+
+ /*
+ * Opens a non-blocking channel to remoteEp_ and registers it with the
+ * selector manager: for OP_CONNECT if the connect is still in progress,
+ * otherwise for OP_READ. The channel is closed again if any step fails,
+ * so a failed construction does not leak the socket.
+ */
+ private void openAndRegister() throws IOException
+ {
+     socketChannel_ = SocketChannel.open();
+     boolean registered = false;
+     try
+     {
+         socketChannel_.configureBlocking(false);
+
+         if ( !socketChannel_.connect( remoteEp_.getInetAddress() ) )
+         {
+             key_ = SelectorManager.getSelectorManager().register(socketChannel_, this, SelectionKey.OP_CONNECT);
+         }
+         else
+         {
+             key_ = SelectorManager.getSelectorManager().register(socketChannel_, this, SelectionKey.OP_READ);
+             connected_.set(true);
+         }
+         registered = true;
+     }
+     finally
+     {
+         if ( !registered )
+         {
+             try
+             {
+                 socketChannel_.close();
+             }
+             catch ( IOException ignored )
+             {
+                 // the failure that brought us here is the one worth reporting
+             }
+         }
+     }
+ }
+
+ /*
+ * This method is invoked by the TcpConnectionHandler to accept incoming TCP connections.
+ * Accept the connection and then register interest for reads.
+ */
+ /**
+ * Wraps an accepted socket channel in a TcpConnection and registers it
+ * with the selector manager for reads.
+ *
+ * @param socketChannel the accepted channel
+ * @param localEp the local end point the channel was accepted on
+ * @param isIncoming whether the connection is an incoming one
+ * @throws IOException if the channel cannot be configured or registered
+ */
+ static void acceptConnection(SocketChannel socketChannel, EndPoint localEp, boolean isIncoming) throws IOException
+ {
+     // honour the caller's flag instead of a hard-coded 'true'
+     TcpConnection tcpConnection = new TcpConnection(socketChannel, localEp, isIncoming);
+     tcpConnection.registerReadInterest();
+ }
+
+ private void registerReadInterest() throws IOException
+ {
+ key_ = SelectorManager.getSelectorManager().register(socketChannel_, this, SelectionKey.OP_READ);
+ }
+
+ // used for incoming connections
+ TcpConnection(SocketChannel socketChannel, EndPoint localEp, boolean isIncoming) throws IOException
+ {
+ socketChannel_ = socketChannel;
+ socketChannel_.configureBlocking(false);
+ isIncoming_ = isIncoming;
+ connected_.set(true);
+ localEp_ = localEp;
+ }
+
+ EndPoint getLocalEp()
+ {
+ return localEp_;
+ }
+
+ public void setLocalEp(EndPoint localEp)
+ {
+ localEp_ = localEp;
+ }
+
+ public EndPoint getEndPoint()
+ {
+ return remoteEp_;
+ }
+
+ public boolean isIncoming()
+ {
+ return isIncoming_;
+ }
+
+ public SocketChannel getSocketChannel()
+ {
+ return socketChannel_;
+ }
+
+ /**
+ * Serializes the message and sends it over this connection. Nothing is
+ * sent when the serialized form is empty. If the connection is not yet
+ * established, or earlier buffers are still queued, the packed buffer is
+ * appended to pendingWrites_ instead of being written directly.
+ *
+ * @param message the message to send
+ * @throws IOException declared by this method; the socket write is one source
+ */
+ public void write(Message message) throws IOException
+ {
+ byte[] data = serializer_.serialize(message);
+ if ( data.length > 0 )
+ {
+ // 'listening' is false only when the sender is EndPoint.randomLocalEndPoint_
+ boolean listening = ( message.getFrom().equals(EndPoint.randomLocalEndPoint_) ) ? false : true;
+ ByteBuffer buffer = MessagingService.packIt( data , false, false, listening);
+ synchronized(this)
+ {
+ // queue behind earlier buffers, or until the connect has completed
+ if (!pendingWrites_.isEmpty() || !connected_.get())
+ {
+ pendingWrites_.add(buffer);
+ return;
+ }
+
+ logger_.debug("Sending packets of size " + data.length);
+ socketChannel_.write(buffer);
+
+ // partial write: queue the remainder and ask the selector thread for OP_WRITE
+ if (buffer.remaining() > 0)
+ {
+ pendingWrites_.add(buffer);
+ if ((key_.interestOps() & SelectionKey.OP_WRITE) == 0)
+ {
+ SelectorManager.getSelectorManager().modifyKeyForWrite(key_);
+ }
+ }
+ }
+ }
+ }
+
+ /**
+ * Streams the bytes of the file between startPosition (inclusive) and
+ * endPosition (exclusive) over this connection. Waits for the connection
+ * to be established (three waits of two seconds each) and, whenever the
+ * socket accepts fewer bytes than requested, waits until the selector
+ * thread signals that the socket is writable again.
+ *
+ * @param file the file to stream from
+ * @param startPosition offset of the first byte to send
+ * @param endPosition offset just past the last byte to send
+ * @throws IOException if the connection cannot be established or the transfer fails
+ * @throws IllegalStateException if this connection was not created for streaming
+ */
+ public void stream(File file, long startPosition, long endPosition) throws IOException
+ {
+     if ( !bStream_ )
+         throw new IllegalStateException("Cannot stream since we are not set up to stream data.");
+
+     lock_.lock();
+     RandomAccessFile raf = null;
+     try
+     {
+         /* transfer at most 64MB in each attempt */
+         int limit = 64*1024*1024;
+         long total = endPosition - startPosition;
+         /* keeps track of total number of bytes transferred */
+         long bytesWritten = 0L;
+         raf = new RandomAccessFile(file, "r");
+         FileChannel fc = raf.getChannel();
+
+         /*
+          * If the connection is not yet established then wait for
+          * the timeout period of 2 seconds. Wait up to 3 times and then
+          * bail with an IOException.
+          */
+         long waitTime = 2;
+         int retry = 0;
+         while ( !connected_.get() )
+         {
+             if ( retry == 3 )
+                 throw new IOException("Unable to connect to " + remoteEp_ + " after " + retry + " attempts.");
+             waitToContinueStreaming(waitTime, TimeUnit.SECONDS);
+             ++retry;
+         }
+
+         /* the header must go out once, even if the first transfer moves no bytes */
+         boolean headerSent = false;
+         while ( bytesWritten < total )
+         {
+             if ( startPosition == 0 && !headerSent )
+             {
+                 ByteBuffer buffer = MessagingService.constructStreamHeader(false, true);
+                 socketChannel_.write(buffer);
+                 handleIncompleteWrite(buffer);
+                 headerSent = true;
+             }
+
+             /* never ask for more than what is left of the requested range */
+             long count = Math.min(limit, total - bytesWritten);
+             /* returns the number of bytes transferred from file to the socket */
+             long bytesTransferred = fc.transferTo(startPosition, count, socketChannel_);
+             logger_.debug("Bytes transferred " + bytesTransferred);
+             bytesWritten += bytesTransferred;
+             startPosition += bytesTransferred;
+             /*
+              * If the number of bytes transferred is less than intended
+              * then we need to wait till socket becomes writeable again.
+              */
+             if ( bytesTransferred < count )
+             {
+                 if ((key_.interestOps() & SelectionKey.OP_WRITE) == 0)
+                 {
+                     SelectorManager.getSelectorManager().modifyKeyForWrite(key_);
+                 }
+                 waitToContinueStreaming();
+             }
+         }
+     }
+     finally
+     {
+         try
+         {
+             /* closing the file also closes its channel */
+             if ( raf != null )
+                 raf.close();
+         }
+         finally
+         {
+             lock_.unlock();
+         }
+     }
+ }
+
+ private void handleIncompleteWrite(ByteBuffer buffer)
+ {
+ if (buffer.remaining() > 0)
+ {
+ pendingWrites_.add(buffer);
+ if ((key_.interestOps() & SelectionKey.OP_WRITE) == 0)
+ {
+ SelectorManager.getSelectorManager().modifyKeyForWrite(key_);
+ }
+ waitToContinueStreaming();
+ }
+ }
+
+ private void waitToContinueStreaming()
+ {
+ try
+ {
+ condition_.await();
+ }
+ catch ( InterruptedException ex )
+ {
+ logger_.warn( LogUtil.throwableToString(ex) );
+ }
+ }
+
+ private void waitToContinueStreaming(long waitTime, TimeUnit tu)
+ {
+ try
+ {
+ condition_.await(waitTime, tu);
+ }
+ catch ( InterruptedException ex )
+ {
+ logger_.warn( LogUtil.throwableToString(ex) );
+ }
+ }
+
+ /*
+ * Signals a thread waiting on condition_ that it may continue.
+ * Does nothing unless this connection is in streaming mode.
+ */
+ private void resumeStreaming()
+ {
+     if ( bStream_ )
+     {
+         lock_.lock();
+         try
+         {
+             condition_.signal();
+         }
+         finally
+         {
+             lock_.unlock();
+         }
+     }
+ }
+
+ /**
+ * Marks this connection as no longer in use and, when it is held by a
+ * pool, decrements that pool's in-use counter. The socket is left open.
+ */
+ public void close()
+ {
+     inUse_ = false;
+     // streaming connections are created without a pool, so pool_ may be null
+     if ( pool_ != null && pool_.contains(this) )
+         pool_.decUsed();
+ }
+
+ public boolean isConnected()
+ {
+ return socketChannel_.isConnected();
+ }
+
+ public boolean equals(Object o)
+ {
+ if ( !(o instanceof TcpConnection) )
+ return false;
+
+ TcpConnection rhs = (TcpConnection)o;
+ if ( localEp_.equals(rhs.localEp_) && remoteEp_.equals(rhs.remoteEp_) )
+ return true;
+ else
+ return false;
+ }
+
+ public int hashCode()
+ {
+ return (localEp_ + ":" + remoteEp_).hashCode();
+ }
+
+ public String toString()
+ {
+ return socketChannel_.toString();
+ }
+
+ void closeSocket()
+ {
+ logger_.warn("Closing down connection " + socketChannel_ + " with " + pendingWrites_.size() + " writes remaining.");
+ if ( pool_ != null )
+ {
+ pool_.removeConnection(this);
+ }
+ cancel(key_);
+ pendingWrites_.clear();
+ }
+
+ /*
+ * Tears the connection down after an error: queues the selection key for
+ * cancellation, discards the buffers that are still waiting to be written
+ * and removes the connection from its pool, if it has one.
+ */
+ void errorClose()
+ {
+     logger_.warn("Closing down connection " + socketChannel_);
+     cancel(key_);
+     // one clear is enough; the original cleared the list before and after cancel()
+     pendingWrites_.clear();
+     if ( pool_ != null )
+     {
+         pool_.removeConnection(this);
+     }
+ }
+
+ private void cancel(SelectionKey key)
+ {
+ if ( key != null )
+ SelectorManager.getSelectorManager().cancel(key);
+ }
+
+ // called in the selector thread
+ /**
+ * Completes a non-blocking connect. On success the key is queued for read
+ * interest, the connection is marked connected, pending writes are
+ * scheduled and a waiting stream() call is signalled. On failure the
+ * connection is closed through errorClose().
+ *
+ * @param key The key which is connectable.
+ */
+ public void connect(SelectionKey key)
+ {
+     key.interestOps(key.interestOps() & (~SelectionKey.OP_CONNECT));
+     try
+     {
+         if (socketChannel_.finishConnect())
+         {
+             SelectorManager.getSelectorManager().modifyKeyForRead(key);
+             connected_.set(true);
+
+             // this will flush the pending writes. Use the key handed to us
+             // rather than key_: the field is assigned only after register()
+             // has returned in the constructor, so it can still be null here.
+             if (!pendingWrites_.isEmpty())
+             {
+                 SelectorManager.getSelectorManager().modifyKeyForWrite(key);
+             }
+             resumeStreaming();
+         }
+         else
+         {
+             logger_.warn("Closing connection because socket channel could not finishConnect.");
+             errorClose();
+         }
+     }
+     catch(IOException e)
+     {
+         logger_.warn("Encountered IOException on connection: " + socketChannel_);
+         logger_.warn( LogUtil.throwableToString(e) );
+         errorClose();
+     }
+ }
+
+ // called in the selector thread
+ /**
+ * Called when the key becomes writable: clears the write interest,
+ * flushes the queued buffers and signals a waiting stream() call.
+ *
+ * @param key The key which is writable.
+ */
+ public void write(SelectionKey key)
+ {
+ key.interestOps( key.interestOps() & ( ~SelectionKey.OP_WRITE ) );
+ doPendingWrites();
+ /*
+ * This has an effect only if we are in streaming mode.
+ * The idea is that we send a chunk of data from a source
+ * and wait to send the next chunk until we
+ * are signalled to do so from here.
+ */
+ resumeStreaming();
+ }
+
+ /*
+ * Writes the queued buffers to the socket in order, stopping at the first
+ * buffer that cannot be written completely. An IOException closes the
+ * connection through errorClose(). If buffers remain afterwards and the
+ * key has no write interest, write interest is requested again.
+ */
+ void doPendingWrites()
+ {
+ try
+ {
+ while(!pendingWrites_.isEmpty())
+ {
+ ByteBuffer buffer = pendingWrites_.get(0);
+ socketChannel_.write(buffer);
+ // socket is full: leave the partially written buffer at the head of the queue
+ if (buffer.remaining() > 0)
+ {
+ break;
+ }
+ pendingWrites_.remove(0);
+ }
+
+ }
+ catch(IOException ex)
+ {
+ logger_.warn(LogUtil.throwableToString(ex));
+ // This is to fix the weird Linux bug with NIO.
+ errorClose();
+ }
+ finally
+ {
+ synchronized(this)
+ {
+ if (!pendingWrites_.isEmpty() && (key_.interestOps() & SelectionKey.OP_WRITE) == 0)
+ {
+ SelectorManager.getSelectorManager().modifyKeyForWrite(key_);
+ }
+ }
+ }
+ }
+
+ // called in the selector thread
+ public void read(SelectionKey key)
+ {
+ key.interestOps( key.interestOps() & ( ~SelectionKey.OP_READ ) );
+ // publish this event onto to the TCPReadEvent Queue.
+ MessagingService.getReadExecutor().execute(readWork_);
+ }
+
+ /**
+ * Adds OP_READ to the interest set of the given key. Invoked by the
+ * SelectorManager for keys queued through its modifyKeyForRead().
+ *
+ * @param key The key whose interest set is extended.
+ */
+ public void modifyKeyForRead(SelectionKey key)
+ {
+     // read the interest set from the key being modified; key_ is assigned
+     // only after register() has returned and may not be set yet
+     key.interestOps( key.interestOps() | SelectionKey.OP_READ );
+ }
+
+ /**
+ * Adds OP_WRITE to the interest set of the given key. Invoked by the
+ * SelectorManager for keys queued through its modifyKeyForWrite().
+ *
+ * @param key The key whose interest set is extended.
+ */
+ public void modifyKeyForWrite(SelectionKey key)
+ {
+     // read the interest set from the key being modified; key_ is assigned
+     // only after register() has returned and may not be set yet
+     key.interestOps( key.interestOps() | SelectionKey.OP_WRITE );
+ }
+
+ /*
+ * Work item that reads from this connection's socket. read(SelectionKey)
+ * clears the read interest and hands this item to the read executor; when
+ * the item is done it queues the key for read interest again.
+ */
+ class ReadWorkItem implements Runnable
+ {
+ // called from the TCP READ thread pool
+ public void run()
+ {
+ // first read on this connection: create the reader and put it into its PREAMBLE state
+ if ( tcpReader_ == null )
+ {
+ tcpReader_ = new TcpReader(TcpConnection.this);
+ StartState nextState = tcpReader_.getSocketState(TcpReader.TcpReaderState.PREAMBLE);
+ if ( nextState == null )
+ {
+ nextState = new ProtocolState(tcpReader_);
+ tcpReader_.putSocketState(TcpReader.TcpReaderState.PREAMBLE, nextState);
+ }
+ tcpReader_.morphState(nextState);
+ }
+
+ try
+ {
+ byte[] bytes = new byte[0];
+ // keep going for as long as the reader returns a non-empty array
+ while ( (bytes = tcpReader_.read()).length > 0 )
+ {
+ ProtocolHeader pH = tcpReader_.getProtocolHeader();
+ if ( !pH.isStreamingMode_ )
+ {
+ /* first message received */
+ if (remoteEp_ == null)
+ {
+ // the port depends on the isListening_ flag of the protocol header
+ int port = ( pH.isListening_ ) ? DatabaseDescriptor.getStoragePort() : EndPoint.randomPort_;
+ remoteEp_ = new EndPoint( socketChannel_.socket().getInetAddress().getHostName(), port );
+ // put connection into pool if possible
+ pool_ = MessagingService.getConnectionPool(localEp_, remoteEp_);
+ pool_.addToPool(TcpConnection.this);
+ }
+
+ /* Deserialize and handle the message */
+ MessagingService.getDeserilizationExecutor().submit( new MessageDeserializationTask(pH.serializerType_, bytes) );
+ tcpReader_.resetState();
+ }
+ else
+ {
+ MessagingService.setStreamingMode(false);
+ /* Close this socket connection used for streaming */
+ closeSocket();
+ }
+ }
+ }
+ catch ( IOException ex )
+ {
+ handleException(ex);
+ }
+ catch ( Throwable th )
+ {
+ handleException(th);
+ }
+ finally
+ {
+ // read(SelectionKey) removed OP_READ before scheduling this item; ask for it again
+ SelectorManager.getSelectorManager().modifyKeyForRead(key_);
+ }
+ }
+
+ // logs the failure and closes the connection through errorClose()
+ private void handleException(Throwable th)
+ {
+ logger_.warn("Problem reading from socket connected to : " + socketChannel_);
+ logger_.warn(LogUtil.throwableToString(th));
+ // This is to fix the weird Linux bug with NIO.
+ errorClose();
+ }
+ }
+
+ public int pending()
+ {
+ return pendingWrites_.size();
+ }
+
+ /**
+ * Orders connections by their number of pending writes, fewest first.
+ *
+ * @param o the connection to compare with
+ * @return negative, zero or positive as this connection has fewer, as many
+ * or more pending writes than the other one
+ * @throws IllegalArgumentException if o is not a TcpConnection
+ */
+ public int compareTo(Object o)
+ {
+     if (!(o instanceof TcpConnection))
+         throw new IllegalArgumentException();
+
+     TcpConnection other = (TcpConnection) o;
+     return pendingWrites_.size() - other.pendingWrites_.size();
+ }
+}
Added: incubator/cassandra/src/org/apache/cassandra/net/TcpConnectionHandler.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/src/org/apache/cassandra/net/TcpConnectionHandler.java?rev=749207&view=auto
==============================================================================
--- incubator/cassandra/src/org/apache/cassandra/net/TcpConnectionHandler.java (added)
+++ incubator/cassandra/src/org/apache/cassandra/net/TcpConnectionHandler.java Mon Mar 2 06:13:14 2009
@@ -0,0 +1,60 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.net;
+
+import java.nio.channels.*;
+import java.io.IOException;
+import java.net.*;
+
+import org.apache.cassandra.utils.LogUtil;
+import org.apache.log4j.Logger;
+
+/**
+ * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
+ */
+
+ public class TcpConnectionHandler extends SelectionKeyHandler
+ {
+     private static Logger logger_ = Logger.getLogger(TcpConnectionHandler.class);
+     EndPoint localEp_;
+
+     /**
+      * @param localEp the local end point passed on to every accepted connection
+      */
+     public TcpConnectionHandler(EndPoint localEp)
+     {
+         localEp_ = localEp;
+     }
+
+     /**
+      * Accepts a pending connection on the server channel of the given key
+      * and hands it to TcpConnection.acceptConnection(). Failures are
+      * logged; a channel that was accepted but could not be set up is
+      * closed so that its socket is not leaked.
+      *
+      * @param key The key which is acceptable.
+      */
+     public void accept(SelectionKey key)
+     {
+         SocketChannel client = null;
+         try
+         {
+             ServerSocketChannel serverChannel = (ServerSocketChannel)key.channel();
+             client = serverChannel.accept();
+
+             if ( client != null )
+             {
+                 TcpConnection.acceptConnection(client, localEp_, true);
+             }
+         }
+         catch(IOException e)
+         {
+             logger_.warn(LogUtil.throwableToString(e));
+             closeQuietly(client);
+         }
+     }
+
+     /* Closes the channel, if there is one, logging rather than propagating a failure. */
+     private static void closeQuietly(SocketChannel channel)
+     {
+         if ( channel == null )
+             return;
+
+         try
+         {
+             channel.close();
+         }
+         catch(IOException e)
+         {
+             logger_.warn(LogUtil.throwableToString(e));
+         }
+     }
+ }
Added: incubator/cassandra/src/org/apache/cassandra/net/TcpConnectionManager.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/src/org/apache/cassandra/net/TcpConnectionManager.java?rev=749207&view=auto
==============================================================================
--- incubator/cassandra/src/org/apache/cassandra/net/TcpConnectionManager.java (added)
+++ incubator/cassandra/src/org/apache/cassandra/net/TcpConnectionManager.java Mon Mar 2 06:13:14 2009
@@ -0,0 +1,217 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.net;
+
+import java.io.IOException;
+import java.util.*;
+import java.util.concurrent.*;
+import java.util.concurrent.locks.*;
+
+import org.apache.log4j.Logger;
+/**
+ * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
+ */
+
+class TcpConnectionManager
+{
+ private Lock lock_ = new ReentrantLock();
+ private List<TcpConnection> allConnections_;
+ private EndPoint localEp_;
+ private EndPoint remoteEp_;
+ private int initialSize_;
+ private int growthFactor_;
+ private int maxSize_;
+ private long lastTimeUsed_;
+ private boolean isShut_;
+
+ private int inUse_;
+
+ TcpConnectionManager(int initialSize, int growthFactor, int maxSize, EndPoint localEp, EndPoint remoteEp)
+ {
+ initialSize_ = initialSize;
+ growthFactor_ = growthFactor;
+ maxSize_ = maxSize;
+ localEp_ = localEp;
+ remoteEp_ = remoteEp;
+ isShut_ = false;
+ lastTimeUsed_ = System.currentTimeMillis();
+ allConnections_ = new Vector<TcpConnection>();
+ }
+
+ /**
+ * Returns a connection to the remote end point. An empty pool gets a new
+ * connection. Otherwise the least loaded connection is reused when it has
+ * no pending writes or when the pool has reached its maximum size; failing
+ * that, a new connection is created and pooled.
+ *
+ * @return the connection to use
+ * @throws IOException if a new connection cannot be created
+ */
+ TcpConnection getConnection() throws IOException
+ {
+     lock_.lock();
+     try
+     {
+         if (allConnections_.isEmpty())
+         {
+             TcpConnection conn = new TcpConnection(this, localEp_, remoteEp_);
+             addToPool(conn);
+             conn.inUse_ = true;
+             incUsed();
+             return conn;
+         }
+
+         TcpConnection least = getLeastLoaded();
+
+         // 'least' is null when the pool was emptied by removeConnection() in
+         // the meantime; never dereference it in that case
+         if ( least != null && (least.pending() == 0 || allConnections_.size() == maxSize_) )
+         {
+             least.inUse_ = true;
+             incUsed();
+             return least;
+         }
+
+         TcpConnection connection = new TcpConnection(this, localEp_, remoteEp_);
+         if ( !contains(connection) )
+         {
+             addToPool(connection);
+             connection.inUse_ = true;
+             incUsed();
+             return connection;
+         }
+
+         // NOTE(review): TcpConnection.equals() compares only the two end
+         // points, so a freshly created connection looks "contained" whenever
+         // the pool already holds a connection for the same end points, and
+         // closeSocket() then removes an equal pool entry. Confirm that this
+         // is the intended pooling behaviour.
+         connection.closeSocket();
+         return getLeastLoaded();
+     }
+     finally
+     {
+         lock_.unlock();
+     }
+ }
+
+ /**
+ * Sorts the pool by number of pending writes and returns its first entry.
+ *
+ * @return the connection with the fewest pending writes, or null when the
+ * pool is empty
+ */
+ protected TcpConnection getLeastLoaded()
+ {
+     lock_.lock();
+     try
+     {
+         Collections.sort(allConnections_);
+         if ( allConnections_.size() > 0 )
+         {
+             return allConnections_.get(0);
+         }
+         return null;
+     }
+     finally
+     {
+         lock_.unlock();
+     }
+ }
+
+ /**
+ * Removes the connection from the pool. The socket is not touched.
+ *
+ * @param connection the connection to remove
+ */
+ void removeConnection(TcpConnection connection)
+ {
+     // The other methods sort and iterate allConnections_ while holding
+     // lock_; removing without it can break those traversals. The lock is
+     // reentrant, so callers that already hold it are fine.
+     lock_.lock();
+     try
+     {
+         allConnections_.remove(connection);
+     }
+     finally
+     {
+         lock_.unlock();
+     }
+ }
+
+ void incUsed()
+ {
+ inUse_++;
+ }
+
+ void decUsed()
+ {
+ inUse_--;
+ }
+
+ int getConnectionsInUse()
+ {
+ return inUse_;
+ }
+
+ /**
+ * Adds the connection to the pool unless an equal connection is already
+ * pooled. When the pool is full the connection's socket is closed instead.
+ *
+ * @param connection the connection to add
+ */
+ void addToPool(TcpConnection connection)
+ {
+     if ( contains(connection) )
+         return;
+
+     lock_.lock();
+     try
+     {
+         boolean hasRoom = allConnections_.size() < maxSize_;
+         if ( !hasRoom )
+         {
+             connection.closeSocket();
+             return;
+         }
+         allConnections_.add(connection);
+     }
+     finally
+     {
+         lock_.unlock();
+     }
+ }
+
+ /**
+ * Removes every connection from the pool, closing each one's socket, and
+ * then marks the pool as shut.
+ */
+ void shutdown()
+ {
+     lock_.lock();
+     try
+     {
+         while ( !allConnections_.isEmpty() )
+         {
+             TcpConnection head = allConnections_.remove(0);
+             head.closeSocket();
+         }
+     }
+     finally
+     {
+         lock_.unlock();
+     }
+     isShut_ = true;
+ }
+
+ int getPoolSize()
+ {
+ return allConnections_.size();
+ }
+
+ EndPoint getLocalEndPoint()
+ {
+ return localEp_;
+ }
+
+ EndPoint getRemoteEndPoint()
+ {
+ return remoteEp_;
+ }
+
+ int getPendingWrites()
+ {
+ int total = 0;
+ lock_.lock();
+ try
+ {
+ for ( TcpConnection connection : allConnections_ )
+ {
+ total += connection.pending();
+ }
+ }
+ finally
+ {
+ lock_.unlock();
+ }
+ return total;
+ }
+
+ boolean contains(TcpConnection connection)
+ {
+ return allConnections_.contains(connection);
+ }
+}
Added: incubator/cassandra/src/org/apache/cassandra/net/UdpConnection.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/src/org/apache/cassandra/net/UdpConnection.java?rev=749207&view=auto
==============================================================================
--- incubator/cassandra/src/org/apache/cassandra/net/UdpConnection.java (added)
+++ incubator/cassandra/src/org/apache/cassandra/net/UdpConnection.java Mon Mar 2 06:13:14 2009
@@ -0,0 +1,166 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.net;
+
+import java.net.SocketAddress;
+import java.nio.*;
+import java.nio.channels.*;
+import java.util.*;
+import java.util.concurrent.*;
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+
+import org.apache.cassandra.net.io.ProtocolState;
+import org.apache.cassandra.net.sink.SinkManager;
+import org.apache.cassandra.utils.BasicUtilities;
+import org.apache.cassandra.utils.LogUtil;
+import org.apache.log4j.Logger;
+import org.apache.cassandra.concurrent.*;
+import org.apache.cassandra.utils.*;
+
+/**
+ * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
+ */
+
+/**
+ * Non-blocking UDP endpoint that exchanges serialized Message objects.
+ *
+ * Every outgoing datagram is the serialized message prefixed with the
+ * protocol marker protocol_; incoming datagrams whose prefix does not match
+ * are dropped. Successfully decoded messages are handed to
+ * MessagingService.receive().
+ */
+public class UdpConnection extends SelectionKeyHandler
+{
+    private static Logger logger_ = Logger.getLogger(UdpConnection.class);
+    /** Size of the receive buffer; read() takes at most this many bytes per datagram. */
+    private static final int BUFFER_SIZE = 4096;
+    /** Marker written in front of every datagram and verified on receipt. */
+    private static final int protocol_ = 0xBADBEEF;
+
+    private DatagramChannel socketChannel_;
+    /** Selection key obtained in init(int); stays null when init() is used. */
+    private SelectionKey key_;
+    private EndPoint localEndPoint_;
+
+    /**
+     * Opens an unbound, non-blocking datagram channel with SO_REUSEADDR set.
+     * The channel is not registered with any selector.
+     *
+     * @throws IOException if the channel cannot be opened or configured.
+     */
+    public void init() throws IOException
+    {
+        socketChannel_ = DatagramChannel.open();
+        socketChannel_.socket().setReuseAddress(true);
+        socketChannel_.configureBlocking(false);
+    }
+
+    /**
+     * Opens a non-blocking datagram channel bound to the given local port and
+     * registers it for reads with the UDP selector manager.
+     *
+     * @param port local port to bind to.
+     * @throws IOException if the channel cannot be opened, bound or configured.
+     */
+    public void init(int port) throws IOException
+    {
+        // TODO: get TCP port from config and add one.
+        localEndPoint_ = new EndPoint(port);
+        socketChannel_ = DatagramChannel.open();
+        // SO_REUSEADDR has to be set before bind(); the JDK documents the
+        // effect of changing it on an already bound socket as undefined.
+        socketChannel_.socket().setReuseAddress(true);
+        socketChannel_.socket().bind(localEndPoint_.getInetAddress());
+        socketChannel_.configureBlocking(false);
+        key_ = SelectorManager.getUdpSelectorManager().register(socketChannel_, this, SelectionKey.OP_READ);
+    }
+
+    /**
+     * Serializes the message and sends it as a single datagram, prefixed
+     * with the protocol marker.
+     *
+     * @param message message to send.
+     * @param to destination end point.
+     * @return false if the channel reported that zero bytes were sent;
+     *         true otherwise, including when the message serialized to
+     *         nothing and no datagram was sent at all.
+     * @throws IOException if serialization or the send fails.
+     */
+    public boolean write(Message message, EndPoint to) throws IOException
+    {
+        boolean bVal = true;
+        ByteArrayOutputStream bos = new ByteArrayOutputStream();
+        DataOutputStream dos = new DataOutputStream(bos);
+        Message.serializer().serialize(message, dos);
+        byte[] data = bos.toByteArray();
+        if ( data.length > 0 )
+        {
+            logger_.debug("Size of Gossip packet " + data.length);
+            byte[] protocol = BasicUtilities.intToByteArray(protocol_);
+            ByteBuffer buffer = ByteBuffer.allocate(data.length + protocol.length);
+            buffer.put( protocol );
+            buffer.put(data);
+            buffer.flip();
+
+            // A non-blocking send returns 0 when the datagram could not be sent.
+            int n = socketChannel_.send(buffer, to.getInetAddress());
+            if ( n == 0 )
+            {
+                bVal = false;
+            }
+        }
+        return bVal;
+    }
+
+    /**
+     * Closes the underlying channel if one was opened. Failures are logged
+     * and not propagated.
+     */
+    void close()
+    {
+        try
+        {
+            if ( socketChannel_ != null )
+                socketChannel_.close();
+        }
+        catch ( IOException ex )
+        {
+            logger_.error( LogUtil.throwableToString(ex) );
+        }
+    }
+
+    /**
+     * @return the datagram channel created by init(), or null before init.
+     */
+    public DatagramChannel getDatagramChannel()
+    {
+        return socketChannel_;
+    }
+
+    /**
+     * Consumes the 4-byte protocol marker from the buffer and returns the
+     * bytes that follow it.
+     *
+     * @param buffer a flipped buffer holding one received datagram.
+     * @return the message body, or an empty array when the datagram is too
+     *         short to hold the marker or the marker does not match.
+     */
+    private byte[] gobbleHeaderAndExtractBody(ByteBuffer buffer)
+    {
+        byte[] body = new byte[0];
+        byte[] protocol = new byte[4];
+        // A datagram shorter than the marker would make get() throw an
+        // unchecked BufferUnderflowException; treat it like a bad header.
+        if ( buffer.remaining() < protocol.length )
+        {
+            logger_.info("Incoming message is too short to contain a protocol header");
+            return body;
+        }
+        buffer.get(protocol, 0, protocol.length);
+        int value = BasicUtilities.byteArrayToInt(protocol);
+
+        if ( protocol_ != value )
+        {
+            logger_.info("Invalid protocol header in the incoming message " + value);
+            return body;
+        }
+        body = new byte[buffer.remaining()];
+        buffer.get(body, 0, body.length);
+        return body;
+    }
+
+    /**
+     * Receives one datagram from the channel, validates its protocol
+     * marker, deserializes the body and passes the resulting message to
+     * MessagingService.receive().
+     *
+     * Read interest on the key is switched off for the duration of the call
+     * and switched back on in the finally block.
+     *
+     * @param key the selection key that became readable.
+     */
+    public void read(SelectionKey key)
+    {
+        key.interestOps( key.interestOps() & (~SelectionKey.OP_READ) );
+        ByteBuffer buffer = ByteBuffer.allocate(BUFFER_SIZE);
+        try
+        {
+            SocketAddress sa = socketChannel_.receive(buffer);
+            if ( sa == null )
+            {
+                logger_.debug("*** No datagram packet was available to be read ***");
+                return;
+            }
+            buffer.flip();
+
+            byte[] bytes = gobbleHeaderAndExtractBody(buffer);
+            if ( bytes.length > 0 )
+            {
+                DataInputStream dis = new DataInputStream( new ByteArrayInputStream(bytes) );
+                Message message = Message.serializer().deserialize(dis);
+                if ( message != null )
+                {
+                    MessagingService.receive(message);
+                }
+            }
+        }
+        catch ( IOException ioe )
+        {
+            logger_.warn(LogUtil.throwableToString(ioe));
+        }
+        finally
+        {
+            // Restore read interest from the key that was passed in: key_ is
+            // only assigned by init(int) and would be null otherwise.
+            key.interestOps( key.interestOps() | SelectionKey.OP_READ );
+        }
+    }
+}
Added: incubator/cassandra/src/org/apache/cassandra/net/http/ColumnFamilyFormatter.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/src/org/apache/cassandra/net/http/ColumnFamilyFormatter.java?rev=749207&view=auto
==============================================================================
--- incubator/cassandra/src/org/apache/cassandra/net/http/ColumnFamilyFormatter.java (added)
+++ incubator/cassandra/src/org/apache/cassandra/net/http/ColumnFamilyFormatter.java Mon Mar 2 06:13:14 2009
@@ -0,0 +1,94 @@
+package org.apache.cassandra.net.http;
+
+import java.util.Collection;
+import java.util.List;
+
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.db.ColumnFamily;
+import org.apache.cassandra.db.IColumn;
+import org.apache.cassandra.db.SuperColumn;
+
+
+/**
+ * HTML rendering of a column family and its columns.
+ *
+ * Summary lines ("Key = ...", "ColumnFamily = ...") are appended to the
+ * StringBuilder passed to each method, while table markup is produced
+ * through the table helpers inherited from HTMLFormatter.
+ */
+public class ColumnFamilyFormatter extends HTMLFormatter
+{
+    public ColumnFamilyFormatter()
+    {
+        super();
+    }
+
+    public ColumnFamilyFormatter(StringBuilder sb)
+    {
+        super(sb);
+    }
+
+    /**
+     * Writes the row key followed by the column family.
+     *
+     * @param sb builder receiving the summary lines.
+     * @param sKey row key to print.
+     * @param cf column family to print.
+     */
+    public void printKeyColumnFamily(StringBuilder sb, String sKey, ColumnFamily cf)
+    {
+        // the key comes first ...
+        sb.append("Key = ").append(sKey).append("<br>");
+
+        // ... then the column family itself
+        printColumnFamily(sb, cf);
+    }
+
+    /**
+     * Writes the column family name and then its columns, using the super
+     * column layout when DatabaseDescriptor reports the family type as
+     * "Super" and the simple layout otherwise.
+     *
+     * @param sb builder receiving the summary lines.
+     * @param cf column family to print.
+     */
+    public void printColumnFamily(StringBuilder sb, ColumnFamily cf)
+    {
+        final String familyName = cf.name();
+        sb.append("ColumnFamily = ").append(familyName).append("<br>");
+
+        final boolean isSuperFamily = "Super".equals(DatabaseDescriptor.getColumnType(familyName));
+        final Collection<IColumn> columns = cf.getAllColumns();
+        if ( !isSuperFamily )
+        {
+            printSimpleColumns(sb, columns);
+            return;
+        }
+        printSuperColumns(sb, columns);
+    }
+
+    /**
+     * Writes the number of super columns, then one table containing, for
+     * each super column, a header with its name and a row holding its
+     * sub columns.
+     *
+     * @param sb builder receiving the summary line.
+     * @param cols super columns to print; each element is cast to SuperColumn.
+     */
+    public void printSuperColumns(StringBuilder sb, Collection<IColumn> cols)
+    {
+        sb.append("Number of super columns = ").append(cols.size()).append("<br>");
+
+        startTable();
+        for ( IColumn superColumn : cols )
+        {
+            addHeader(superColumn.name());
+            startRow();
+            printSimpleColumns(sb, ((SuperColumn) superColumn).getSubColumns());
+            endRow();
+        }
+        endTable();
+    }
+
+    /**
+     * Emits a table whose header row holds the column names and whose
+     * single data row holds the column values converted to String.
+     *
+     * @param sb not written to by this method.
+     * @param cols columns to print.
+     */
+    public void printSimpleColumns(StringBuilder sb, Collection<IColumn> cols)
+    {
+        final int count = cols.size();
+        final String[] names = new String[count];
+        final String[] values = new String[count];
+
+        // collect names and values side by side, in iteration order
+        int slot = 0;
+        for ( IColumn column : cols )
+        {
+            names[slot] = column.name();
+            values[slot] = new String(column.value());
+            slot++;
+        }
+
+        startTable();
+        addHeaders(names);
+        startRow();
+        for ( String value : values )
+        {
+            addCol(value);
+        }
+        endRow();
+        endTable();
+    }
+}