You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by pm...@apache.org on 2009/03/02 08:57:31 UTC
svn commit: r749218 [24/34] - in /incubator/cassandra: branches/ dist/
nightly/ site/ tags/ trunk/ trunk/lib/ trunk/src/ trunk/src/org/
trunk/src/org/apache/ trunk/src/org/apache/cassandra/
trunk/src/org/apache/cassandra/analytics/ trunk/src/org/apache...
Added: incubator/cassandra/trunk/src/org/apache/cassandra/net/TcpConnection.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/net/TcpConnection.java?rev=749218&view=auto
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/net/TcpConnection.java (added)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/net/TcpConnection.java Mon Mar 2 07:57:22 2009
@@ -0,0 +1,570 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.net;
+
+import java.io.*;
+import java.net.InetSocketAddress;
+import java.nio.ByteBuffer;
+import java.nio.channels.FileChannel;
+import java.nio.channels.SelectionKey;
+import java.nio.channels.SocketChannel;
+import java.util.*;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+
+import org.apache.cassandra.concurrent.DebuggableThreadPoolExecutor;
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.net.io.FastSerializer;
+import org.apache.cassandra.net.io.ISerializer;
+import org.apache.cassandra.net.io.ProtocolState;
+import org.apache.cassandra.net.io.StartState;
+import org.apache.cassandra.net.io.TcpReader;
+import org.apache.cassandra.net.io.TcpReader.TcpReaderState;
+import org.apache.cassandra.utils.LogUtil;
+import org.apache.log4j.Logger;
+import org.apache.cassandra.net.io.*;
+import org.apache.cassandra.net.sink.*;
+
+/**
+ * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
+ */
+
+public class TcpConnection extends SelectionKeyHandler implements Comparable
+{
+ // logging and profiling.
+ private static Logger logger_ = Logger.getLogger(TcpConnection.class);
+ private static ISerializer serializer_ = new FastSerializer();
+ private SocketChannel socketChannel_;
+ private SelectionKey key_;
+ private TcpConnectionManager pool_;
+ private boolean isIncoming_ = false;
+ private TcpReader tcpReader_;
+ private ReadWorkItem readWork_ = new ReadWorkItem();
+ private List<ByteBuffer> pendingWrites_ = new Vector<ByteBuffer>();
+ private AtomicBoolean connected_ = new AtomicBoolean(false);
+ private EndPoint localEp_;
+ private EndPoint remoteEp_;
+ boolean inUse_ = false;
+
+ /*
+ * Added for streaming support. We need the boolean
+ * to indicate that this connection is used for
+ * streaming. The Condition and the Lock are used
+ * to signal the stream() that it can continue
+ * streaming when the socket becomes writable.
+ */
+ private boolean bStream_ = false;
+ private Lock lock_;
+ private Condition condition_;
+
+ /**
+  * Creates a pooled, outgoing connection (used from getConnection).
+  * The channel is opened in non-blocking mode and a connect is started. When the
+  * connect completes immediately the channel is registered for reads and marked
+  * connected; otherwise it is registered for OP_CONNECT and completion is handled
+  * later by connect(SelectionKey).
+  *
+  * @param pool the connection manager that owns this connection
+  * @param from the local end point
+  * @param to the remote end point to connect to
+  * @throws IOException if the channel cannot be opened, configured or connected
+  */
+ TcpConnection(TcpConnectionManager pool, EndPoint from, EndPoint to) throws IOException
+ {
+     pool_ = pool;
+     localEp_ = from;
+     remoteEp_ = to;
+
+     socketChannel_ = SocketChannel.open();
+     socketChannel_.configureBlocking(false);
+
+     boolean connectedImmediately = socketChannel_.connect( remoteEp_.getInetAddress() );
+     if ( connectedImmediately )
+     {
+         key_ = SelectorManager.getSelectorManager().register(socketChannel_, this, SelectionKey.OP_READ);
+         connected_.set(true);
+     }
+     else
+     {
+         key_ = SelectorManager.getSelectorManager().register(socketChannel_, this, SelectionKey.OP_CONNECT);
+     }
+ }
+
+ /*
+  * Used for streaming purposes; has no pooling semantics (pool_ stays null).
+  *
+  * The streaming state (lock, condition and the bStream_ flag) is created BEFORE
+  * the channel is registered with the selector. Registration hands "this" to the
+  * selector, whose connect() callback calls resumeStreaming(), which in turn
+  * uses lock_ and condition_; initialising them afterwards left a window in
+  * which that callback could observe bStream_ == true with a null lock_.
+  *
+  * @param from the local end point
+  * @param to the remote end point to stream to
+  * @throws IOException if the channel cannot be opened, configured or connected
+  */
+ TcpConnection(EndPoint from, EndPoint to) throws IOException
+ {
+     localEp_ = from;
+     remoteEp_ = to;
+
+     lock_ = new ReentrantLock();
+     condition_ = lock_.newCondition();
+     bStream_ = true;
+
+     socketChannel_ = SocketChannel.open();
+     socketChannel_.configureBlocking(false);
+
+     if ( !socketChannel_.connect( remoteEp_.getInetAddress() ) )
+     {
+         /* connect is still in progress; connect(SelectionKey) finishes it */
+         key_ = SelectorManager.getSelectorManager().register(socketChannel_, this, SelectionKey.OP_CONNECT);
+     }
+     else
+     {
+         key_ = SelectorManager.getSelectorManager().register(socketChannel_, this, SelectionKey.OP_READ);
+         connected_.set(true);
+     }
+ }
+
+ /*
+  * This method is invoked by the TcpConnectionHandler to accept incoming TCP connections.
+  * Accept the connection and then register interest for reads.
+  *
+  * The isIncoming argument is now forwarded to the connection instead of being
+  * silently replaced by a hard-coded "true"; the caller visible in
+  * TcpConnectionHandler passes true, so its behaviour is unchanged.
+  *
+  * @param socketChannel the channel returned by the server socket's accept()
+  * @param localEp the local end point the connection was accepted on
+  * @param isIncoming whether the connection should be flagged as incoming
+  * @throws IOException if the channel cannot be configured or registered
+  */
+ static void acceptConnection(SocketChannel socketChannel, EndPoint localEp, boolean isIncoming) throws IOException
+ {
+     TcpConnection tcpConnection = new TcpConnection(socketChannel, localEp, isIncoming);
+     tcpConnection.registerReadInterest();
+ }
+
+ /**
+  * Registers this connection's channel with the selector manager for OP_READ
+  * and remembers the resulting selection key in key_.
+  *
+  * @throws IOException propagated from the selector manager's register call
+  */
+ private void registerReadInterest() throws IOException
+ {
+ key_ = SelectorManager.getSelectorManager().register(socketChannel_, this, SelectionKey.OP_READ);
+ }
+
+ // used for incoming connections
+ /**
+  * Wraps an already accepted channel. The channel is switched to non-blocking
+  * mode and the connection is marked connected. Neither the remote end point nor
+  * the pool is set here; ReadWorkItem fills both in when the first non-streaming
+  * message arrives. The selection key is not created here either, see
+  * registerReadInterest().
+  *
+  * @param socketChannel the accepted channel
+  * @param localEp the local end point the channel was accepted on
+  * @param isIncoming value stored in isIncoming_
+  * @throws IOException if the channel cannot be made non-blocking
+  */
+ TcpConnection(SocketChannel socketChannel, EndPoint localEp, boolean isIncoming) throws IOException
+ {
+ socketChannel_ = socketChannel;
+ socketChannel_.configureBlocking(false);
+ isIncoming_ = isIncoming;
+ connected_.set(true);
+ localEp_ = localEp;
+ }
+
+ /** Returns the local end point of this connection. */
+ EndPoint getLocalEp()
+ {
+ return localEp_;
+ }
+
+ /** Replaces the local end point of this connection. */
+ public void setLocalEp(EndPoint localEp)
+ {
+ localEp_ = localEp;
+ }
+
+ /**
+  * Returns the remote end point. For connections built by the incoming
+  * constructor this is null until ReadWorkItem has seen the first message.
+  */
+ public EndPoint getEndPoint()
+ {
+ return remoteEp_;
+ }
+
+ /** Returns the flag supplied to the incoming-connection constructor (false otherwise). */
+ public boolean isIncoming()
+ {
+ return isIncoming_;
+ }
+
+ /** Returns the underlying socket channel. */
+ public SocketChannel getSocketChannel()
+ {
+ return socketChannel_;
+ }
+
+ /**
+  * Serializes a message and sends it on this connection.
+  * If the connection is not yet established, or earlier buffers are still
+  * queued, the packed buffer is appended to the pending queue so ordering is
+  * kept. Otherwise one write is attempted right away; whatever does not fit is
+  * queued and write interest is requested from the selector.
+  * An empty serialized form is silently ignored.
+  *
+  * @param message the message to send
+  * @throws IOException if serialization or the socket write fails
+  */
+ public void write(Message message) throws IOException
+ {
+     byte[] payload = serializer_.serialize(message);
+     if ( payload.length == 0 )
+     {
+         return;
+     }
+
+     boolean listening = !message.getFrom().equals(EndPoint.randomLocalEndPoint_);
+     ByteBuffer packet = MessagingService.packIt( payload , false, false, listening);
+     synchronized(this)
+     {
+         boolean mustQueue = !pendingWrites_.isEmpty() || !connected_.get();
+         if ( mustQueue )
+         {
+             pendingWrites_.add(packet);
+             return;
+         }
+
+         logger_.debug("Sending packets of size " + payload.length);
+         socketChannel_.write(packet);
+
+         if ( !packet.hasRemaining() )
+         {
+             return;
+         }
+
+         /* partial write: keep the rest and ask to be told when the socket drains */
+         pendingWrites_.add(packet);
+         boolean writeInterestMissing = (key_.interestOps() & SelectionKey.OP_WRITE) == 0;
+         if ( writeInterestMissing )
+         {
+             SelectorManager.getSelectorManager().modifyKeyForWrite(key_);
+         }
+     }
+ }
+
+ /**
+  * Streams the byte range [startPosition, endPosition) of a file over this
+  * connection. Only valid on connections created by the streaming constructor.
+  *
+  * Changes relative to the previous version:
+  *  - the RandomAccessFile (and with it the FileChannel) is closed on every exit path;
+  *  - the stream header is sent at most once; previously it was sent again on each
+  *    loop iteration in which startPosition was still 0, i.e. whenever the first
+  *    transferTo attempts moved no bytes;
+  *  - each transfer is capped at the number of bytes still owed, so nothing beyond
+  *    endPosition is sent when the file is longer than the requested range.
+  *
+  * @param file the file to read from
+  * @param startPosition offset of the first byte to send; a header is sent first when this is 0
+  * @param endPosition offset one past the last byte to send
+  * @throws IllegalStateException if this connection was not set up for streaming
+  * @throws IOException if the file cannot be opened, the connection is not
+  *         established after 3 waits of 2 seconds, or a transfer fails
+  */
+ public void stream(File file, long startPosition, long endPosition) throws IOException
+ {
+     if ( !bStream_ )
+         throw new IllegalStateException("Cannot stream since we are not set up to stream data.");
+
+     lock_.lock();
+     try
+     {
+         /* transfer at most 64MB in each attempt */
+         final long limit = 64L*1024*1024;
+         long total = endPosition - startPosition;
+         /* keeps track of total number of bytes transferred */
+         long bytesWritten = 0L;
+         /* the header precedes the data only when streaming starts at offset 0 */
+         boolean headerPending = ( startPosition == 0 );
+
+         RandomAccessFile raf = new RandomAccessFile(file, "r");
+         try
+         {
+             FileChannel fc = raf.getChannel();
+
+             /*
+              * If the connection is not yet established then wait for
+              * the timeout period of 2 seconds. Wait up to 3 times and then
+              * bail with an IOException.
+              */
+             long waitTime = 2;
+             int retry = 0;
+             while ( !connected_.get() )
+             {
+                 if ( retry == 3 )
+                     throw new IOException("Unable to connect to " + remoteEp_ + " after " + retry + " attempts.");
+                 waitToContinueStreaming(waitTime, TimeUnit.SECONDS);
+                 ++retry;
+             }
+
+             while ( bytesWritten < total )
+             {
+                 if ( headerPending )
+                 {
+                     headerPending = false;
+                     ByteBuffer buffer = MessagingService.constructStreamHeader(false, true);
+                     socketChannel_.write(buffer);
+                     handleIncompleteWrite(buffer);
+                 }
+
+                 /* never ask for more than what is left of the requested range */
+                 long count = Math.min(limit, total - bytesWritten);
+                 /* returns the number of bytes transferred from file to the socket */
+                 long bytesTransferred = fc.transferTo(startPosition, count, socketChannel_);
+                 logger_.debug("Bytes transferred " + bytesTransferred);
+                 bytesWritten += bytesTransferred;
+                 startPosition += bytesTransferred;
+                 /*
+                  * If the number of bytes transferred is less than intended
+                  * then we need to wait till socket becomes writeable again.
+                  */
+                 if ( bytesTransferred < count )
+                 {
+                     if ((key_.interestOps() & SelectionKey.OP_WRITE) == 0)
+                     {
+                         SelectorManager.getSelectorManager().modifyKeyForWrite(key_);
+                     }
+                     waitToContinueStreaming();
+                 }
+             }
+         }
+         finally
+         {
+             /* closing the file also closes the channel obtained from it */
+             raf.close();
+         }
+     }
+     finally
+     {
+         lock_.unlock();
+     }
+ }
+
+ /**
+  * Deals with a buffer that the socket accepted only partially: the buffer is
+  * queued as a pending write, write interest is requested if it is not already
+  * set, and the calling (streaming) thread blocks until it is signalled.
+  * A fully written buffer is a no-op.
+  *
+  * @param buffer the buffer that was just handed to the socket
+  */
+ private void handleIncompleteWrite(ByteBuffer buffer)
+ {
+     if ( !buffer.hasRemaining() )
+     {
+         return;
+     }
+
+     pendingWrites_.add(buffer);
+     boolean writeInterestMissing = (key_.interestOps() & SelectionKey.OP_WRITE) == 0;
+     if ( writeInterestMissing )
+     {
+         SelectorManager.getSelectorManager().modifyKeyForWrite(key_);
+     }
+     waitToContinueStreaming();
+ }
+
+ /**
+  * Blocks on condition_ until signalled (see resumeStreaming()).
+  * An interrupt is logged and the method returns normally; the interrupt flag
+  * is not restored.
+  * NOTE(review): condition_.await() requires the caller to hold lock_; the
+  * callers visible in this class are reached from stream(), which holds it.
+  */
+ private void waitToContinueStreaming()
+ {
+ try
+ {
+ condition_.await();
+ }
+ catch ( InterruptedException ex )
+ {
+ logger_.warn( LogUtil.throwableToString(ex) );
+ }
+ }
+
+ /**
+  * Timed variant: blocks on condition_ until signalled or until the given time
+  * has elapsed. The result of await (signalled vs. timed out) is discarded, so
+  * callers must re-check their own condition. An interrupt is logged and the
+  * method returns normally.
+  *
+  * @param waitTime maximum time to wait
+  * @param tu unit of waitTime
+  */
+ private void waitToContinueStreaming(long waitTime, TimeUnit tu)
+ {
+ try
+ {
+ condition_.await(waitTime, tu);
+ }
+ catch ( InterruptedException ex )
+ {
+ logger_.warn( LogUtil.throwableToString(ex) );
+ }
+ }
+
+ /**
+  * Wakes up one thread blocked in waitToContinueStreaming().
+  * Does nothing on connections that were not created for streaming.
+  */
+ private void resumeStreaming()
+ {
+     if ( bStream_ )
+     {
+         lock_.lock();
+         try
+         {
+             condition_.signal();
+         }
+         finally
+         {
+             lock_.unlock();
+         }
+     }
+ }
+
+ /**
+  * Releases this connection back to its pool: clears the in-use flag and, when
+  * the pool still contains the connection, decrements the pool's in-use count.
+  * The socket itself is left open.
+  *
+  * pool_ is null for streaming connections and for incoming connections that
+  * have not yet received a message, so it is checked before use (as closeSocket()
+  * and errorClose() already do) instead of failing with a NullPointerException.
+  */
+ public void close()
+ {
+     inUse_ = false;
+     if ( pool_ != null && pool_.contains(this) )
+         pool_.decUsed();
+ }
+
+ /**
+  * Reports whether the underlying socket channel is connected. This asks the
+  * channel directly and does not consult the connected_ flag kept by this class.
+  */
+ public boolean isConnected()
+ {
+ return socketChannel_.isConnected();
+ }
+
+ public boolean equals(Object o)
+ {
+ if ( !(o instanceof TcpConnection) )
+ return false;
+
+ TcpConnection rhs = (TcpConnection)o;
+ if ( localEp_.equals(rhs.localEp_) && remoteEp_.equals(rhs.remoteEp_) )
+ return true;
+ else
+ return false;
+ }
+
+ /**
+  * Hash of the string "local:remote" built from the two end points.
+  * String concatenation renders a null end point as "null", so this never throws.
+  */
+ public int hashCode()
+ {
+ return (localEp_ + ":" + remoteEp_).hashCode();
+ }
+
+ /** Returns the string form of the underlying socket channel. */
+ public String toString()
+ {
+ return socketChannel_.toString();
+ }
+
+ /**
+  * Shuts this connection down: logs the number of writes being abandoned,
+  * removes the connection from its pool (if it has one), cancels the selection
+  * key and discards all pending writes.
+  * NOTE(review): the channel is not closed here directly; whether it gets closed
+  * depends on SelectorManager.cancel, which is not visible in this file.
+  */
+ void closeSocket()
+ {
+ logger_.warn("Closing down connection " + socketChannel_ + " with " + pendingWrites_.size() + " writes remaining.");
+ if ( pool_ != null )
+ {
+ pool_.removeConnection(this);
+ }
+ cancel(key_);
+ pendingWrites_.clear();
+ }
+
+ /**
+  * Shuts this connection down after an error: cancels the selection key,
+  * discards all pending writes and removes the connection from its pool
+  * (if it has one).
+  *
+  * The pending writes used to be cleared twice, before and after the cancel;
+  * the single clear is kept after the cancel so that it runs last.
+  */
+ void errorClose()
+ {
+     logger_.warn("Closing down connection " + socketChannel_);
+     cancel(key_);
+     pendingWrites_.clear();
+     if ( pool_ != null )
+     {
+         pool_.removeConnection(this);
+     }
+ }
+
+ /**
+  * Asks the selector manager to cancel the given key; a null key (connection
+  * never registered) is ignored.
+  */
+ private void cancel(SelectionKey key)
+ {
+ if ( key != null )
+ SelectorManager.getSelectorManager().cancel(key);
+ }
+
+ /**
+  * Completes a non-blocking connect. Called in the selector thread.
+  * On success the key is switched to read interest, the connection is marked
+  * connected, write interest is requested if writes were queued while
+  * connecting, and a waiting streaming thread is woken up. On failure the
+  * connection is torn down via errorClose().
+  *
+  * The key passed in by the selector is used throughout. The previous version
+  * used the key_ field for the write-interest request; key_ is only assigned
+  * once register() has returned in the constructor, so this callback should not
+  * depend on it. A stray empty statement (";;") was removed as well.
+  *
+  * @param key the selection key of this connection's channel
+  */
+ public void connect(SelectionKey key)
+ {
+     key.interestOps(key.interestOps() & (~SelectionKey.OP_CONNECT));
+     try
+     {
+         if (socketChannel_.finishConnect())
+         {
+             SelectorManager.getSelectorManager().modifyKeyForRead(key);
+             connected_.set(true);
+
+             // this will flush the pending
+             if (!pendingWrites_.isEmpty())
+             {
+                 SelectorManager.getSelectorManager().modifyKeyForWrite(key);
+             }
+             resumeStreaming();
+         }
+         else
+         {
+             logger_.warn("Closing connection because socket channel could not finishConnect.");
+             errorClose();
+         }
+     }
+     catch(IOException e)
+     {
+         logger_.warn("Encountered IOException on connection: " + socketChannel_);
+         logger_.warn( LogUtil.throwableToString(e) );
+         errorClose();
+     }
+ }
+
+ // called in the selector thread
+ /**
+  * Write-readiness callback: clears OP_WRITE on the key, flushes as many
+  * pending buffers as the socket accepts, then wakes up a streaming thread
+  * (a no-op on non-streaming connections).
+  */
+ public void write(SelectionKey key)
+ {
+ key.interestOps( key.interestOps() & ( ~SelectionKey.OP_WRITE ) );
+ doPendingWrites();
+ /*
+ * This is executed only if we are in streaming mode.
+ * Idea is that we read a chunk of data from a source
+ * and wait to read the next from the source until we
+ * are signalled to do so from here.
+ */
+ resumeStreaming();
+ }
+
+ /**
+  * Writes queued buffers to the socket in FIFO order. Stops at the first buffer
+  * the socket accepts only partially, leaving it at the head of the queue.
+  * On an IOException the connection is torn down via errorClose(). In every
+  * case, if buffers remain queued and OP_WRITE is not set, write interest is
+  * requested again.
+  * NOTE(review): the finally block reads key_.interestOps() even after
+  * errorClose() has cancelled the key; errorClose() clears the queue first, so
+  * the short-circuit normally skips that call - confirm no writer can enqueue
+  * in between.
+  */
+ void doPendingWrites()
+ {
+ try
+ {
+ while(!pendingWrites_.isEmpty())
+ {
+ ByteBuffer buffer = pendingWrites_.get(0);
+ socketChannel_.write(buffer);
+ if (buffer.remaining() > 0)
+ {
+ // socket is full again; keep the partially written buffer at the head
+ break;
+ }
+ pendingWrites_.remove(0);
+ }
+
+ }
+ catch(IOException ex)
+ {
+ logger_.warn(LogUtil.throwableToString(ex));
+ // This is to fix the weird Linux bug with NIO.
+ errorClose();
+ }
+ finally
+ {
+ synchronized(this)
+ {
+ if (!pendingWrites_.isEmpty() && (key_.interestOps() & SelectionKey.OP_WRITE) == 0)
+ {
+ SelectorManager.getSelectorManager().modifyKeyForWrite(key_);
+ }
+ }
+ }
+ }
+
+ // called in the selector thread
+ /**
+  * Read-readiness callback: clears OP_READ on the key and hands the actual
+  * reading to the read executor by submitting this connection's ReadWorkItem.
+  * The work item asks for read interest again when it finishes.
+  */
+ public void read(SelectionKey key)
+ {
+ key.interestOps( key.interestOps() & ( ~SelectionKey.OP_READ ) );
+ // publish this event onto to the TCPReadEvent Queue.
+ MessagingService.getReadExecutor().execute(readWork_);
+ }
+
+ /**
+  * Adds OP_READ to the interest set of the given key.
+  * The current interest set is now read from the same key that is modified;
+  * previously it was read from the key_ field, which yields a wrong set for any
+  * other key and throws a NullPointerException while key_ is unassigned.
+  *
+  * @param key the key to modify
+  */
+ public void modifyKeyForRead(SelectionKey key)
+ {
+     key.interestOps( key.interestOps() | SelectionKey.OP_READ );
+ }
+
+ /**
+  * Adds OP_WRITE to the interest set of the given key.
+  * The current interest set is now read from the same key that is modified;
+  * previously it was read from the key_ field, which yields a wrong set for any
+  * other key and throws a NullPointerException while key_ is unassigned.
+  *
+  * @param key the key to modify
+  */
+ public void modifyKeyForWrite(SelectionKey key)
+ {
+     key.interestOps( key.interestOps() | SelectionKey.OP_WRITE );
+ }
+
+ /**
+  * Unit of work that drains complete messages from this connection's TcpReader.
+  * It is submitted by read(SelectionKey) and always asks the selector manager
+  * for read interest again when it finishes, whether or not reading succeeded.
+  */
+ class ReadWorkItem implements Runnable
+ {
+ // called from the TCP READ thread pool
+ public void run()
+ {
+ // Lazily create the reader on first use and put it into its PREAMBLE state.
+ if ( tcpReader_ == null )
+ {
+ tcpReader_ = new TcpReader(TcpConnection.this);
+ StartState nextState = tcpReader_.getSocketState(TcpReader.TcpReaderState.PREAMBLE);
+ if ( nextState == null )
+ {
+ nextState = new ProtocolState(tcpReader_);
+ tcpReader_.putSocketState(TcpReader.TcpReaderState.PREAMBLE, nextState);
+ }
+ tcpReader_.morphState(nextState);
+ }
+
+ try
+ {
+ byte[] bytes = new byte[0];
+ // Keep going for as long as the reader hands back a non-empty body.
+ while ( (bytes = tcpReader_.read()).length > 0 )
+ {
+ ProtocolHeader pH = tcpReader_.getProtocolHeader();
+ if ( !pH.isStreamingMode_ )
+ {
+ /* first message received */
+ if (remoteEp_ == null)
+ {
+ // The peer's port is not taken from the socket: it is the storage port when the
+ // header says the peer is listening, otherwise EndPoint.randomPort_.
+ int port = ( pH.isListening_ ) ? DatabaseDescriptor.getStoragePort() : EndPoint.randomPort_;
+ remoteEp_ = new EndPoint( socketChannel_.socket().getInetAddress().getHostName(), port );
+ // put connection into pool if possible
+ pool_ = MessagingService.getConnectionPool(localEp_, remoteEp_);
+ pool_.addToPool(TcpConnection.this);
+ }
+
+ /* Deserialize and handle the message */
+ MessagingService.getDeserilizationExecutor().submit( new MessageDeserializationTask(pH.serializerType_, bytes) );
+ tcpReader_.resetState();
+ }
+ else
+ {
+ MessagingService.setStreamingMode(false);
+ /* Close this socket connection used for streaming */
+ closeSocket();
+ // NOTE(review): the loop condition reads from the reader again after closeSocket() - confirm intended.
+ }
+ }
+ }
+ catch ( IOException ex )
+ {
+ handleException(ex);
+ }
+ catch ( Throwable th )
+ {
+ // Same handling as IOException above; nothing escapes to the executor.
+ handleException(th);
+ }
+ finally
+ {
+ // NOTE(review): also runs after errorClose()/closeSocket() cancelled key_ - confirm
+ // that SelectorManager.modifyKeyForRead tolerates a cancelled key.
+ SelectorManager.getSelectorManager().modifyKeyForRead(key_);
+ }
+ }
+
+ /** Logs the failure and tears the connection down via errorClose(). */
+ private void handleException(Throwable th)
+ {
+ logger_.warn("Problem reading from socket connected to : " + socketChannel_);
+ logger_.warn(LogUtil.throwableToString(th));
+ // This is to fix the weird Linux bug with NIO.
+ errorClose();
+ }
+ }
+
+ /** Returns the number of buffers currently queued in pendingWrites_. */
+ public int pending()
+ {
+ return pendingWrites_.size();
+ }
+
+ /**
+  * Orders connections by the number of pending writes, fewest first.
+  * Note that this ordering is unrelated to equals(), which compares end points.
+  *
+  * @param o the connection to compare with
+  * @return this connection's pending count minus the other's
+  * @throws IllegalArgumentException if o is not a TcpConnection
+  */
+ public int compareTo(Object o)
+ {
+     if ( !(o instanceof TcpConnection) )
+     {
+         throw new IllegalArgumentException();
+     }
+
+     TcpConnection other = (TcpConnection) o;
+     return pendingWrites_.size() - other.pendingWrites_.size();
+ }
+}
Added: incubator/cassandra/trunk/src/org/apache/cassandra/net/TcpConnectionHandler.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/net/TcpConnectionHandler.java?rev=749218&view=auto
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/net/TcpConnectionHandler.java (added)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/net/TcpConnectionHandler.java Mon Mar 2 07:57:22 2009
@@ -0,0 +1,60 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.net;
+
+import java.nio.channels.*;
+import java.io.IOException;
+import java.net.*;
+
+import org.apache.cassandra.utils.LogUtil;
+import org.apache.log4j.Logger;
+
+/**
+ * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
+ */
+
+public class TcpConnectionHandler extends SelectionKeyHandler
+{
+    private static Logger logger_ = Logger.getLogger(TcpConnectionHandler.class);
+    /* local end point that accepted connections are attributed to */
+    EndPoint localEp_;
+
+    /**
+     * @param localEp the local end point handed to every accepted connection
+     */
+    public TcpConnectionHandler(EndPoint localEp)
+    {
+        localEp_ = localEp;
+    }
+
+    /**
+     * Accept-readiness callback: accepts one pending client from the server
+     * channel behind the key and turns it into a TcpConnection registered for
+     * reads. accept() on a non-blocking server channel may return null, in which
+     * case nothing happens.
+     *
+     * If setting up the connection fails after the client was accepted, the
+     * client channel is now closed; previously it was left open with nothing
+     * referring to it.
+     *
+     * @param key the selection key of the server socket channel
+     */
+    public void accept(SelectionKey key)
+    {
+        SocketChannel client = null;
+        try
+        {
+            ServerSocketChannel serverChannel = (ServerSocketChannel)key.channel();
+            client = serverChannel.accept();
+
+            if ( client != null )
+            {
+                TcpConnection.acceptConnection(client, localEp_, true);
+            }
+        }
+        catch(IOException e)
+        {
+            logger_.warn(LogUtil.throwableToString(e));
+            closeQuietly(client);
+        }
+    }
+
+    /* Closes a channel whose set-up failed; a failure to close is only logged. */
+    private void closeQuietly(SocketChannel channel)
+    {
+        if ( channel == null )
+            return;
+        try
+        {
+            channel.close();
+        }
+        catch(IOException e)
+        {
+            logger_.warn(LogUtil.throwableToString(e));
+        }
+    }
+}
Added: incubator/cassandra/trunk/src/org/apache/cassandra/net/TcpConnectionManager.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/net/TcpConnectionManager.java?rev=749218&view=auto
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/net/TcpConnectionManager.java (added)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/net/TcpConnectionManager.java Mon Mar 2 07:57:22 2009
@@ -0,0 +1,217 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.net;
+
+import java.io.IOException;
+import java.util.*;
+import java.util.concurrent.*;
+import java.util.concurrent.locks.*;
+
+import org.apache.log4j.Logger;
+/**
+ * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
+ */
+
+class TcpConnectionManager
+{
+ private Lock lock_ = new ReentrantLock();
+ private List<TcpConnection> allConnections_;
+ private EndPoint localEp_;
+ private EndPoint remoteEp_;
+ private int initialSize_;
+ private int growthFactor_;
+ private int maxSize_;
+ private long lastTimeUsed_;
+ private boolean isShut_;
+
+ private int inUse_;
+
+ /**
+  * Creates an empty pool of connections between one local and one remote end point.
+  * Within this class only maxSize is consulted afterwards; initialSize and
+  * growthFactor are stored but not read here.
+  *
+  * @param initialSize stored in initialSize_
+  * @param growthFactor stored in growthFactor_
+  * @param maxSize upper bound on the number of pooled connections
+  * @param localEp local end point used for every connection of this pool
+  * @param remoteEp remote end point used for every connection of this pool
+  */
+ TcpConnectionManager(int initialSize, int growthFactor, int maxSize, EndPoint localEp, EndPoint remoteEp)
+ {
+ initialSize_ = initialSize;
+ growthFactor_ = growthFactor;
+ maxSize_ = maxSize;
+ localEp_ = localEp;
+ remoteEp_ = remoteEp;
+ isShut_ = false;
+ lastTimeUsed_ = System.currentTimeMillis();
+ allConnections_ = new Vector<TcpConnection>();
+ }
+
+ /**
+  * Hands out a connection to the remote end point and counts it as in use.
+  * An empty pool gets its first connection. Otherwise the least loaded
+  * connection is reused when it has nothing pending or the pool is full; failing
+  * that, a new connection is opened and pooled.
+  *
+  * Changes relative to the previous version:
+  *  - no NullPointerException when no least-loaded connection exists while the
+  *    pool size equals maxSize_;
+  *  - the null checks on a freshly constructed connection (always true) are gone;
+  *  - the fallback connection is marked in use and counted like every other
+  *    connection handed out, so a later close() does not unbalance the counter.
+  *
+  * NOTE(review): TcpConnection.equals compares only the end points, and every
+  * connection created here shares this pool's end points, so contains() looks
+  * true for any new connection once the pool is non-empty, and closeSocket() on
+  * the discarded connection removes an equal pooled one. Confirm whether
+  * identity comparison was intended.
+  *
+  * @return a connection marked in use, or null if the pool turned out empty in
+  *         the fallback path
+  * @throws IOException if a new connection cannot be opened
+  */
+ TcpConnection getConnection() throws IOException
+ {
+     lock_.lock();
+     try
+     {
+         if (allConnections_.isEmpty())
+         {
+             TcpConnection conn = new TcpConnection(this, localEp_, remoteEp_);
+             addToPool(conn);
+             conn.inUse_ = true;
+             incUsed();
+             return conn;
+         }
+
+         TcpConnection least = getLeastLoaded();
+
+         if ( least != null && (least.pending() == 0 || allConnections_.size() == maxSize_) )
+         {
+             least.inUse_ = true;
+             incUsed();
+             return least;
+         }
+
+         TcpConnection connection = new TcpConnection(this, localEp_, remoteEp_);
+         if ( !contains(connection) )
+         {
+             addToPool(connection);
+             connection.inUse_ = true;
+             incUsed();
+             return connection;
+         }
+
+         /* the pool already holds an equal connection: drop the new one and fall back */
+         connection.closeSocket();
+         TcpConnection fallback = getLeastLoaded();
+         if ( fallback != null )
+         {
+             fallback.inUse_ = true;
+             incUsed();
+         }
+         return fallback;
+     }
+     finally
+     {
+         lock_.unlock();
+     }
+ }
+
+ /**
+  * Returns the pooled connection with the fewest pending writes, or null when
+  * the pool is empty. Ties go to the connection that comes first in the pool.
+  *
+  * The minimum is now found with a single pass (Collections.min) instead of
+  * sorting the shared list in place on every call; the selected connection is
+  * the same one a stable sort would have put first, and the pool order is no
+  * longer rearranged as a side effect.
+  */
+ protected TcpConnection getLeastLoaded()
+ {
+     lock_.lock();
+     try
+     {
+         if ( allConnections_.isEmpty() )
+         {
+             return null;
+         }
+         return Collections.min(allConnections_);
+     }
+     finally
+     {
+         lock_.unlock();
+     }
+ }
+
+ /**
+  * Removes a connection from the pool. Removal goes through List.remove(Object)
+  * and therefore matches by TcpConnection.equals. Does not take lock_.
+  */
+ void removeConnection(TcpConnection connection)
+ {
+ allConnections_.remove(connection);
+ }
+
+ /** Increments the in-use counter. Not synchronized by itself. */
+ void incUsed()
+ {
+ inUse_++;
+ }
+
+ /** Decrements the in-use counter. Not synchronized by itself. */
+ void decUsed()
+ {
+ inUse_--;
+ }
+
+ /** Returns the current value of the in-use counter. */
+ int getConnectionsInUse()
+ {
+ return inUse_;
+ }
+
+ /**
+  * Adds a connection to the pool unless an equal one is already pooled.
+  * When the pool has reached maxSize_ the connection is not added and its
+  * socket is shut down instead.
+  *
+  * @param connection the connection to pool
+  */
+ void addToPool(TcpConnection connection)
+ {
+     if ( contains(connection) )
+     {
+         return;
+     }
+
+     lock_.lock();
+     try
+     {
+         boolean poolIsFull = allConnections_.size() >= maxSize_;
+         if ( poolIsFull )
+         {
+             connection.closeSocket();
+         }
+         else
+         {
+             allConnections_.add(connection);
+         }
+     }
+     finally
+     {
+         lock_.unlock();
+     }
+ }
+
+ /**
+  * Empties the pool, shutting down every connection's socket on the way, and
+  * then records that the pool is shut.
+  */
+ void shutdown()
+ {
+     lock_.lock();
+     try
+     {
+         while ( !allConnections_.isEmpty() )
+         {
+             TcpConnection head = allConnections_.remove(0);
+             head.closeSocket();
+         }
+     }
+     finally
+     {
+         lock_.unlock();
+     }
+     isShut_ = true;
+ }
+
+ /** Returns the number of connections currently pooled. */
+ int getPoolSize()
+ {
+ return allConnections_.size();
+ }
+
+ /** Returns the local end point shared by the connections of this pool. */
+ EndPoint getLocalEndPoint()
+ {
+ return localEp_;
+ }
+
+ /** Returns the remote end point shared by the connections of this pool. */
+ EndPoint getRemoteEndPoint()
+ {
+ return remoteEp_;
+ }
+
+ /**
+  * Returns the number of pending writes summed over all pooled connections.
+  * The pool is traversed while holding lock_.
+  */
+ int getPendingWrites()
+ {
+     lock_.lock();
+     try
+     {
+         int pendingTotal = 0;
+         Iterator<TcpConnection> it = allConnections_.iterator();
+         while ( it.hasNext() )
+         {
+             pendingTotal += it.next().pending();
+         }
+         return pendingTotal;
+     }
+     finally
+     {
+         lock_.unlock();
+     }
+ }
+
+ /**
+  * Reports whether the pool holds a connection equal to the given one.
+  * The check goes through List.contains and therefore uses TcpConnection.equals,
+  * not object identity.
+  */
+ boolean contains(TcpConnection connection)
+ {
+ return allConnections_.contains(connection);
+ }
+}
Added: incubator/cassandra/trunk/src/org/apache/cassandra/net/UdpConnection.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/net/UdpConnection.java?rev=749218&view=auto
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/net/UdpConnection.java (added)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/net/UdpConnection.java Mon Mar 2 07:57:22 2009
@@ -0,0 +1,166 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.net;
+
+import java.net.SocketAddress;
+import java.nio.*;
+import java.nio.channels.*;
+import java.util.*;
+import java.util.concurrent.*;
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+
+import org.apache.cassandra.net.io.ProtocolState;
+import org.apache.cassandra.net.sink.SinkManager;
+import org.apache.cassandra.utils.BasicUtilities;
+import org.apache.cassandra.utils.LogUtil;
+import org.apache.log4j.Logger;
+import org.apache.cassandra.concurrent.*;
+import org.apache.cassandra.utils.*;
+
+/**
+ * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
+ */
+
+public class UdpConnection extends SelectionKeyHandler
+{
+ private static Logger logger_ = Logger.getLogger(UdpConnection.class);
+ private static final int BUFFER_SIZE = 4096;
+ private static final int protocol_ = 0xBADBEEF;
+
+ private DatagramChannel socketChannel_;
+ private SelectionKey key_;
+ private EndPoint localEndPoint_;
+
+ /**
+  * Opens a non-blocking datagram channel with address reuse enabled.
+  * The channel is neither bound to a port nor registered with a selector here.
+  *
+  * @throws IOException if the channel cannot be opened or configured
+  */
+ public void init() throws IOException
+ {
+ socketChannel_ = DatagramChannel.open();
+ socketChannel_.socket().setReuseAddress(true);
+ socketChannel_.configureBlocking(false);
+ }
+
+ /**
+  * Opens a non-blocking datagram channel bound to the given local port and
+  * registers it with the UDP selector manager for reads.
+  *
+  * Address reuse is now enabled BEFORE the socket is bound. The JDK leaves the
+  * effect of changing SO_REUSEADDR on an already bound socket undefined, so the
+  * previous order (bind first, then setReuseAddress) gave no guarantee that the
+  * option applied.
+  *
+  * @param port the local port to bind to
+  * @throws IOException if the channel cannot be opened, bound or registered
+  */
+ public void init(int port) throws IOException
+ {
+     // TODO: get TCP port from config and add one.
+     localEndPoint_ = new EndPoint(port);
+     socketChannel_ = DatagramChannel.open();
+     socketChannel_.socket().setReuseAddress(true);
+     socketChannel_.socket().bind(localEndPoint_.getInetAddress());
+     socketChannel_.configureBlocking(false);
+     key_ = SelectorManager.getUdpSelectorManager().register(socketChannel_, this, SelectionKey.OP_READ);
+ }
+
+ /**
+  * Serializes a message, prefixes it with the 4-byte protocol marker and sends
+  * it to the given end point as a single datagram.
+  *
+  * @param message the message to send
+  * @param to the destination end point
+  * @return false if the channel reported that zero bytes were sent; true
+  *         otherwise, including when the message serialized to nothing and no
+  *         datagram was sent at all
+  * @throws IOException if serialization or the send fails
+  */
+ public boolean write(Message message, EndPoint to) throws IOException
+ {
+     ByteArrayOutputStream bos = new ByteArrayOutputStream();
+     DataOutputStream dos = new DataOutputStream(bos);
+     Message.serializer().serialize(message, dos);
+
+     byte[] data = bos.toByteArray();
+     if ( data.length == 0 )
+     {
+         return true;
+     }
+
+     logger_.debug("Size of Gossip packet " + data.length);
+     byte[] protocol = BasicUtilities.intToByteArray(protocol_);
+     ByteBuffer buffer = ByteBuffer.allocate(data.length + protocol.length);
+     buffer.put( protocol ).put( data );
+     buffer.flip();
+
+     int sent = socketChannel_.send(buffer, to.getInetAddress());
+     return sent != 0;
+ }
+
+ /**
+  * Closes the datagram channel if one was opened. A failure to close is logged
+  * at error level and not propagated.
+  */
+ void close()
+ {
+ try
+ {
+ if ( socketChannel_ != null )
+ socketChannel_.close();
+ }
+ catch ( IOException ex )
+ {
+ logger_.error( LogUtil.throwableToString(ex) );
+ }
+ }
+
+ /** Returns the underlying datagram channel; null until one of the init methods has run. */
+ public DatagramChannel getDatagramChannel()
+ {
+ return socketChannel_;
+ }
+
+ /**
+  * Strips the 4-byte protocol marker from a received datagram and returns the
+  * remaining bytes.
+  *
+  * A datagram shorter than the marker is now rejected like one with a wrong
+  * marker. Previously the bulk get threw an unchecked BufferUnderflowException,
+  * which read() does not catch.
+  *
+  * @param buffer the received datagram, flipped for reading
+  * @return the message body, or an empty array if the datagram is too short or
+  *         does not start with the expected marker
+  */
+ private byte[] gobbleHeaderAndExtractBody(ByteBuffer buffer)
+ {
+     byte[] protocol = new byte[4];
+     if ( buffer.remaining() < protocol.length )
+     {
+         logger_.info("Incoming message is too short to carry a protocol header: " + buffer.remaining() + " bytes");
+         return new byte[0];
+     }
+
+     buffer.get(protocol, 0, protocol.length);
+     int value = BasicUtilities.byteArrayToInt(protocol);
+
+     if ( protocol_ != value )
+     {
+         logger_.info("Invalid protocol header in the incoming message " + value);
+         return new byte[0];
+     }
+
+     byte[] body = new byte[buffer.remaining()];
+     buffer.get(body, 0, body.length);
+     return body;
+ }
+
+ /**
+  * Read-readiness callback: receives one datagram, validates and strips the
+  * protocol marker, deserializes the body into a Message and passes it to
+  * MessagingService.receive. Read interest is cleared while the datagram is
+  * processed and restored afterwards on every path.
+  *
+  * Read interest is now restored from the interest set of the key passed in.
+  * The previous version cleared the bit on "key" but rebuilt the set from the
+  * key_ field, which is only assigned by init(int).
+  *
+  * @param key the selection key of the datagram channel
+  */
+ public void read(SelectionKey key)
+ {
+     key.interestOps( key.interestOps() & (~SelectionKey.OP_READ) );
+     ByteBuffer buffer = ByteBuffer.allocate(BUFFER_SIZE);
+     try
+     {
+         SocketAddress sa = socketChannel_.receive(buffer);
+         if ( sa == null )
+         {
+             logger_.debug("*** No datagram packet was available to be read ***");
+             return;
+         }
+         buffer.flip();
+
+         byte[] bytes = gobbleHeaderAndExtractBody(buffer);
+         if ( bytes.length > 0 )
+         {
+             DataInputStream dis = new DataInputStream( new ByteArrayInputStream(bytes) );
+             Message message = Message.serializer().deserialize(dis);
+             if ( message != null )
+             {
+                 MessagingService.receive(message);
+             }
+         }
+     }
+     catch ( IOException ioe )
+     {
+         logger_.warn(LogUtil.throwableToString(ioe));
+     }
+     finally
+     {
+         /* runs for the early return above as well */
+         key.interestOps( key.interestOps() | SelectionKey.OP_READ );
+     }
+ }
+}
Added: incubator/cassandra/trunk/src/org/apache/cassandra/net/http/ColumnFamilyFormatter.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/net/http/ColumnFamilyFormatter.java?rev=749218&view=auto
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/net/http/ColumnFamilyFormatter.java (added)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/net/http/ColumnFamilyFormatter.java Mon Mar 2 07:57:22 2009
@@ -0,0 +1,94 @@
+package org.apache.cassandra.net.http;
+
+import java.util.Collection;
+import java.util.List;
+
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.db.ColumnFamily;
+import org.apache.cassandra.db.IColumn;
+import org.apache.cassandra.db.SuperColumn;
+
+
+public class ColumnFamilyFormatter extends HTMLFormatter
+{
+ public ColumnFamilyFormatter()
+ {
+ super();
+ }
+
+ public ColumnFamilyFormatter(StringBuilder sb)
+ {
+ super(sb);
+ }
+
+ public void printKeyColumnFamily(StringBuilder sb, String sKey, ColumnFamily cf)
+ {
+ // print the key
+ sb.append("Key = " + sKey + "<br>");
+
+ // print the column familie
+ printColumnFamily(sb, cf);
+ }
+
+ public void printColumnFamily(StringBuilder sb, ColumnFamily cf)
+ {
+ // first print the column family specific data
+ sb.append("ColumnFamily = " + cf.name() + "<br>");
+
+ String columnFamilyType = DatabaseDescriptor.getColumnType(cf.name());
+ Collection<IColumn> cols = cf.getAllColumns();
+ if("Super".equals(columnFamilyType))
+ {
+ printSuperColumns(sb, cols);
+ }
+ else
+ {
+ printSimpleColumns(sb, cols);
+ }
+ }
+
+ public void printSuperColumns(StringBuilder sb, Collection<IColumn> cols)
+ {
+ // print the super column summary
+ sb.append("Number of super columns = " + cols.size() + "<br>");
+
+ startTable();
+ for(IColumn col : cols)
+ {
+ addHeader(col.name());
+ startRow();
+ Collection<IColumn> simpleCols = ((SuperColumn)col).getSubColumns();
+ printSimpleColumns(sb, simpleCols);
+ endRow();
+ }
+ endTable();
+ }
+
+ public void printSimpleColumns(StringBuilder sb, Collection<IColumn> cols)
+ {
+ int numColumns = cols.size();
+ String[] columnNames = new String[numColumns];
+ String[] columnValues = new String[numColumns];
+
+ // print the simple column summary
+ //sb.append("Number of simple columns = " + cols.size() + "<br>");
+
+ int i = 0;
+ for(IColumn col : cols)
+ {
+ columnNames[i] = col.name();
+ columnValues[i] = new String(col.value());
+ ++i;
+ }
+
+ startTable();
+ addHeaders(columnNames);
+ startRow();
+ for(i = 0; i < numColumns; ++i)
+ {
+ addCol(columnValues[i]);
+ }
+ endRow();
+ endTable();
+ }
+}
Added: incubator/cassandra/trunk/src/org/apache/cassandra/net/http/HTMLFormatter.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/net/http/HTMLFormatter.java?rev=749218&view=auto
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/net/http/HTMLFormatter.java (added)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/net/http/HTMLFormatter.java Mon Mar 2 07:57:22 2009
@@ -0,0 +1,347 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/*
+ * Helper function to write some basic HTML.
+ */
+
+package org.apache.cassandra.net.http;
+/**
+ * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
+ */
+
+import java.util.List;
+import java.util.Set;
+
+/**
+ *
+ * @author kranganathan
+ */
+ public class HTMLFormatter
+ {
+     /*
+      * Accumulates HTML markup in a StringBuilder and hands it back through
+      * toString(). None of the methods escape their arguments: whatever the
+      * caller passes is appended verbatim.
+      */
+
+     /** Buffer that receives all generated markup. */
+     protected StringBuilder sb_ = null;
+     /** Remembers whether startBody() opened a body tag that endBody() must close. */
+     private boolean writeBody_;
+
+     /** Creates a formatter that writes into a fresh buffer. */
+     public HTMLFormatter()
+     {
+         sb_ = new StringBuilder();
+     }
+
+     /**
+      * Creates a formatter that appends to a caller-supplied buffer.
+      *
+      * @param sb buffer to append to
+      */
+     public HTMLFormatter(StringBuilder sb)
+     {
+         sb_ = sb;
+     }
+
+     /** Opens the document with the default style sheet and a body tag, without script. */
+     public void startBody()
+     {
+         startBody(false, "", true, true);
+     }
+
+     /**
+      * Opens the document.
+      *
+      * @param writeJSCallback    emit a script block containing the tab helper
+      *                           and {@code jsCallbackFunction}
+      * @param jsCallbackFunction JavaScript source appended to the script block
+      * @param writeCSS           emit the built-in style sheet
+      * @param writeBody          emit the opening body tag; also controls
+      *                           whether {@link #endBody()} emits the closing one
+      */
+     public void startBody(boolean writeJSCallback, String jsCallbackFunction, boolean writeCSS, boolean writeBody)
+     {
+         writeBody_ = writeBody;
+
+         sb_.append("<html>\n");
+         if(writeCSS || writeJSCallback)
+         {
+             sb_.append("<head>\n");
+             if(writeJSCallback)
+                 addJSCallback(jsCallbackFunction);
+             if(writeCSS)
+                 addCSS();
+             sb_.append("</head>\n");
+         }
+
+         if(writeBody)
+         {
+             sb_.append("<body bgcolor=black>\n");
+         }
+     }
+
+     /** Closes the body (if one was opened by startBody) and the document. */
+     public void endBody()
+     {
+         if(writeBody_)
+         {
+             sb_.append("</body>\n");
+         }
+         sb_.append("</html>\n");
+     }
+
+     /**
+      * Appends {@code s} followed by a line break tag.
+      *
+      * @param s markup to append
+      */
+     public void appendLine(String s)
+     {
+         sb_.append(s);
+         sb_.append("<br>\n");
+     }
+
+     /**
+      * Appends {@code s} verbatim.
+      *
+      * @param s markup to append
+      */
+     public void append(String s)
+     {
+         sb_.append(s);
+     }
+
+     /**
+      * Wraps the given JavaScript source in a script element.
+      *
+      * @param jscript JavaScript source
+      */
+     public void addJScript(String jscript)
+     {
+         // "text/javascript" is a MIME type and belongs in the type attribute,
+         // as addJSCallback() already does; the language attribute does not
+         // take MIME types.
+         append("<script type=\"text/javascript\">\n");
+         append(jscript + "\n");
+         append("</script>\n");
+     }
+
+     /** Opens a table. */
+     public void startTable()
+     {
+         sb_.append("<table>\n");
+     }
+
+     /**
+      * Appends a complete header row with one cell per entry.
+      *
+      * @param sTableHeaders header captions, in column order
+      */
+     public void addHeaders(String[] sTableHeaders)
+     {
+         sb_.append("<tr style=\"border: 2px solid #333333\" >\n");
+         for (int i = 0; i < sTableHeaders.length; ++i)
+         {
+             sb_.append("<th><div class=\"tmenubar\">");
+             sb_.append("<b>" + sTableHeaders[i] + "</b>");
+             sb_.append("</div></th>\n");
+         }
+         sb_.append("\n</tr>\n\n");
+     }
+
+     /**
+      * Appends a complete header row consisting of a single cell.
+      *
+      * @param sTableHeader header caption
+      */
+     public void addHeader(String sTableHeader)
+     {
+         sb_.append("<tr style=\"border: 2px solid #333333\" >\n");
+         sb_.append("<th><div class=\"tmenubar\">");
+         sb_.append("<b>" + sTableHeader + "</b>");
+         sb_.append("</div></th>\n");
+         sb_.append("\n</tr>\n\n");
+     }
+
+     /** Opens a table row. */
+     public void startRow()
+     {
+         sb_.append("<tr style=\"border: 2px solid #333333\">\n");
+     }
+
+     /**
+      * Appends one complete table cell.
+      *
+      * @param sData cell content
+      */
+     public void addCol(String sData)
+     {
+         sb_.append("<td style=\"border: 2px solid #333333\">");
+         sb_.append(sData);
+         sb_.append("</td>");
+     }
+
+     /** Closes a table row. */
+     public void endRow()
+     {
+         sb_.append("</tr>\n");
+     }
+
+     /** Closes a table. */
+     public void endTable()
+     {
+         sb_.append("</table>\n");
+     }
+
+     /**
+      * Appends a drop-down list with a pre-selected "Select an option" entry.
+      *
+      * @param comboBoxEntries option values, in iteration order
+      * @param htmlElementName value of the select element's name attribute
+      */
+     public void addCombobox(Set<String> comboBoxEntries, String htmlElementName)
+     {
+         addCombobox(comboBoxEntries, htmlElementName, -1);
+     }
+
+     /**
+      * Appends a drop-down list.
+      *
+      * @param comboBoxEntries option values, in iteration order
+      * @param htmlElementName value of the select element's name attribute
+      * @param defaultSelected zero-based position (in iteration order) of the
+      *                        entry to pre-select, or -1 to emit and pre-select
+      *                        an empty "Select an option" entry instead
+      */
+     public void addCombobox(Set<String> comboBoxEntries, String htmlElementName, int defaultSelected)
+     {
+         sb_.append(" <select name=" + htmlElementName + " size=1>\n");
+         if(defaultSelected == -1)
+         {
+             sb_.append(" <option value=\"\" SELECTED>Select an option \n");
+         }
+
+         int i = 0;
+         for(String colFamName : comboBoxEntries)
+         {
+             if(defaultSelected == i)
+             {
+                 sb_.append(" <option value=\"" + colFamName + "\" SELECTED>" + colFamName + "\n");
+             }
+             else
+             {
+                 sb_.append(" <option value=\"" + colFamName + "\">" + colFamName + "\n");
+             }
+             // advance the position; without this only index 0 could ever
+             // match, and then it matched every entry
+             ++i;
+         }
+         sb_.append(" </select>\n");
+     }
+
+     /**
+      * Appends a div element with the given id.
+      *
+      * @param divId value of the id attribute
+      * @param value content of the div; nothing is written when null
+      */
+     public void addDivElement(String divId, String value)
+     {
+         sb_.append("<div id = \"" + divId + "\">");
+         if(value != null)
+             sb_.append(value);
+         sb_.append("</div>\n");
+     }
+
+     /**
+      * Appends a complete bordered table. Nothing is written when
+      * {@code sTable} is null or has no rows.
+      *
+      * @param sTableHeaders captions for the first row
+      * @param sTable        cell contents, indexed [row][column]
+      */
+     public void createTable(String[] sTableHeaders, String[][] sTable)
+     {
+         if (sTable == null || sTable.length == 0)
+             return;
+
+         sb_.append("<table style=\"border: 2px solid #333333\">\n");
+
+         sb_.append("<tr style=\"border: 2px solid #333333\">\n");
+         for (int i = 0; i < sTableHeaders.length; ++i)
+         {
+             sb_.append("<td style=\"border: 2px solid #333333\">");
+             sb_.append("<b>" + sTableHeaders[i] + "</b>");
+             sb_.append("</td>\n");
+         }
+         sb_.append("\n</tr>\n\n");
+
+         for (int i = 0; i < sTable.length; ++i)
+         {
+             sb_.append("<tr style=\"border: 2px solid #333333\">\n");
+             for (int j = 0; j < sTable[i].length; ++j)
+             {
+                 sb_.append("<td style=\"border: 2px solid #333333\">");
+                 sb_.append(sTable[i][j]);
+                 sb_.append("</td>\n");
+             }
+             sb_.append("\n</tr>\n\n");
+         }
+         sb_.append("</table>\n");
+     }
+
+     /**
+      * Appends a script element containing the tab-switching helper followed
+      * by the caller's JavaScript.
+      *
+      * @param jsCallbackFunction JavaScript source appended after the helper
+      */
+     public void addJSCallback(String jsCallbackFunction)
+     {
+         sb_.append("<script type=\"text/javascript\">\n");
+
+         addJSForTabs();
+
+         sb_.append(jsCallbackFunction +"\n");
+         sb_.append("</script>\n");
+     }
+
+     /** Appends the built-in style sheet, including the tab styles. */
+     public void addCSS()
+     {
+         sb_.append("<style type=\"text/css\">\n");
+         sb_.append("body\n");
+         sb_.append("{\n");
+         sb_.append(" color:white;\n");
+         sb_.append(" font-family:Arial Unicode MS,Verdana, Arial, Sans-serif;\n");
+         sb_.append(" font-size:10pt;\n");
+         sb_.append("}\n");
+
+         sb_.append(".tmenubar\n");
+         sb_.append("{\n");
+         sb_.append(" background-color:green;\n");
+         sb_.append(" font-family:Verdana, Arial, Sans-serif;\n");
+         sb_.append(" font-size:10pt;\n");
+         sb_.append(" font-weight:bold;\n");
+         sb_.append("}\n");
+
+         sb_.append("th\n");
+         sb_.append("{\n");
+         sb_.append(" color:white;\n");
+         sb_.append("}\n");
+
+         sb_.append("td\n");
+         sb_.append("{\n");
+         sb_.append(" color:white;\n");
+         sb_.append("}\n");
+         sb_.append("a:link {color:#CAF99B;font-size:10pt;font-weight:bold;font-family:Arial Unicode MS,Lucida-grande,Verdana}\n");
+         sb_.append("a:visited {color:red}\n");
+         sb_.append("a:hover{color:yellow;font-size:10pt;font-weight:bold;font-family:Arial Unicode MS,Lucida-grande,Verdana;background-color:green}\n");
+
+         addCSSForTabs();
+
+         sb_.append("</style>\n");
+
+     }
+
+     /**
+      * Appends the CSS rules for the "#header" tab strip and the "#content"
+      * panes. Must be called inside an open style element.
+      */
+     public void addCSSForTabs()
+     {
+         sb_.append("#header ul {\n");
+         sb_.append(" list-style: none;\n");
+         sb_.append(" padding: 0;\n");
+         sb_.append(" margin: 0;\n");
+         sb_.append(" }\n");
+         sb_.append("\n");
+         sb_.append("#header li {\n");
+         sb_.append(" float: left;\n");
+         sb_.append(" border: 1px solid #bbb;\n");
+         sb_.append(" border-bottom-width: 0;\n");
+         sb_.append(" margin: 0;\n");
+         sb_.append("}\n");
+         sb_.append("\n");
+         sb_.append("#header a {\n");
+         sb_.append(" text-decoration: none;\n");
+         sb_.append(" display: block;\n");
+         sb_.append(" background: #eee;\n");
+         sb_.append(" padding: 0.24em 1em;\n");
+         sb_.append(" color: #00c;\n");
+         sb_.append(" width: 8em;\n");
+         sb_.append(" text-align: center;\n");
+         sb_.append(" }\n");
+         sb_.append("\n");
+         sb_.append("#header a:hover {\n");
+         sb_.append(" background: #ddf;\n");
+         sb_.append("}\n");
+         sb_.append("\n");
+         sb_.append("#header #selected {\n");
+         sb_.append(" border-color: black;\n");
+         sb_.append("}\n");
+         sb_.append("\n");
+         sb_.append("#header #selected a {\n");
+         sb_.append(" position: relative;\n");
+         sb_.append(" top: 1px;\n");
+         sb_.append(" background: white;\n");
+         sb_.append(" color: black;\n");
+         sb_.append(" font-weight: bold;\n");
+         sb_.append("}\n");
+         sb_.append("\n");
+         sb_.append("#content {\n");
+         sb_.append(" border: 1px solid black;\n");
+         sb_.append(" visibility:hidden;\n");
+         sb_.append(" position:absolute;\n");
+         sb_.append(" top:200;\n");
+         sb_.append(" clear: both;\n");
+         sb_.append(" padding: 0 1em;\n");
+         sb_.append("}\n");
+         sb_.append("\n");
+         sb_.append("h1 {\n");
+         sb_.append(" margin: 0;\n");
+         sb_.append(" padding: 0 0 1em 0;\n");
+         sb_.append("}\n");
+     }
+
+     /**
+      * Appends the JavaScript for tab switching: a global
+      * {@code curSelectedDivId} and a {@code selectTab(tabDivId)} function that
+      * hides the elements named after the current tab and shows those named
+      * after the requested one. Must be called inside an open script element.
+      */
+     public void addJSForTabs()
+     {
+         sb_.append("var curSelectedDivId = \"one\";\n");
+         sb_.append("\n");
+         sb_.append("function selectTab(tabDivId)\n");
+         sb_.append("{\n");
+         sb_.append(" var x = document.getElementsByName(curSelectedDivId);\n");
+         sb_.append(" if(x[1])\n");
+         sb_.append(" x[1].style.visibility=\"hidden\";\n");
+         sb_.append(" if(x[0])\n");
+         sb_.append(" x[0].id=curSelectedDivId;\n");
+         sb_.append("\n");
+         sb_.append("\n");
+         sb_.append(" var y = document.getElementsByName(tabDivId);\n");
+         sb_.append(" if(y[1])\n");
+         sb_.append(" y[1].style.visibility=\"visible\";\n");
+         sb_.append(" if(y[0])\n");
+         sb_.append(" y[0].id = \"selected\";\n");
+         sb_.append("\n");
+         sb_.append(" curSelectedDivId = tabDivId;\n");
+         sb_.append("}\n");
+     }
+
+     /**
+      * @return everything appended so far
+      */
+     @Override
+     public String toString()
+     {
+         return sb_.toString();
+     }
+ }
Added: incubator/cassandra/trunk/src/org/apache/cassandra/net/http/HttpConnection.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/net/http/HttpConnection.java?rev=749218&view=auto
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/net/http/HttpConnection.java (added)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/net/http/HttpConnection.java Mon Mar 2 07:57:22 2009
@@ -0,0 +1,431 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/*
+ * This class accepts a client connection and parses http data from it.
+ */
+
+// TODO: shouldClose_ is not used correctly. It should be used to close the socket? When?
+
+package org.apache.cassandra.net.http;
+
+import java.util.*;
+import java.net.*;
+import java.io.*;
+import java.nio.*;
+import java.nio.channels.SelectionKey;
+import java.nio.channels.SocketChannel;
+import org.apache.cassandra.service.*;
+import org.apache.cassandra.concurrent.SingleThreadedStage;
+import org.apache.cassandra.concurrent.StageManager;
+import org.apache.cassandra.db.Table;
+import org.apache.cassandra.net.IVerbHandler;
+import org.apache.cassandra.net.Message;
+import org.apache.cassandra.net.MessagingService;
+import org.apache.cassandra.net.SelectionKeyHandler;
+import org.apache.cassandra.net.SelectorManager;
+import org.apache.cassandra.service.StorageService;
+import org.apache.cassandra.utils.LogUtil;
+import org.apache.log4j.Logger;
+
+/**
+ *
+ * @author kranganathan
+ */
+ public class HttpConnection extends SelectionKeyHandler implements HttpStartLineParser.Callback, HttpHeaderParser.Callback
+ {
+     /*
+      * One instance handles one accepted client socket. read() is invoked for
+      * readable keys and hands the actual socket read to the read executor;
+      * the bytes are fed through the start-line parser, the header parser and
+      * finally the body collector in parse(). A complete request is wrapped in
+      * an HttpRequestMessage and passed to MessagingService.
+      */
+
+     // log under this class (was registered under StorageService by mistake)
+     private static Logger logger_ = Logger.getLogger(HttpConnection.class);
+     public static final String httpRequestVerbHandler_ = "HTTP-REQUEST-VERB-HANDLER";
+     public static final String httpStage_ = "HTTP-STAGE";
+
+     /*
+      * These are the callbacks into whoever intends
+      * to listen on the client socket.
+      */
+     public interface HttpConnectionListener
+     {
+         public void onRequest(HttpRequest httpRequest);
+         public void onResponse(HttpResponse httpResponse);
+     }
+
+     /* Kind of HTTP message currently being parsed, decided by the start line. */
+     enum HttpMessageType
+     {
+         UNKNOWN,
+         REQUEST,
+         RESPONSE
+     }
+
+     /* Position of parse() within the current HTTP message. */
+     enum ParseState
+     {
+         IN_NEW,
+         IN_START,
+         IN_HEADERS, IN_BODY
+     }
+
+     private ParseState parseState_ = ParseState.IN_NEW;
+     /* Written when a new message starts; not read anywhere in this class. */
+     private long parseStartTime_ = 0;
+     private HttpMessageType currentMsgType_ = HttpMessageType.UNKNOWN;
+     /* Number of body bytes still expected for the current message. */
+     private int contentLength_ = 0;
+     private List<ByteBuffer> bodyBuffers_ = new LinkedList<ByteBuffer>();
+     /* Derived from the version and Connection header; not consulted yet (see TODO at top of file). */
+     private boolean shouldClose_ = false;
+     private String defaultContentType_ = "text/html";
+     private HttpRequest currentRequest_ = null;
+     private HttpResponse currentResponse_ = null;
+     private HttpStartLineParser startLineParser_ = new HttpStartLineParser(this);
+     private HttpHeaderParser headerParser_ = new HttpHeaderParser(this);
+     /* Selection Key associated with this HTTP Connection */
+     private SelectionKey httpKey_;
+     /* SocketChannel associated with this HTTP Connection */
+     private SocketChannel httpChannel_;
+     /* HTTPReader instance associated with this HTTP Connection */
+     private HTTPReader httpReader_ = new HTTPReader();
+
+     /*
+      * This abstraction starts reading the data that comes in
+      * on a HTTP request. It accumulates the bytes read into
+      * a buffer and passes the buffer to the HTTP parser.
+      */
+     class HTTPReader implements Runnable
+     {
+         /* We read 256 bytes at a time from a HTTP connection */
+         private static final int bufferSize_ = 256;
+
+         /*
+          * Reads one buffer from the channel and hands it to parse().
+          * read() removed OP_READ from the key before scheduling this task,
+          * so every path through here must either restore read interest
+          * (parse() does that in its finally block) or close the connection.
+          */
+         public void run()
+         {
+             ByteBuffer readBuffer = ByteBuffer.allocate(HTTPReader.bufferSize_);
+             try
+             {
+                 int bytesRead = httpChannel_.read(readBuffer);
+                 if ( bytesRead < 0 )
+                 {
+                     /* end of stream: the peer is gone, nothing more will arrive */
+                     close();
+                     return;
+                 }
+
+                 readBuffer.flip();
+                 if ( readBuffer.remaining() > 0 )
+                 {
+                     HttpConnection.this.parse(readBuffer);
+                 }
+                 else
+                 {
+                     /* nothing available right now: wait for the next readiness event */
+                     SelectorManager.getSelectorManager().modifyKeyForRead(httpKey_);
+                 }
+             }
+             catch ( IOException ex )
+             {
+                 logger_.warn(LogUtil.throwableToString(ex));
+                 /* the channel is unusable after a failed read */
+                 close();
+             }
+         }
+     }
+
+     /*
+      * Pairs a parsed request with the connection it arrived on so that the
+      * handler can write the response back.
+      */
+     public static class HttpRequestMessage
+     {
+         private final HttpRequest httpRequest_;
+         private final HttpConnection httpConnection_;
+
+         HttpRequestMessage(HttpRequest httpRequest, HttpConnection httpConnection)
+         {
+             httpRequest_ = httpRequest;
+             httpConnection_ = httpConnection;
+         }
+
+         public HttpRequest getHttpRequest()
+         {
+             return httpRequest_;
+         }
+
+         public HttpConnection getHttpConnection()
+         {
+             return httpConnection_;
+         }
+     }
+
+     /*
+      * Read called on the Selector thread. This is called
+      * when there is some HTTP request that needs to be
+      * processed. The first call records the key and its channel.
+      */
+     public void read(SelectionKey key)
+     {
+         if ( httpKey_ == null )
+         {
+             httpKey_ = key;
+             httpChannel_ = (SocketChannel)key.channel();
+         }
+         /* deregister interest for read */
+         key.interestOps( key.interestOps() & ( ~SelectionKey.OP_READ ) );
+         /* Add a task to process the HTTP request */
+         MessagingService.getReadExecutor().execute(httpReader_);
+     }
+
+     /*
+      * Re-enables read interest on the given key. The new interest set is
+      * derived from the key that is being modified, so this also works
+      * before read() has recorded httpKey_.
+      */
+     public void modifyKeyForRead(SelectionKey key)
+     {
+         key.interestOps( key.interestOps() | SelectionKey.OP_READ );
+     }
+
+     /* Forgets the current message so that the next bytes start a new one. */
+     private void resetParserState()
+     {
+         startLineParser_.resetParserState();
+         headerParser_.resetParserState();
+         parseState_ = ParseState.IN_NEW;
+         contentLength_ = 0;
+         bodyBuffers_ = new LinkedList<ByteBuffer>();
+         currentMsgType_ = HttpMessageType.UNKNOWN;
+         currentRequest_ = null;
+         currentResponse_ = null;
+     }
+
+     /*
+      * Cancels this connection's selection key, if one has been recorded.
+      * NOTE(review): whether the channel itself gets closed depends on
+      * SelectorManager.cancel() - confirm.
+      */
+     public void close()
+     {
+         logger_.info("Closing HTTP socket ...");
+         if ( httpKey_ != null )
+             SelectorManager.getSelectorManager().cancel(httpKey_);
+     }
+
+     /*
+      * Feeds freshly read bytes through the message state machine
+      * (start line -> headers -> body) and dispatches the message once the
+      * body is complete. Any failure is logged and the partially parsed
+      * message is discarded. Read interest is restored in all cases.
+      *
+      * Bytes that follow a complete message in the same buffer are not
+      * processed by this call.
+      */
+     public void parse(ByteBuffer bb)
+     {
+         try
+         {
+             logger_.debug("Processing http requests from socket ...");
+             switch (parseState_)
+             {
+                 case IN_NEW:
+                     parseState_ = ParseState.IN_START;
+                     parseStartTime_ = System.currentTimeMillis();
+
+                     // fall through
+                 case IN_START:
+                     if (startLineParser_.onMoreBytesNew(bb) == false)
+                     {
+                         break; // need more bytes
+                     }
+                     /* message type and start line were recorded through onStartLine() */
+                     parseState_ = ParseState.IN_HEADERS;
+
+                     // fall through
+                 case IN_HEADERS:
+                     if (headerParser_.onMoreBytesNew(bb) == false)
+                     {
+                         break; // need more bytes
+                     }
+                     if (processHeaders_() == false)
+                     {
+                         /* message type never got set: drop what we have */
+                         resetParserState();
+                         return;
+                     }
+                     parseState_ = ParseState.IN_BODY;
+
+                     // fall through
+                 case IN_BODY:
+                     consumeBody_(bb);
+                     if (contentLength_ == 0)
+                     {
+                         dispatchMessage_();
+                         resetParserState();
+                     }
+             }
+
+         }
+         catch (final Throwable e)
+         {
+             logger_.warn(LogUtil.throwableToString(e));
+             /*
+              * Discard the broken message. Without this a request whose
+              * handler threw would still be IN_BODY with nothing left to
+              * read and would be dispatched again on the next bytes.
+              */
+             resetParserState();
+             //close();
+         }
+         finally
+         {
+             SelectorManager.getSelectorManager().modifyKeyForRead(httpKey_);
+         }
+     }
+
+     /*
+      * Called once all headers have been read: works out the keep-alive
+      * policy for requests and the expected body length.
+      * Returns false when the message type is unknown.
+      */
+     private boolean processHeaders_() throws HttpParsingException
+     {
+         String len;
+         if (currentMsgType_ == HttpMessageType.REQUEST)
+         {
+             len = currentRequest_.getHeader(HttpProtocolConstants.CONTENT_LENGTH);
+             updateShouldClose_();
+         }
+         else if (currentMsgType_ == HttpMessageType.RESPONSE)
+         {
+             len = currentResponse_.getHeader(HttpProtocolConstants.CONTENT_LENGTH);
+
+             // TODO: pay attention to keep-alive and
+             // close headers
+         }
+         else
+         {
+             logger_.warn("in HttpConnection::processInput_() Message type is not set");
+             return false;
+         }
+
+         /* no Content-Length header leaves the length at its reset value of 0 */
+         if (len != null)
+         {
+             contentLength_ = len.equals("") ? 0 : parseContentLength_(len);
+         }
+         return true;
+     }
+
+     /*
+      * Decides from the request version and its Connection header whether
+      * the connection should be closed after the response.
+      */
+     private void updateShouldClose_()
+     {
+         String version = currentRequest_.getVersion();
+         if (version.equalsIgnoreCase("HTTP/1.1"))
+         {
+             /* persistent unless the client asks for close */
+             String val = currentRequest_.getHeader(HttpProtocolConstants.CONNECTION);
+
+             if (val != null && val.equalsIgnoreCase(HttpProtocolConstants.CLOSE))
+             {
+                 shouldClose_ = true;
+             }
+         }
+         else if (version.equalsIgnoreCase("HTTP/1.0"))
+         {
+             /* By default no keep-alive */
+             shouldClose_ = true;
+
+             String val = currentRequest_.getHeader(HttpProtocolConstants.CONNECTION);
+
+             if (val != null && val.equalsIgnoreCase(HttpProtocolConstants.KEEP_ALIVE))
+             {
+                 shouldClose_ = false;
+             }
+         }
+         else
+         {
+             /* Assume 0.9 */
+             shouldClose_ = true;
+         }
+     }
+
+     /*
+      * Converts a Content-Length value. Non-numeric and negative values are
+      * rejected; a negative length could never be satisfied and would leave
+      * the message waiting for its body forever.
+      */
+     private static int parseContentLength_(String len) throws HttpParsingException
+     {
+         int length;
+         try
+         {
+             length = Integer.parseInt(len);
+         }
+         catch (NumberFormatException ex)
+         {
+             HttpParsingException badLength = new HttpParsingException();
+             badLength.initCause(ex);
+             throw badLength;
+         }
+         if (length < 0)
+         {
+             throw new HttpParsingException();
+         }
+         return length;
+     }
+
+     /*
+      * Moves up to contentLength_ bytes from bb into bodyBuffers_ and
+      * reduces contentLength_ by the amount taken.
+      */
+     private void consumeBody_(ByteBuffer bb)
+     {
+         if (contentLength_ <= 0)
+         {
+             return;
+         }
+
+         if (bb.remaining() > contentLength_)
+         {
+             /* buffer holds more than the body: take only the body part */
+             int newLimit = bb.position() + contentLength_;
+             bodyBuffers_.add(((ByteBuffer) bb.duplicate().limit(newLimit)).slice());
+             bb.position(newLimit);
+             contentLength_ = 0;
+         }
+         else
+         {
+             contentLength_ -= bb.remaining();
+             bodyBuffers_.add(bb.duplicate());
+             bb.position(bb.limit());
+         }
+     }
+
+     /* Hands a completely parsed message to its consumer. */
+     private void dispatchMessage_()
+     {
+         if (currentMsgType_ == HttpMessageType.REQUEST)
+         {
+             //currentRequest_.setParseTime(env_.getCurrentTime() - parseStartTime_);
+             currentRequest_.setBody(bodyBuffers_);
+
+             if (currentRequest_.getHeader("Content-Type") == null)
+             {
+                 currentRequest_.addHeader("Content-Type", defaultContentType_);
+             }
+
+             handleRequest(currentRequest_);
+         }
+         else if (currentMsgType_ == HttpMessageType.RESPONSE)
+         {
+             logger_.info("Holy shit! We are not supposed to be here - ever !!!");
+         }
+         else
+         {
+             logger_.error("Http message type is still" +
+                 " unset after we finish parsing the body?");
+         }
+     }
+
+     /*
+      * Writes the whole buffer to the client and then closes the
+      * connection, whether or not the write succeeded.
+      */
+     public void write(ByteBuffer buffer)
+     {
+         /*
+          * TODO: Make this a non blocking write. The channel is non-blocking,
+          * so this loop spins while the socket send buffer is full.
+          */
+         try
+         {
+             while ( buffer.remaining() > 0 )
+             {
+                 httpChannel_.write(buffer);
+             }
+         }
+         catch ( IOException ex )
+         {
+             logger_.warn(LogUtil.throwableToString(ex));
+         }
+         finally
+         {
+             /* previously skipped on failure, which left the key registered */
+             close();
+         }
+     }
+
+     /* Wraps the request together with this connection and submits it to the HTTP stage. */
+     private void handleRequest(HttpRequest request)
+     {
+         HttpConnection.HttpRequestMessage httpRequestMessage = new HttpConnection.HttpRequestMessage(request, this);
+         Message httpMessage = new Message(null, HttpConnection.httpStage_, HttpConnection.httpRequestVerbHandler_, new Object[]{httpRequestMessage});
+         MessagingService.receive(httpMessage);
+     }
+
+     // HttpStartLineParser.Callback interface implementation
+     // A first token starting with "HTTP" marks a response; anything else is a request.
+     public void onStartLine(String method, String path, String query, String version)
+     {
+         logger_.debug("Startline method=" + method + " path=" + path + " query=" + query + " version=" + version);
+
+         if (method.startsWith("HTTP"))
+         {
+             // response
+             currentMsgType_ = HttpMessageType.RESPONSE;
+             currentResponse_ = new HttpResponse();
+             currentResponse_.setStartLine(method, path, version);
+         }
+         else
+         {
+             // request
+             currentMsgType_ = HttpMessageType.REQUEST;
+             currentRequest_ = new HttpRequest();
+             currentRequest_.setStartLine(method, path, query, version);
+         }
+     }
+
+     // HttpHeaderParser.Callback interface implementation
+     // Stores the header on whichever message is currently being built.
+     public void onHeader(String name, String value)
+     {
+         if (currentMsgType_ == HttpMessageType.REQUEST)
+         {
+             currentRequest_.addHeader(name, value);
+         }
+         else if (currentMsgType_ == HttpMessageType.RESPONSE)
+         {
+             currentResponse_.addHeader(name, value);
+         }
+         else
+         {
+             logger_.warn("Unknown message type -- HttpConnection::onHeader()");
+         }
+
+         logger_.debug(name + " : " + value);
+     }
+ }
+
+
+
Added: incubator/cassandra/trunk/src/org/apache/cassandra/net/http/HttpConnectionHandler.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/net/http/HttpConnectionHandler.java?rev=749218&view=auto
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/net/http/HttpConnectionHandler.java (added)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/net/http/HttpConnectionHandler.java Mon Mar 2 07:57:22 2009
@@ -0,0 +1,53 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.net.http;
+
+import java.io.IOException;
+import java.nio.channels.SelectionKey;
+import java.nio.channels.ServerSocketChannel;
+import java.nio.channels.SocketChannel;
+
+import org.apache.cassandra.db.Table;
+import org.apache.cassandra.net.SelectionKeyHandler;
+import org.apache.cassandra.net.SelectorManager;
+import org.apache.log4j.Logger;
+
+ public class HttpConnectionHandler extends SelectionKeyHandler
+ {
+     private static final Logger logger_ = Logger.getLogger(HttpConnectionHandler.class);
+
+     /*
+      * Accepts a pending client on the server channel behind the given key,
+      * switches it to non-blocking mode and registers a fresh HttpConnection
+      * for read events. A client that cannot be set up is closed again so
+      * that its socket is not leaked.
+      */
+     public void accept(SelectionKey key)
+     {
+         SocketChannel client = null;
+         try
+         {
+             ServerSocketChannel serverChannel = (ServerSocketChannel)key.channel();
+             client = serverChannel.accept();
+             if ( client != null )
+             {
+                 client.configureBlocking(false);
+                 SelectionKeyHandler handler = new HttpConnection();
+                 SelectorManager.getSelectorManager().register(client, handler, SelectionKey.OP_READ);
+             }
+         }
+         catch(IOException e)
+         {
+             /* pass the exception as the throwable so the stack trace is logged */
+             logger_.warn("Unable to accept HTTP connection", e);
+             closeQuietly(client);
+         }
+     }
+
+     /* Closes the channel if there is one; a failure to close is only logged. */
+     private static void closeQuietly(SocketChannel channel)
+     {
+         if ( channel == null )
+             return;
+         try
+         {
+             channel.close();
+         }
+         catch(IOException e)
+         {
+             logger_.warn("Unable to close rejected HTTP connection", e);
+         }
+     }
+ }
Added: incubator/cassandra/trunk/src/org/apache/cassandra/net/http/HttpHeaderParser.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/net/http/HttpHeaderParser.java?rev=749218&view=auto
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/net/http/HttpHeaderParser.java (added)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/net/http/HttpHeaderParser.java Mon Mar 2 07:57:22 2009
@@ -0,0 +1,378 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/*
+ * To change this template, choose Tools | Templates
+ * and open the template in the editor.
+ */
+package org.apache.cassandra.net.http;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.ByteBuffer;
+
+/**
+ *
+ * @author kranganathan
+ */
+/**
+ * A parser for HTTP header lines.
+ *
+ */
+ public class HttpHeaderParser
+ {
+     /*
+      * Incremental parser for the header section of an HTTP message. Input
+      * may arrive in arbitrary pieces; the parser keeps its position in
+      * parseState_ between calls and reports every completed header through
+      * the callback. Two entry points exist: onMoreBytes() for mark/reset
+      * capable streams and onMoreBytesNew() for ByteBuffers. The two
+      * *_WITH_READ_SLASH_R states are only handled by onMoreBytesNew().
+      */
+
+     private final Callback callback_;
+
+     /* Receives each header once its terminating line end has been seen. */
+     public interface Callback
+     {
+
+         public void onHeader(String key, String value);
+     }
+
+     /*
+      * cb may be null, in which case parsed headers are dropped.
+      */
+     public HttpHeaderParser(Callback cb)
+     {
+         callback_ = cb;
+     }
+
+     enum HeaderParseState
+     {
+         // we are at the very beginning of the line
+         START_OF_HEADER_LINE,
+         // are at line beginning, read '\r' but ran out of bytes in this round
+         START_OF_HEADER_LINE_WITH_READ_SLASH_R,
+         // we are in the process of parsing a header key
+         IN_HEADER_KEY,
+         // eat whitespace after the ':' but before the value
+         PRE_HEADER_VALUE_WHITESPACE,
+         // we are in the process of parsing a header value
+         IN_HEADER_VALUE,
+         // were in IN_HEADER_VALUE and read '\r' but ran out of more bytes
+         IN_HEADER_VALUE_WITH_READ_SLASH_R,
+         /*
+          * got \r\n in the header value. now consider whether its a multilined
+          * value. For example,
+          *
+          * HeaderKey: HeaderValue\r\n this is still part of the value\r\n
+          *
+          * is a valid HTTP header line with value
+          *
+          * HeaderValue\r\n this is still part of the value
+          *
+          * NOTE: while all whitespace should generally be condensed into a
+          * single space by the HTTP standard, we will just preserve all of the
+          * whitespace for now
+          *
+          * TODO: consider replacing all whitespace with a single space
+          *
+          * TODO: this parser doesn't correctly preserve the \r\n, should it?
+          */
+         CHECKING_END_OF_VALUE,
+         // we are just about to reset the state of the header parser
+         TO_RESET
+     }
+
+     // the current state of the parser
+     private HeaderParseState parseState_ = HeaderParseState.TO_RESET;
+     // incrementally build up this HTTP header key as we read it
+     private StringBuilder headerKey_ = new StringBuilder(32);
+
+     // incrementally build up this HTTP header value as we read it
+     private StringBuilder headerValue_ = new StringBuilder(64);
+
+     /* Clears the key/value being assembled and returns to the start of a header line. */
+     public void resetParserState()
+     {
+         headerKey_.setLength(0);
+         headerValue_.setLength(0);
+         parseState_ = HeaderParseState.START_OF_HEADER_LINE;
+     }
+
+     /* Reports the assembled header (trimmed) to the callback and starts over. */
+     private void finishCurrentHeader_()
+     {
+         if (callback_ != null)
+         {
+             callback_.onHeader(headerKey_.toString().trim(), headerValue_
+                 .toString().trim());
+         }
+         resetParserState();
+     }
+
+     /*
+      * Consumes whatever the stream reports as available.
+      *
+      * The stream must support mark()/reset(): one byte of look-ahead is
+      * taken and pushed back whenever it belongs to the next state.
+      *
+      * Returns true once the empty line that ends the header section has
+      * been consumed, false when more input is needed.
+      */
+     public boolean onMoreBytes(InputStream in) throws IOException
+     {
+         int got;
+
+         if (parseState_ == HeaderParseState.TO_RESET)
+         {
+             resetParserState();
+         }
+
+         while (in.available() > 0)
+         {
+             // remember the position so the byte can be un-read below
+             in.mark(1);
+             got = in.read();
+
+             switch (parseState_)
+             {
+
+                 case START_OF_HEADER_LINE:
+                     switch (got)
+                     {
+                         case '\r':
+                             if (in.available() > 0)
+                             {
+                                 in.mark(1);
+                                 got = in.read();
+
+                                 if (got == '\n')
+                                 {
+                                     // empty line: end of the header section
+                                     parseState_ = HeaderParseState.TO_RESET;
+                                     return true;
+                                 } // TODO: determine whether this \r-eating is valid
+                                 else
+                                 {
+                                     in.reset();
+                                 }
+                             } // wait for more data to make this decision
+                             else
+                             {
+                                 // push the '\r' back; it is re-read on the next call
+                                 in.reset();
+                                 return false;
+                             }
+                             break;
+
+                         default:
+                             // first byte of a key: re-read it in IN_HEADER_KEY
+                             in.reset();
+                             parseState_ = HeaderParseState.IN_HEADER_KEY;
+                             break;
+                     }
+                     break;
+
+                 case IN_HEADER_KEY:
+                     switch (got)
+                     {
+                         case ':':
+                             parseState_ = HeaderParseState.PRE_HEADER_VALUE_WHITESPACE;
+                             break;
+                         // TODO: find out: whether to eat whitespace before a :
+                         default:
+                             headerKey_.append((char) got);
+                             break;
+                     }
+                     break;
+
+                 case PRE_HEADER_VALUE_WHITESPACE:
+                     switch (got)
+                     {
+                         case ' ':
+                         case '\t':
+                             break;
+                         default:
+                             in.reset();
+                             parseState_ = HeaderParseState.IN_HEADER_VALUE;
+                             break;
+                     }
+                     break;
+
+                 case IN_HEADER_VALUE:
+                     switch (got)
+                     {
+                         case '\r':
+                             if (in.available() > 0)
+                             {
+                                 in.mark(1);
+                                 got = in.read();
+
+                                 if (got == '\n')
+                                 {
+                                     parseState_ = HeaderParseState.CHECKING_END_OF_VALUE;
+                                     break;
+                                 } // TODO: determine whether this \r-eating is valid
+                                 else
+                                 {
+                                     in.reset();
+                                 }
+                             }
+                             else
+                             {
+                                 // push the '\r' back; it is re-read on the next call
+                                 in.reset();
+                                 return false;
+                             }
+                             break;
+                         default:
+                             headerValue_.append((char) got);
+                             break;
+                     }
+                     break;
+
+                 case CHECKING_END_OF_VALUE:
+                     switch (got)
+                     {
+                         case ' ':
+                         case '\t':
+                             // continuation line: the whitespace becomes part of the value
+                             in.reset();
+                             parseState_ = HeaderParseState.IN_HEADER_VALUE;
+                             break;
+                         default:
+                             in.reset();
+                             finishCurrentHeader_();
+                     }
+                     break;
+                 default:
+                     assert false;
+                     parseState_ = HeaderParseState.START_OF_HEADER_LINE;
+                     break;
+             }
+         }
+
+         return false;
+     }
+
+     /*
+      * Consumes bytes from the buffer's position up to its limit.
+      *
+      * Returns true once the empty line that ends the header section has
+      * been consumed; the buffer is then positioned on the first byte after
+      * it. Returns false when the buffer is exhausted and more input is
+      * needed; the buffer's position is then at its limit.
+      *
+      * Bytes are widened as unsigned values, so a byte >= 0x80 becomes the
+      * char with the same code point, exactly as in onMoreBytes().
+      */
+     public boolean onMoreBytesNew(ByteBuffer buffer) throws IOException
+     {
+
+         int got;
+         int limit = buffer.limit();
+         int pos = buffer.position();
+
+         if (parseState_ == HeaderParseState.TO_RESET)
+         {
+             resetParserState();
+         }
+
+         while (pos < limit)
+         {
+             switch (parseState_)
+             {
+
+                 case START_OF_HEADER_LINE:
+                     if ((got = buffer.get(pos) & 0xFF) != '\r')
+                     {
+                         // not consumed: IN_HEADER_KEY reads it again
+                         parseState_ = HeaderParseState.IN_HEADER_KEY;
+                         break;
+                     }
+                     else
+                     {
+                         pos++;
+                         if (pos == limit) // Need more bytes
+                         {
+                             buffer.position(pos);
+                             parseState_ = HeaderParseState.START_OF_HEADER_LINE_WITH_READ_SLASH_R;
+                             return false;
+                         }
+                     }
+                     // fall through
+
+                 case START_OF_HEADER_LINE_WITH_READ_SLASH_R:
+                     // Processed "...\r\n\r\n" - headers are complete
+                     if (((char) buffer.get(pos)) == '\n')
+                     {
+                         buffer.position(++pos);
+                         parseState_ = HeaderParseState.TO_RESET;
+                         return true;
+                     } // TODO: determine whether this \r-eating is valid
+                     else
+                     {
+                         parseState_ = HeaderParseState.IN_HEADER_KEY;
+                     }
+                     //fall through
+
+                 case IN_HEADER_KEY:
+                     // TODO: find out: whether to eat whitespace before a :
+                     while (pos < limit && (got = buffer.get(pos) & 0xFF) != ':')
+                     {
+                         headerKey_.append((char) got);
+                         pos++;
+                     }
+                     if (pos < limit)
+                     {
+                         pos++; //eating ':'
+                         parseState_ = HeaderParseState.PRE_HEADER_VALUE_WHITESPACE;
+                     }
+                     break;
+
+                 case PRE_HEADER_VALUE_WHITESPACE:
+                     // pos < limit is guaranteed here by the enclosing loop
+                     while ((((got = buffer.get(pos) & 0xFF) == ' ') || (got == '\t'))
+                         && (++pos < limit))
+                     {
+                         ;
+                     }
+                     if (pos < limit)
+                     {
+                         parseState_ = HeaderParseState.IN_HEADER_VALUE;
+                     }
+                     break;
+
+                 case IN_HEADER_VALUE:
+                     while (pos < limit && (got = buffer.get(pos) & 0xFF) != '\r')
+                     {
+                         headerValue_.append((char) got);
+                         pos++;
+                     }
+                     if (pos == limit)
+                     {
+                         break;
+                     }
+
+                     pos++; // eating '\r'
+                     if (pos == limit)
+                     {
+                         parseState_ = HeaderParseState.IN_HEADER_VALUE_WITH_READ_SLASH_R;
+                         break;
+                     }
+                     // fall through
+
+                 case IN_HEADER_VALUE_WITH_READ_SLASH_R:
+                     if (((char) buffer.get(pos)) == '\n')
+                     {
+                         parseState_ = HeaderParseState.CHECKING_END_OF_VALUE;
+                         pos++;
+                     } // TODO: determine whether this \r-eating is valid
+                     else
+                     {
+                         parseState_ = HeaderParseState.IN_HEADER_VALUE;
+                     }
+                     break;
+
+                 case CHECKING_END_OF_VALUE:
+                     switch ((char) buffer.get(pos))
+                     {
+                         case ' ':
+                         case '\t':
+                             // continuation line: the whitespace becomes part of the value
+                             parseState_ = HeaderParseState.IN_HEADER_VALUE;
+                             break;
+
+                         default:
+                             // Processed "headerKey headerValue\r\n"
+                             finishCurrentHeader_();
+                     }
+                     break;
+
+                 default:
+                     assert false;
+                     parseState_ = HeaderParseState.START_OF_HEADER_LINE;
+                     break;
+             }
+
+         }
+         // Need to read more bytes - get next buffer
+         buffer.position(pos);
+         return false;
+     }
+ }
Added: incubator/cassandra/trunk/src/org/apache/cassandra/net/http/HttpParsingException.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/net/http/HttpParsingException.java?rev=749218&view=auto
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/net/http/HttpParsingException.java (added)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/net/http/HttpParsingException.java Mon Mar 2 07:57:22 2009
@@ -0,0 +1,38 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/*
+ * To change this template, choose Tools | Templates
+ * and open the template in the editor.
+ */
+
+package org.apache.cassandra.net.http;
+
+import java.io.IOException;
+
+/**
+ * IOException subtype for HTTP parsing failures (judging by its name; no throw site is visible here). It declares no constructors, so instances are created without a detail message.
+ * @author kranganathan
+ */
+
+public class HttpParsingException extends IOException
+{
+ private static final long serialVersionUID = 1L; // explicit serialization version id; the IOException superclass is Serializable
+}
+
+
Added: incubator/cassandra/trunk/src/org/apache/cassandra/net/http/HttpProtocolConstants.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/net/http/HttpProtocolConstants.java?rev=749218&view=auto
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/net/http/HttpProtocolConstants.java (added)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/net/http/HttpProtocolConstants.java Mon Mar 2 07:57:22 2009
@@ -0,0 +1,36 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/*
+ * To change this template, choose Tools | Templates
+ * and open the template in the editor.
+ */
+
+package org.apache.cassandra.net.http;
+
+/**
+ * String constants for the HTTP support code in this package. Interface fields are implicitly public static final, so the explicit modifiers below are redundant.
+ * @author kranganathan
+ */
+public interface HttpProtocolConstants
+{
+ static final String CONNECTION = "Connection"; // spelling of the standard HTTP "Connection" header field name
+ static final String CONTENT_LENGTH = "Content-Length"; // spelling of the standard HTTP "Content-Length" header field name
+ static final String CLOSE = "close"; // presumably matched against the Connection header value - confirm at call sites
+ static final String KEEP_ALIVE = "Keep-Alive"; // presumably matched against the Connection header value - confirm at call sites
+}