You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by jb...@apache.org on 2009/07/30 17:30:27 UTC
svn commit: r799331 [22/29] - in /incubator/cassandra/trunk: ./
src/java/org/apache/cassandra/concurrent/
src/java/org/apache/cassandra/config/ src/java/org/apache/cassandra/db/
src/java/org/apache/cassandra/dht/ src/java/org/apache/cassandra/gms/ src/...
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/net/UdpConnection.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/net/UdpConnection.java?rev=799331&r1=799330&r2=799331&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/net/UdpConnection.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/net/UdpConnection.java Thu Jul 30 15:30:21 2009
@@ -1,168 +1,168 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.cassandra.net;
-
-import java.net.SocketAddress;
-import java.nio.*;
-import java.nio.channels.*;
-import java.util.*;
-import java.util.concurrent.*;
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
-import java.io.IOException;
-
-import org.apache.cassandra.net.io.ProtocolState;
-import org.apache.cassandra.net.sink.SinkManager;
-import org.apache.cassandra.utils.BasicUtilities;
-import org.apache.cassandra.utils.LogUtil;
-import org.apache.log4j.Logger;
-import org.apache.cassandra.concurrent.*;
-import org.apache.cassandra.utils.*;
-
-/**
- * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
- */
-
-public class UdpConnection extends SelectionKeyHandler
-{
- private static Logger logger_ = Logger.getLogger(UdpConnection.class);
- private static final int BUFFER_SIZE = 4096;
- private static final int protocol_ = 0xBADBEEF;
-
- private DatagramChannel socketChannel_;
- private SelectionKey key_;
- private EndPoint localEndPoint_;
-
- public void init() throws IOException
- {
- socketChannel_ = DatagramChannel.open();
- socketChannel_.socket().setReuseAddress(true);
- socketChannel_.configureBlocking(false);
- }
-
- public void init(int port) throws IOException
- {
- // TODO: get TCP port from config and add one.
- localEndPoint_ = new EndPoint(port);
- socketChannel_ = DatagramChannel.open();
- socketChannel_.socket().bind(localEndPoint_.getInetAddress());
- socketChannel_.socket().setReuseAddress(true);
- socketChannel_.configureBlocking(false);
- key_ = SelectorManager.getUdpSelectorManager().register(socketChannel_, this, SelectionKey.OP_READ);
- }
-
- public boolean write(Message message, EndPoint to) throws IOException
- {
- boolean bVal = true;
- ByteArrayOutputStream bos = new ByteArrayOutputStream();
- DataOutputStream dos = new DataOutputStream(bos);
- Message.serializer().serialize(message, dos);
- byte[] data = bos.toByteArray();
- if ( data.length > 0 )
- {
- if (logger_.isTraceEnabled())
- logger_.trace("Size of Gossip packet " + data.length);
- byte[] protocol = BasicUtilities.intToByteArray(protocol_);
- ByteBuffer buffer = ByteBuffer.allocate(data.length + protocol.length);
- buffer.put( protocol );
- buffer.put(data);
- buffer.flip();
-
- int n = socketChannel_.send(buffer, to.getInetAddress());
- if ( n == 0 )
- {
- bVal = false;
- }
- }
- return bVal;
- }
-
- void close()
- {
- try
- {
- if ( socketChannel_ != null )
- socketChannel_.close();
- }
- catch ( IOException ex )
- {
- logger_.error( LogUtil.throwableToString(ex) );
- }
- }
-
- public DatagramChannel getDatagramChannel()
- {
- return socketChannel_;
- }
-
- private byte[] gobbleHeaderAndExtractBody(ByteBuffer buffer)
- {
- byte[] body = new byte[0];
- byte[] protocol = new byte[4];
- buffer = buffer.get(protocol, 0, protocol.length);
- int value = BasicUtilities.byteArrayToInt(protocol);
-
- if ( protocol_ != value )
- {
- logger_.info("Invalid protocol header in the incoming message " + value);
- return body;
- }
- body = new byte[buffer.remaining()];
- buffer.get(body, 0, body.length);
- return body;
- }
-
- public void read(SelectionKey key)
- {
- turnOffInterestOps(key, SelectionKey.OP_READ);
- ByteBuffer buffer = ByteBuffer.allocate(BUFFER_SIZE);
- try
- {
- SocketAddress sa = socketChannel_.receive(buffer);
- if ( sa == null )
- {
- if (logger_.isDebugEnabled())
- logger_.debug("*** No datagram packet was available to be read ***");
- return;
- }
- buffer.flip();
-
- byte[] bytes = gobbleHeaderAndExtractBody(buffer);
- if ( bytes.length > 0 )
- {
- DataInputStream dis = new DataInputStream( new ByteArrayInputStream(bytes) );
- Message message = Message.serializer().deserialize(dis);
- if ( message != null )
- {
- MessagingService.receive(message);
- }
- }
- }
- catch ( IOException ioe )
- {
- logger_.warn(LogUtil.throwableToString(ioe));
- }
- finally
- {
- turnOnInterestOps(key_, SelectionKey.OP_READ );
- }
- }
-}
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.net;
+
+import java.net.SocketAddress;
+import java.nio.*;
+import java.nio.channels.*;
+import java.util.*;
+import java.util.concurrent.*;
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+
+import org.apache.cassandra.net.io.ProtocolState;
+import org.apache.cassandra.net.sink.SinkManager;
+import org.apache.cassandra.utils.BasicUtilities;
+import org.apache.cassandra.utils.LogUtil;
+import org.apache.log4j.Logger;
+import org.apache.cassandra.concurrent.*;
+import org.apache.cassandra.utils.*;
+
+/**
+ * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
+ */
+
+public class UdpConnection extends SelectionKeyHandler
+{
+ private static Logger logger_ = Logger.getLogger(UdpConnection.class);
+ private static final int BUFFER_SIZE = 4096;
+ private static final int protocol_ = 0xBADBEEF;
+
+ private DatagramChannel socketChannel_;
+ private SelectionKey key_;
+ private EndPoint localEndPoint_;
+
+ public void init() throws IOException
+ {
+ socketChannel_ = DatagramChannel.open();
+ socketChannel_.socket().setReuseAddress(true);
+ socketChannel_.configureBlocking(false);
+ }
+
+ public void init(int port) throws IOException
+ {
+ // TODO: get TCP port from config and add one.
+ localEndPoint_ = new EndPoint(port);
+ socketChannel_ = DatagramChannel.open();
+ socketChannel_.socket().bind(localEndPoint_.getInetAddress());
+ socketChannel_.socket().setReuseAddress(true);
+ socketChannel_.configureBlocking(false);
+ key_ = SelectorManager.getUdpSelectorManager().register(socketChannel_, this, SelectionKey.OP_READ);
+ }
+
+ public boolean write(Message message, EndPoint to) throws IOException
+ {
+ boolean bVal = true;
+ ByteArrayOutputStream bos = new ByteArrayOutputStream();
+ DataOutputStream dos = new DataOutputStream(bos);
+ Message.serializer().serialize(message, dos);
+ byte[] data = bos.toByteArray();
+ if ( data.length > 0 )
+ {
+ if (logger_.isTraceEnabled())
+ logger_.trace("Size of Gossip packet " + data.length);
+ byte[] protocol = BasicUtilities.intToByteArray(protocol_);
+ ByteBuffer buffer = ByteBuffer.allocate(data.length + protocol.length);
+ buffer.put( protocol );
+ buffer.put(data);
+ buffer.flip();
+
+ int n = socketChannel_.send(buffer, to.getInetAddress());
+ if ( n == 0 )
+ {
+ bVal = false;
+ }
+ }
+ return bVal;
+ }
+
+ void close()
+ {
+ try
+ {
+ if ( socketChannel_ != null )
+ socketChannel_.close();
+ }
+ catch ( IOException ex )
+ {
+ logger_.error( LogUtil.throwableToString(ex) );
+ }
+ }
+
+ public DatagramChannel getDatagramChannel()
+ {
+ return socketChannel_;
+ }
+
+ private byte[] gobbleHeaderAndExtractBody(ByteBuffer buffer)
+ {
+ byte[] body = new byte[0];
+ byte[] protocol = new byte[4];
+ buffer = buffer.get(protocol, 0, protocol.length);
+ int value = BasicUtilities.byteArrayToInt(protocol);
+
+ if ( protocol_ != value )
+ {
+ logger_.info("Invalid protocol header in the incoming message " + value);
+ return body;
+ }
+ body = new byte[buffer.remaining()];
+ buffer.get(body, 0, body.length);
+ return body;
+ }
+
+ public void read(SelectionKey key)
+ {
+ turnOffInterestOps(key, SelectionKey.OP_READ);
+ ByteBuffer buffer = ByteBuffer.allocate(BUFFER_SIZE);
+ try
+ {
+ SocketAddress sa = socketChannel_.receive(buffer);
+ if ( sa == null )
+ {
+ if (logger_.isDebugEnabled())
+ logger_.debug("*** No datagram packet was available to be read ***");
+ return;
+ }
+ buffer.flip();
+
+ byte[] bytes = gobbleHeaderAndExtractBody(buffer);
+ if ( bytes.length > 0 )
+ {
+ DataInputStream dis = new DataInputStream( new ByteArrayInputStream(bytes) );
+ Message message = Message.serializer().deserialize(dis);
+ if ( message != null )
+ {
+ MessagingService.receive(message);
+ }
+ }
+ }
+ catch ( IOException ioe )
+ {
+ logger_.warn(LogUtil.throwableToString(ioe));
+ }
+ finally
+ {
+ turnOnInterestOps(key_, SelectionKey.OP_READ );
+ }
+ }
+}
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/net/io/ContentLengthState.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/net/io/ContentLengthState.java?rev=799331&r1=799330&r2=799331&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/net/io/ContentLengthState.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/net/io/ContentLengthState.java Thu Jul 30 15:30:21 2009
@@ -1,67 +1,67 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.cassandra.net.io;
-
-
-import java.nio.ByteBuffer;
-import java.nio.channels.SocketChannel;
-import java.io.IOException;
-
-import org.apache.cassandra.utils.FBUtilities;
-/**
- * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
- */
-
-class ContentLengthState extends StartState
-{
- private ByteBuffer buffer_;
-
- ContentLengthState(TcpReader stream)
- {
- super(stream);
- buffer_ = ByteBuffer.allocate(4);
- }
-
- public byte[] read() throws IOException, ReadNotCompleteException
- {
- return doRead(buffer_);
- }
-
- public void morphState() throws IOException
- {
- int size = FBUtilities.byteArrayToInt(buffer_.array());
- StartState nextState = stream_.getSocketState(TcpReader.TcpReaderState.CONTENT);
- if ( nextState == null )
- {
- nextState = new ContentState(stream_, size);
- stream_.putSocketState( TcpReader.TcpReaderState.CONTENT, nextState );
- }
- else
- {
- nextState.setContextData(size);
- }
- stream_.morphState( nextState );
- buffer_.clear();
- }
-
- public void setContextData(Object data)
- {
- throw new UnsupportedOperationException("This method is not supported in the ContentLengthState");
- }
-}
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.net.io;
+
+
+import java.nio.ByteBuffer;
+import java.nio.channels.SocketChannel;
+import java.io.IOException;
+
+import org.apache.cassandra.utils.FBUtilities;
+/**
+ * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
+ */
+
+class ContentLengthState extends StartState
+{
+ private ByteBuffer buffer_;
+
+ ContentLengthState(TcpReader stream)
+ {
+ super(stream);
+ buffer_ = ByteBuffer.allocate(4);
+ }
+
+ public byte[] read() throws IOException, ReadNotCompleteException
+ {
+ return doRead(buffer_);
+ }
+
+ public void morphState() throws IOException
+ {
+ int size = FBUtilities.byteArrayToInt(buffer_.array());
+ StartState nextState = stream_.getSocketState(TcpReader.TcpReaderState.CONTENT);
+ if ( nextState == null )
+ {
+ nextState = new ContentState(stream_, size);
+ stream_.putSocketState( TcpReader.TcpReaderState.CONTENT, nextState );
+ }
+ else
+ {
+ nextState.setContextData(size);
+ }
+ stream_.morphState( nextState );
+ buffer_.clear();
+ }
+
+ public void setContextData(Object data)
+ {
+ throw new UnsupportedOperationException("This method is not supported in the ContentLengthState");
+ }
+}
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/net/io/ContentState.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/net/io/ContentState.java?rev=799331&r1=799330&r2=799331&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/net/io/ContentState.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/net/io/ContentState.java Thu Jul 30 15:30:21 2009
@@ -1,84 +1,84 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.cassandra.net.io;
-
-import java.nio.ByteBuffer;
-import java.nio.channels.SocketChannel;
-import java.io.IOException;
-/**
- * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
- */
-
-class ContentState extends StartState
-{
- private ByteBuffer buffer_;
- private int length_;
-
- ContentState(TcpReader stream, int length)
- {
- super(stream);
- length_ = length;
- buffer_ = ByteBuffer.allocate(length_);
- }
-
- public byte[] read() throws IOException, ReadNotCompleteException
- {
- return doRead(buffer_);
- }
-
- public void morphState() throws IOException
- {
- StartState nextState = stream_.getSocketState(TcpReader.TcpReaderState.DONE);
- if ( nextState == null )
- {
- nextState = new DoneState(stream_, toBytes());
- stream_.putSocketState( TcpReader.TcpReaderState.DONE, nextState );
- }
- else
- {
- nextState.setContextData(toBytes());
- }
- stream_.morphState( nextState );
- }
-
- private byte[] toBytes()
- {
- buffer_.position(0);
- /*
- ByteBuffer slice = buffer_.slice();
- return slice.array();
- */
- byte[] bytes = new byte[length_];
- buffer_.get(bytes, 0, length_);
- return bytes;
- }
-
- public void setContextData(Object data)
- {
- Integer value = (Integer)data;
- length_ = value;
- buffer_.clear();
- if ( buffer_.capacity() < length_ )
- buffer_ = ByteBuffer.allocate(length_);
- else
- {
- buffer_.limit(length_);
- }
- }
-}
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.net.io;
+
+import java.nio.ByteBuffer;
+import java.nio.channels.SocketChannel;
+import java.io.IOException;
+/**
+ * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
+ */
+
+class ContentState extends StartState
+{
+ private ByteBuffer buffer_;
+ private int length_;
+
+ ContentState(TcpReader stream, int length)
+ {
+ super(stream);
+ length_ = length;
+ buffer_ = ByteBuffer.allocate(length_);
+ }
+
+ public byte[] read() throws IOException, ReadNotCompleteException
+ {
+ return doRead(buffer_);
+ }
+
+ public void morphState() throws IOException
+ {
+ StartState nextState = stream_.getSocketState(TcpReader.TcpReaderState.DONE);
+ if ( nextState == null )
+ {
+ nextState = new DoneState(stream_, toBytes());
+ stream_.putSocketState( TcpReader.TcpReaderState.DONE, nextState );
+ }
+ else
+ {
+ nextState.setContextData(toBytes());
+ }
+ stream_.morphState( nextState );
+ }
+
+ private byte[] toBytes()
+ {
+ buffer_.position(0);
+ /*
+ ByteBuffer slice = buffer_.slice();
+ return slice.array();
+ */
+ byte[] bytes = new byte[length_];
+ buffer_.get(bytes, 0, length_);
+ return bytes;
+ }
+
+ public void setContextData(Object data)
+ {
+ Integer value = (Integer)data;
+ length_ = value;
+ buffer_.clear();
+ if ( buffer_.capacity() < length_ )
+ buffer_ = ByteBuffer.allocate(length_);
+ else
+ {
+ buffer_.limit(length_);
+ }
+ }
+}
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/net/io/ContentStreamState.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/net/io/ContentStreamState.java?rev=799331&r1=799330&r2=799331&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/net/io/ContentStreamState.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/net/io/ContentStreamState.java Thu Jul 30 15:30:21 2009
@@ -1,138 +1,138 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.cassandra.net.io;
-
-import java.net.InetSocketAddress;
-import java.nio.ByteBuffer;
-import java.nio.channels.FileChannel;
-import java.nio.channels.SocketChannel;
-import java.io.*;
-
-import org.apache.cassandra.db.Table;
-import org.apache.log4j.Logger;
-
-/**
- * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
- */
-
-
-class ContentStreamState extends StartState
-{
- private static Logger logger_ = Logger.getLogger(ContentStreamState.class);
- private static long count_ = 64*1024*1024;
- /* Return this byte array to exit event loop */
- private static byte[] bytes_ = new byte[1];
- private long bytesRead_ = 0L;
- private FileChannel fc_;
- private StreamContextManager.StreamContext streamContext_;
- private StreamContextManager.StreamStatus streamStatus_;
-
- ContentStreamState(TcpReader stream)
- {
- super(stream);
- SocketChannel socketChannel = stream.getStream();
- InetSocketAddress remoteAddress = (InetSocketAddress)socketChannel.socket().getRemoteSocketAddress();
- String remoteHost = remoteAddress.getHostName();
- streamContext_ = StreamContextManager.getStreamContext(remoteHost);
- streamStatus_ = StreamContextManager.getStreamStatus(remoteHost);
- }
-
- private void createFileChannel() throws IOException
- {
- if ( fc_ == null )
- {
- if (logger_.isDebugEnabled())
- logger_.debug("Creating file for " + streamContext_.getTargetFile());
- FileOutputStream fos = new FileOutputStream( streamContext_.getTargetFile(), true );
- fc_ = fos.getChannel();
- }
- }
-
- public byte[] read() throws IOException, ReadNotCompleteException
- {
- SocketChannel socketChannel = stream_.getStream();
- InetSocketAddress remoteAddress = (InetSocketAddress)socketChannel.socket().getRemoteSocketAddress();
- String remoteHost = remoteAddress.getHostName();
- createFileChannel();
- if ( streamContext_ != null )
- {
- try
- {
- bytesRead_ += fc_.transferFrom(socketChannel, bytesRead_, ContentStreamState.count_);
- if ( bytesRead_ != streamContext_.getExpectedBytes() )
- throw new ReadNotCompleteException("Specified number of bytes have not been read from the Socket Channel");
- }
- catch ( IOException ex )
- {
- /* Ask the source node to re-stream this file. */
- streamStatus_.setAction(StreamContextManager.StreamCompletionAction.STREAM);
- handleStreamCompletion(remoteHost);
- /* Delete the orphaned file. */
- File file = new File(streamContext_.getTargetFile());
- file.delete();
- throw ex;
- }
- if ( bytesRead_ == streamContext_.getExpectedBytes() )
- {
- if (logger_.isDebugEnabled())
- logger_.debug("Removing stream context " + streamContext_);
- handleStreamCompletion(remoteHost);
- bytesRead_ = 0L;
- fc_.close();
- morphState();
- }
- }
-
- return new byte[0];
- }
-
- private void handleStreamCompletion(String remoteHost) throws IOException
- {
- /*
- * Streaming is complete. If all the data that has to be received inform the sender via
- * the stream completion callback so that the source may perform the requisite cleanup.
- */
- IStreamComplete streamComplete = StreamContextManager.getStreamCompletionHandler(remoteHost);
- if ( streamComplete != null )
- {
- streamComplete.onStreamCompletion(remoteHost, streamContext_, streamStatus_);
- }
- }
-
- public void morphState() throws IOException
- {
- /* We instantiate an array of size 1 so that we can exit the event loop of the read. */
- StartState nextState = stream_.getSocketState(TcpReader.TcpReaderState.DONE);
- if ( nextState == null )
- {
- nextState = new DoneState(stream_, ContentStreamState.bytes_);
- stream_.putSocketState( TcpReader.TcpReaderState.DONE, nextState );
- }
- else
- {
- nextState.setContextData(ContentStreamState.bytes_);
- }
- stream_.morphState( nextState );
- }
-
- public void setContextData(Object data)
- {
- throw new UnsupportedOperationException("This method is not supported in the ContentStreamState");
- }
-}
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.net.io;
+
+import java.net.InetSocketAddress;
+import java.nio.ByteBuffer;
+import java.nio.channels.FileChannel;
+import java.nio.channels.SocketChannel;
+import java.io.*;
+
+import org.apache.cassandra.db.Table;
+import org.apache.log4j.Logger;
+
+/**
+ * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
+ */
+
+
+class ContentStreamState extends StartState
+{
+ private static Logger logger_ = Logger.getLogger(ContentStreamState.class);
+ private static long count_ = 64*1024*1024;
+ /* Return this byte array to exit event loop */
+ private static byte[] bytes_ = new byte[1];
+ private long bytesRead_ = 0L;
+ private FileChannel fc_;
+ private StreamContextManager.StreamContext streamContext_;
+ private StreamContextManager.StreamStatus streamStatus_;
+
+ ContentStreamState(TcpReader stream)
+ {
+ super(stream);
+ SocketChannel socketChannel = stream.getStream();
+ InetSocketAddress remoteAddress = (InetSocketAddress)socketChannel.socket().getRemoteSocketAddress();
+ String remoteHost = remoteAddress.getHostName();
+ streamContext_ = StreamContextManager.getStreamContext(remoteHost);
+ streamStatus_ = StreamContextManager.getStreamStatus(remoteHost);
+ }
+
+ private void createFileChannel() throws IOException
+ {
+ if ( fc_ == null )
+ {
+ if (logger_.isDebugEnabled())
+ logger_.debug("Creating file for " + streamContext_.getTargetFile());
+ FileOutputStream fos = new FileOutputStream( streamContext_.getTargetFile(), true );
+ fc_ = fos.getChannel();
+ }
+ }
+
+ public byte[] read() throws IOException, ReadNotCompleteException
+ {
+ SocketChannel socketChannel = stream_.getStream();
+ InetSocketAddress remoteAddress = (InetSocketAddress)socketChannel.socket().getRemoteSocketAddress();
+ String remoteHost = remoteAddress.getHostName();
+ createFileChannel();
+ if ( streamContext_ != null )
+ {
+ try
+ {
+ bytesRead_ += fc_.transferFrom(socketChannel, bytesRead_, ContentStreamState.count_);
+ if ( bytesRead_ != streamContext_.getExpectedBytes() )
+ throw new ReadNotCompleteException("Specified number of bytes have not been read from the Socket Channel");
+ }
+ catch ( IOException ex )
+ {
+ /* Ask the source node to re-stream this file. */
+ streamStatus_.setAction(StreamContextManager.StreamCompletionAction.STREAM);
+ handleStreamCompletion(remoteHost);
+ /* Delete the orphaned file. */
+ File file = new File(streamContext_.getTargetFile());
+ file.delete();
+ throw ex;
+ }
+ if ( bytesRead_ == streamContext_.getExpectedBytes() )
+ {
+ if (logger_.isDebugEnabled())
+ logger_.debug("Removing stream context " + streamContext_);
+ handleStreamCompletion(remoteHost);
+ bytesRead_ = 0L;
+ fc_.close();
+ morphState();
+ }
+ }
+
+ return new byte[0];
+ }
+
+ private void handleStreamCompletion(String remoteHost) throws IOException
+ {
+ /*
+ * Streaming is complete. If all the data that has to be received inform the sender via
+ * the stream completion callback so that the source may perform the requisite cleanup.
+ */
+ IStreamComplete streamComplete = StreamContextManager.getStreamCompletionHandler(remoteHost);
+ if ( streamComplete != null )
+ {
+ streamComplete.onStreamCompletion(remoteHost, streamContext_, streamStatus_);
+ }
+ }
+
+ public void morphState() throws IOException
+ {
+ /* We instantiate an array of size 1 so that we can exit the event loop of the read. */
+ StartState nextState = stream_.getSocketState(TcpReader.TcpReaderState.DONE);
+ if ( nextState == null )
+ {
+ nextState = new DoneState(stream_, ContentStreamState.bytes_);
+ stream_.putSocketState( TcpReader.TcpReaderState.DONE, nextState );
+ }
+ else
+ {
+ nextState.setContextData(ContentStreamState.bytes_);
+ }
+ stream_.morphState( nextState );
+ }
+
+ public void setContextData(Object data)
+ {
+ throw new UnsupportedOperationException("This method is not supported in the ContentStreamState");
+ }
+}
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/net/io/DoneState.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/net/io/DoneState.java?rev=799331&r1=799330&r2=799331&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/net/io/DoneState.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/net/io/DoneState.java Thu Jul 30 15:30:21 2009
@@ -1,52 +1,52 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.cassandra.net.io;
-
-import java.nio.channels.SocketChannel;
-import java.io.IOException;
-/**
- * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
- */
-
-class DoneState extends StartState
-{
- private byte[] bytes_ = new byte[0];
-
- DoneState(TcpReader stream, byte[] bytes)
- {
- super(stream);
- bytes_ = bytes;
- }
-
- public byte[] read() throws IOException, ReadNotCompleteException
- {
- morphState();
- return bytes_;
- }
-
- public void morphState() throws IOException
- {
- stream_.morphState(null);
- }
-
- public void setContextData(Object data)
- {
- bytes_ = (byte[])data;
- }
-}
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.net.io;
+
+import java.nio.channels.SocketChannel;
+import java.io.IOException;
+/**
+ * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
+ */
+
+class DoneState extends StartState
+{
+ private byte[] bytes_ = new byte[0];
+
+ DoneState(TcpReader stream, byte[] bytes)
+ {
+ super(stream);
+ bytes_ = bytes;
+ }
+
+ public byte[] read() throws IOException, ReadNotCompleteException
+ {
+ morphState();
+ return bytes_;
+ }
+
+ public void morphState() throws IOException
+ {
+ stream_.morphState(null);
+ }
+
+ public void setContextData(Object data)
+ {
+ bytes_ = (byte[])data;
+ }
+}
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/net/io/FastSerializer.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/net/io/FastSerializer.java?rev=799331&r1=799330&r2=799331&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/net/io/FastSerializer.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/net/io/FastSerializer.java Thu Jul 30 15:30:21 2009
@@ -1,46 +1,46 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.cassandra.net.io;
-
-import java.io.IOException;
-
-import org.apache.cassandra.io.DataInputBuffer;
-import org.apache.cassandra.io.DataOutputBuffer;
-import org.apache.cassandra.net.Message;
-
-/**
- * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
- */
-
-public class FastSerializer implements ISerializer
-{
- public byte[] serialize(Message message) throws IOException
- {
- DataOutputBuffer buffer = new DataOutputBuffer();
- Message.serializer().serialize(message, buffer);
- return buffer.getData();
- }
-
- public Message deserialize(byte[] bytes) throws IOException
- {
- DataInputBuffer bufIn = new DataInputBuffer();
- bufIn.reset(bytes, bytes.length);
- return Message.serializer().deserialize(bufIn);
- }
-}
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.net.io;
+
+import java.io.IOException;
+
+import org.apache.cassandra.io.DataInputBuffer;
+import org.apache.cassandra.io.DataOutputBuffer;
+import org.apache.cassandra.net.Message;
+
+/**
+ * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
+ */
+
+public class FastSerializer implements ISerializer
+{
+ public byte[] serialize(Message message) throws IOException
+ {
+ DataOutputBuffer buffer = new DataOutputBuffer();
+ Message.serializer().serialize(message, buffer);
+ return buffer.getData();
+ }
+
+ public Message deserialize(byte[] bytes) throws IOException
+ {
+ DataInputBuffer bufIn = new DataInputBuffer();
+ bufIn.reset(bytes, bytes.length);
+ return Message.serializer().deserialize(bufIn);
+ }
+}
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/net/io/ISerializer.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/net/io/ISerializer.java?rev=799331&r1=799330&r2=799331&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/net/io/ISerializer.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/net/io/ISerializer.java Thu Jul 30 15:30:21 2009
@@ -1,32 +1,32 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.cassandra.net.io;
-
-import java.io.IOException;
-
-import org.apache.cassandra.net.Message;
-/**
- * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
- */
-
-public interface ISerializer
-{
- public byte[] serialize(Message message) throws IOException;
- public Message deserialize(byte[] bytes) throws IOException;
-}
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.net.io;
+
+import java.io.IOException;
+
+import org.apache.cassandra.net.Message;
+/**
+ * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
+ */
+
+public interface ISerializer
+{
+ public byte[] serialize(Message message) throws IOException;
+ public Message deserialize(byte[] bytes) throws IOException;
+}
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/net/io/IStreamComplete.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/net/io/IStreamComplete.java?rev=799331&r1=799330&r2=799331&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/net/io/IStreamComplete.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/net/io/IStreamComplete.java Thu Jul 30 15:30:21 2009
@@ -1,36 +1,36 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.cassandra.net.io;
-
-import java.io.IOException;
-
-import org.apache.cassandra.net.EndPoint;
-
-/**
- * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
- */
-
-public interface IStreamComplete
-{
- /*
- * This callback if registered with the StreamContextManager is
- * called when the stream from a host is completely handled.
- */
- public void onStreamCompletion(String from, StreamContextManager.StreamContext streamContext, StreamContextManager.StreamStatus streamStatus) throws IOException;
-}
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.net.io;
+
+import java.io.IOException;
+
+import org.apache.cassandra.net.EndPoint;
+
+/**
+ * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
+ */
+
+public interface IStreamComplete
+{
+ /*
+ * This callback, if registered with the StreamContextManager, is
+ * called when the stream from a host is completely handled.
+ */
+ public void onStreamCompletion(String from, StreamContextManager.StreamContext streamContext, StreamContextManager.StreamStatus streamStatus) throws IOException;
+}
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/net/io/ProtocolHeaderState.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/net/io/ProtocolHeaderState.java?rev=799331&r1=799330&r2=799331&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/net/io/ProtocolHeaderState.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/net/io/ProtocolHeaderState.java Thu Jul 30 15:30:21 2009
@@ -1,103 +1,103 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.cassandra.net.io;
-
-import org.apache.cassandra.utils.*;
-import java.nio.ByteBuffer;
-import java.nio.channels.SocketChannel;
-import java.io.IOException;
-import org.apache.cassandra.net.MessagingService;
-
-/**
- * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
- */
-
-public class ProtocolHeaderState extends StartState
-{
- private ByteBuffer buffer_;
-
- public ProtocolHeaderState(TcpReader stream)
- {
- super(stream);
- buffer_ = ByteBuffer.allocate(4);
- }
-
- public byte[] read() throws IOException, ReadNotCompleteException
- {
- return doRead(buffer_);
- }
-
- public void morphState() throws IOException
- {
- byte[] protocolHeader = buffer_.array();
- int pH = MessagingService.byteArrayToInt(protocolHeader);
-
- int type = MessagingService.getBits(pH, 1, 2);
- stream_.getProtocolHeader().serializerType_ = type;
-
- int stream = MessagingService.getBits(pH, 3, 1);
- stream_.getProtocolHeader().isStreamingMode_ = (stream == 1) ? true : false;
-
- if ( stream_.getProtocolHeader().isStreamingMode_ )
- MessagingService.setStreamingMode(true);
-
- int listening = MessagingService.getBits(pH, 4, 1);
- stream_.getProtocolHeader().isListening_ = (listening == 1) ? true : false;
-
- int version = MessagingService.getBits(pH, 15, 8);
- stream_.getProtocolHeader().version_ = version;
-
- if ( version <= MessagingService.getVersion() )
- {
- if ( stream_.getProtocolHeader().isStreamingMode_ )
- {
- StartState nextState = stream_.getSocketState(TcpReader.TcpReaderState.CONTENT_STREAM);
- if ( nextState == null )
- {
- nextState = new ContentStreamState(stream_);
- stream_.putSocketState( TcpReader.TcpReaderState.CONTENT_STREAM, nextState );
- }
- stream_.morphState( nextState );
- buffer_.clear();
- }
- else
- {
- StartState nextState = stream_.getSocketState(TcpReader.TcpReaderState.CONTENT_LENGTH);
- if ( nextState == null )
- {
- nextState = new ContentLengthState(stream_);
- stream_.putSocketState( TcpReader.TcpReaderState.CONTENT_LENGTH, nextState );
- }
- stream_.morphState( nextState );
- buffer_.clear();
- }
- }
- else
- {
- throw new IOException("Invalid version in message. Scram.");
- }
- }
-
- public void setContextData(Object data)
- {
- throw new UnsupportedOperationException("This method is not supported in the ProtocolHeaderState");
- }
-}
-
-
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.net.io;
+
+import org.apache.cassandra.utils.*;
+import java.nio.ByteBuffer;
+import java.nio.channels.SocketChannel;
+import java.io.IOException;
+import org.apache.cassandra.net.MessagingService;
+
+/**
+ * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
+ */
+
+public class ProtocolHeaderState extends StartState
+{
+ private ByteBuffer buffer_;
+
+ public ProtocolHeaderState(TcpReader stream)
+ {
+ super(stream);
+ buffer_ = ByteBuffer.allocate(4);
+ }
+
+ public byte[] read() throws IOException, ReadNotCompleteException
+ {
+ return doRead(buffer_);
+ }
+
+ public void morphState() throws IOException
+ {
+ byte[] protocolHeader = buffer_.array();
+ int pH = MessagingService.byteArrayToInt(protocolHeader);
+
+ int type = MessagingService.getBits(pH, 1, 2);
+ stream_.getProtocolHeader().serializerType_ = type;
+
+ int stream = MessagingService.getBits(pH, 3, 1);
+ stream_.getProtocolHeader().isStreamingMode_ = (stream == 1) ? true : false;
+
+ if ( stream_.getProtocolHeader().isStreamingMode_ )
+ MessagingService.setStreamingMode(true);
+
+ int listening = MessagingService.getBits(pH, 4, 1);
+ stream_.getProtocolHeader().isListening_ = (listening == 1) ? true : false;
+
+ int version = MessagingService.getBits(pH, 15, 8);
+ stream_.getProtocolHeader().version_ = version;
+
+ if ( version <= MessagingService.getVersion() )
+ {
+ if ( stream_.getProtocolHeader().isStreamingMode_ )
+ {
+ StartState nextState = stream_.getSocketState(TcpReader.TcpReaderState.CONTENT_STREAM);
+ if ( nextState == null )
+ {
+ nextState = new ContentStreamState(stream_);
+ stream_.putSocketState( TcpReader.TcpReaderState.CONTENT_STREAM, nextState );
+ }
+ stream_.morphState( nextState );
+ buffer_.clear();
+ }
+ else
+ {
+ StartState nextState = stream_.getSocketState(TcpReader.TcpReaderState.CONTENT_LENGTH);
+ if ( nextState == null )
+ {
+ nextState = new ContentLengthState(stream_);
+ stream_.putSocketState( TcpReader.TcpReaderState.CONTENT_LENGTH, nextState );
+ }
+ stream_.morphState( nextState );
+ buffer_.clear();
+ }
+ }
+ else
+ {
+ throw new IOException("Invalid version in message. Scram.");
+ }
+ }
+
+ public void setContextData(Object data)
+ {
+ throw new UnsupportedOperationException("This method is not supported in the ProtocolHeaderState");
+ }
+}
+
+
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/net/io/ProtocolState.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/net/io/ProtocolState.java?rev=799331&r1=799330&r2=799331&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/net/io/ProtocolState.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/net/io/ProtocolState.java Thu Jul 30 15:30:21 2009
@@ -1,71 +1,71 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.cassandra.net.io;
-
-import org.apache.cassandra.utils.*;
-import java.nio.ByteBuffer;
-import java.nio.channels.SocketChannel;
-import java.io.IOException;
-import org.apache.cassandra.net.MessagingService;
-
-/**
- * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
- */
-
-public class ProtocolState extends StartState
-{
- private ByteBuffer buffer_;
-
- public ProtocolState(TcpReader stream)
- {
- super(stream);
- buffer_ = ByteBuffer.allocate(16);
- }
-
- public byte[] read() throws IOException, ReadNotCompleteException
- {
- return doRead(buffer_);
- }
-
- public void morphState() throws IOException
- {
- byte[] protocol = buffer_.array();
- if ( MessagingService.isProtocolValid(protocol) )
- {
- StartState nextState = stream_.getSocketState(TcpReader.TcpReaderState.PROTOCOL);
- if ( nextState == null )
- {
- nextState = new ProtocolHeaderState(stream_);
- stream_.putSocketState( TcpReader.TcpReaderState.PROTOCOL, nextState );
- }
- stream_.morphState( nextState );
- buffer_.clear();
- }
- else
- {
- throw new IOException("Invalid protocol header. The preamble seems to be messed up.");
- }
- }
-
- public void setContextData(Object data)
- {
- throw new UnsupportedOperationException("This method is not supported in the ProtocolState");
- }
-}
-
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.net.io;
+
+import org.apache.cassandra.utils.*;
+import java.nio.ByteBuffer;
+import java.nio.channels.SocketChannel;
+import java.io.IOException;
+import org.apache.cassandra.net.MessagingService;
+
+/**
+ * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
+ */
+
+public class ProtocolState extends StartState
+{
+ private ByteBuffer buffer_;
+
+ public ProtocolState(TcpReader stream)
+ {
+ super(stream);
+ buffer_ = ByteBuffer.allocate(16);
+ }
+
+ public byte[] read() throws IOException, ReadNotCompleteException
+ {
+ return doRead(buffer_);
+ }
+
+ public void morphState() throws IOException
+ {
+ byte[] protocol = buffer_.array();
+ if ( MessagingService.isProtocolValid(protocol) )
+ {
+ StartState nextState = stream_.getSocketState(TcpReader.TcpReaderState.PROTOCOL);
+ if ( nextState == null )
+ {
+ nextState = new ProtocolHeaderState(stream_);
+ stream_.putSocketState( TcpReader.TcpReaderState.PROTOCOL, nextState );
+ }
+ stream_.morphState( nextState );
+ buffer_.clear();
+ }
+ else
+ {
+ throw new IOException("Invalid protocol header. The preamble seems to be messed up.");
+ }
+ }
+
+ public void setContextData(Object data)
+ {
+ throw new UnsupportedOperationException("This method is not supported in the ProtocolState");
+ }
+}
+
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/net/io/ReadNotCompleteException.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/net/io/ReadNotCompleteException.java?rev=799331&r1=799330&r2=799331&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/net/io/ReadNotCompleteException.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/net/io/ReadNotCompleteException.java Thu Jul 30 15:30:21 2009
@@ -1,34 +1,34 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.cassandra.net.io;
-
-/**
- * Created by IntelliJ IDEA.
- * User: lakshman
- * Date: Aug 22, 2005
- * Time: 11:37:31 AM
- * To change this template use File | Settings | File Templates.
- */
-public class ReadNotCompleteException extends Exception
-{
- ReadNotCompleteException(String message)
- {
- super(message);
- }
-}
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.net.io;
+
+/**
+ * Created by IntelliJ IDEA.
+ * User: lakshman
+ * Date: Aug 22, 2005
+ * Time: 11:37:31 AM
+ * To change this template use File | Settings | File Templates.
+ */
+public class ReadNotCompleteException extends Exception
+{
+ ReadNotCompleteException(String message)
+ {
+ super(message);
+ }
+}
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/net/io/SerializerAttribute.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/net/io/SerializerAttribute.java?rev=799331&r1=799330&r2=799331&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/net/io/SerializerAttribute.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/net/io/SerializerAttribute.java Thu Jul 30 15:30:21 2009
@@ -1,27 +1,27 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.cassandra.net.io;
-
-import java.lang.annotation.*;
-
-@Retention(RetentionPolicy.RUNTIME)
-public @interface SerializerAttribute
-{
- SerializerType value();
-}
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.net.io;
+
+import java.lang.annotation.*;
+
+@Retention(RetentionPolicy.RUNTIME)
+public @interface SerializerAttribute
+{
+ SerializerType value();
+}
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/net/io/SerializerType.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/net/io/SerializerType.java?rev=799331&r1=799330&r2=799331&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/net/io/SerializerType.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/net/io/SerializerType.java Thu Jul 30 15:30:21 2009
@@ -1,27 +1,27 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.cassandra.net.io;
-
-public enum SerializerType
-{
- BINARY,
- JAVA,
- XML,
- JSON
-}
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.net.io;
+
+public enum SerializerType
+{
+ BINARY,
+ JAVA,
+ XML,
+ JSON
+}
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/net/io/StartState.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/net/io/StartState.java?rev=799331&r1=799330&r2=799331&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/net/io/StartState.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/net/io/StartState.java Thu Jul 30 15:30:21 2009
@@ -1,59 +1,59 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.cassandra.net.io;
-
-import java.nio.channels.SocketChannel;
-import java.nio.ByteBuffer;
-import java.io.IOException;
-/**
- * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
- */
-
-public abstract class StartState
-{
- protected TcpReader stream_;
-
- public StartState(TcpReader stream)
- {
- stream_ = stream;
- }
-
- public abstract byte[] read() throws IOException, ReadNotCompleteException;
- public abstract void morphState() throws IOException;
- public abstract void setContextData(Object data);
-
- protected byte[] doRead(ByteBuffer buffer) throws IOException, ReadNotCompleteException
- {
- SocketChannel socketChannel = stream_.getStream();
- int bytesRead = socketChannel.read(buffer);
- if ( bytesRead == -1 && buffer.remaining() > 0 )
- {
- throw new IOException("Reached an EOL or something bizzare occured. Reading from: " + socketChannel.socket().getInetAddress() + " BufferSizeRemaining: " + buffer.remaining());
- }
- if ( buffer.remaining() == 0 )
- {
- morphState();
- }
- else
- {
- throw new ReadNotCompleteException("Specified number of bytes have not been read from the Socket Channel");
- }
- return new byte[0];
- }
-}
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.net.io;
+
+import java.nio.channels.SocketChannel;
+import java.nio.ByteBuffer;
+import java.io.IOException;
+/**
+ * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
+ */
+
+public abstract class StartState // Abstract base whose doRead() fills a ByteBuffer from the SocketChannel of a TcpReader.
+{
+ protected TcpReader stream_; // reader whose getStream() supplies the channel used by doRead()
+
+ public StartState(TcpReader stream)
+ {
+ stream_ = stream;
+ }
+
+ public abstract byte[] read() throws IOException, ReadNotCompleteException; // implemented by subclasses
+ public abstract void morphState() throws IOException; // called by doRead() once the buffer has no space remaining
+ public abstract void setContextData(Object data); // contract not visible in this file
+
+ protected byte[] doRead(ByteBuffer buffer) throws IOException, ReadNotCompleteException // performs a single channel read into buffer
+ {
+ SocketChannel socketChannel = stream_.getStream();
+ int bytesRead = socketChannel.read(buffer);
+ if ( bytesRead == -1 && buffer.remaining() > 0 ) // channel reported end-of-stream before the buffer was filled
+ {
+ throw new IOException("Reached an EOL or something bizzare occured. Reading from: " + socketChannel.socket().getInetAddress() + " BufferSizeRemaining: " + buffer.remaining());
+ }
+ if ( buffer.remaining() == 0 ) // buffer is full: hand off to the subclass's morphState()
+ {
+ morphState();
+ }
+ else // partial read: reported to the caller as an exception rather than a return value
+ {
+ throw new ReadNotCompleteException("Specified number of bytes have not been read from the Socket Channel");
+ }
+ return new byte[0]; // always an empty array; the bytes read remain in the caller's buffer
+ }
+}