You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by ja...@apache.org on 2011/09/30 19:18:51 UTC
svn commit: r1177737 - in /cassandra/branches/cassandra-1.0: CHANGES.txt
src/java/org/apache/cassandra/thrift/TCustomServerSocket.java
src/java/org/apache/cassandra/thrift/TCustomSocket.java
Author: jake
Date: Fri Sep 30 17:18:50 2011
New Revision: 1177737
URL: http://svn.apache.org/viewvc?rev=1177737&view=rev
Log:
Thrift sockets are not properly buffered
Patch my tjake; reviewed by jfarrell for CASSANDRA-3261
Added:
cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/thrift/TCustomSocket.java
Modified:
cassandra/branches/cassandra-1.0/CHANGES.txt
cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/thrift/TCustomServerSocket.java
Modified: cassandra/branches/cassandra-1.0/CHANGES.txt
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-1.0/CHANGES.txt?rev=1177737&r1=1177736&r2=1177737&view=diff
==============================================================================
--- cassandra/branches/cassandra-1.0/CHANGES.txt (original)
+++ cassandra/branches/cassandra-1.0/CHANGES.txt Fri Sep 30 17:18:50 2011
@@ -1,5 +1,6 @@
1.0.1
* describe_ring should include datacenter/topology information (CASSANDRA-2882)
+ * Thrift sockets are not properly buffered (CASSANDRA-3261)
1.0.0-final
Modified: cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/thrift/TCustomServerSocket.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/thrift/TCustomServerSocket.java?rev=1177737&r1=1177736&r2=1177737&view=diff
==============================================================================
--- cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/thrift/TCustomServerSocket.java (original)
+++ cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/thrift/TCustomServerSocket.java Fri Sep 30 17:18:50 2011
@@ -1,4 +1,5 @@
package org.apache.cassandra.thrift;
+
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
@@ -20,8 +21,9 @@ package org.apache.cassandra.thrift;
*
*/
-
+import java.io.IOException;
import java.net.InetSocketAddress;
+import java.net.ServerSocket;
import java.net.Socket;
import java.net.SocketException;
@@ -29,44 +31,79 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.thrift.transport.TServerSocket;
+import org.apache.thrift.transport.TServerTransport;
import org.apache.thrift.transport.TSocket;
import org.apache.thrift.transport.TTransportException;
/**
- * Extends Thrift's TServerSocket to allow customization of various desirable
- * TCP properties.
+ * Extends Thrift's TServerSocket to allow customization of various desirable TCP properties.
*/
-public class TCustomServerSocket extends TServerSocket
+public class TCustomServerSocket extends TServerTransport
{
private static final Logger logger = LoggerFactory.getLogger(TCustomServerSocket.class);
+ /**
+ * Underlying serversocket object
+ */
+ private ServerSocket serverSocket_ = null;
+
private final boolean keepAlive;
private final Integer sendBufferSize;
private final Integer recvBufferSize;
/**
* Allows fine-tuning of the server socket including keep-alive, reuse of addresses, send and receive buffer sizes.
+ *
* @param bindAddr
* @param keepAlive
* @param sendBufferSize
* @param recvBufferSize
* @throws TTransportException
*/
- public TCustomServerSocket(InetSocketAddress bindAddr, boolean keepAlive, Integer sendBufferSize, Integer recvBufferSize)
- throws TTransportException
+ public TCustomServerSocket(InetSocketAddress bindAddr, boolean keepAlive, Integer sendBufferSize,
+ Integer recvBufferSize)
+ throws TTransportException
{
- super(bindAddr);
+ try
+ {
+ // Make server socket
+ serverSocket_ = new ServerSocket();
+ // Prevent 2MSL delay problem on server restarts
+ serverSocket_.setReuseAddress(true);
+ // Bind to listening port
+ serverSocket_.bind(bindAddr);
+ }
+ catch (IOException ioe)
+ {
+ serverSocket_ = null;
+ throw new TTransportException("Could not create ServerSocket on address " + bindAddr.toString() + ".");
+ }
+
this.keepAlive = keepAlive;
this.sendBufferSize = sendBufferSize;
this.recvBufferSize = recvBufferSize;
}
@Override
- protected TSocket acceptImpl() throws TTransportException
+ protected TCustomSocket acceptImpl() throws TTransportException
{
- TSocket tsocket = super.acceptImpl();
- Socket socket = tsocket.getSocket();
+
+ if (serverSocket_ == null)
+ throw new TTransportException(TTransportException.NOT_OPEN, "No underlying server socket.");
+
+ TCustomSocket tsocket = null;
+ Socket socket = null;
+ try
+ {
+ socket = serverSocket_.accept();
+ tsocket = new TCustomSocket(socket);
+ tsocket.setTimeout(0);
+ }
+ catch (IOException iox)
+ {
+ throw new TTransportException(iox);
+ }
try
{
@@ -103,4 +140,38 @@ public class TCustomServerSocket extends
return tsocket;
}
+
+ @Override
+ public void listen() throws TTransportException
+ {
+ // Make sure not to block on accept
+ if (serverSocket_ != null)
+ {
+ try
+ {
+ serverSocket_.setSoTimeout(0);
+ }
+ catch (SocketException sx)
+ {
+ logger.error("Could not set socket timeout.", sx);
+ }
+ }
+ }
+
+ @Override
+ public void close()
+ {
+ if (serverSocket_ != null)
+ {
+ try
+ {
+ serverSocket_.close();
+ }
+ catch (IOException iox)
+ {
+ logger.warn("Could not close server socket.", iox);
+ }
+ serverSocket_ = null;
+ }
+ }
}
Added: cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/thrift/TCustomSocket.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/thrift/TCustomSocket.java?rev=1177737&view=auto
==============================================================================
--- cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/thrift/TCustomSocket.java (added)
+++ cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/thrift/TCustomSocket.java Fri Sep 30 17:18:50 2011
@@ -0,0 +1,211 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.cassandra.thrift;
+
+
+import java.io.BufferedInputStream;
+import java.io.BufferedOutputStream;
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.net.Socket;
+import java.net.SocketException;
+
+import org.apache.thrift.transport.TIOStreamTransport;
+import org.apache.thrift.transport.TTransportException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Socket implementation of the TTransport interface.
+ *
+ * Adds socket buffering
+ *
+ */
+public class TCustomSocket extends TIOStreamTransport {
+
+ private static final Logger LOGGER = LoggerFactory.getLogger(TCustomSocket.class.getName());
+
+ /**
+ * Wrapped Socket object
+ */
+ private Socket socket_ = null;
+
+ /**
+ * Remote host
+ */
+ private String host_ = null;
+
+ /**
+ * Remote port
+ */
+ private int port_ = 0;
+
+ /**
+ * Socket timeout
+ */
+ private int timeout_ = 0;
+
+ /**
+ * Constructor that takes an already created socket.
+ *
+ * @param socket Already created socket object
+ * @throws TTransportException if there is an error setting up the streams
+ */
+ public TCustomSocket(Socket socket) throws TTransportException {
+ socket_ = socket;
+ try {
+ socket_.setSoLinger(false, 0);
+ socket_.setTcpNoDelay(true);
+ } catch (SocketException sx) {
+ LOGGER.warn("Could not configure socket.", sx);
+ }
+
+ if (isOpen()) {
+ try {
+ inputStream_ = new BufferedInputStream(socket_.getInputStream(), 1024);
+ outputStream_ = new BufferedOutputStream(socket_.getOutputStream(), 1024);
+ } catch (IOException iox) {
+ close();
+ throw new TTransportException(TTransportException.NOT_OPEN, iox);
+ }
+ }
+ }
+
+ /**
+ * Creates a new unconnected socket that will connect to the given host
+ * on the given port.
+ *
+ * @param host Remote host
+ * @param port Remote port
+ */
+ public TCustomSocket(String host, int port) {
+ this(host, port, 0);
+ }
+
+ /**
+ * Creates a new unconnected socket that will connect to the given host
+ * on the given port.
+ *
+ * @param host Remote host
+ * @param port Remote port
+ * @param timeout Socket timeout
+ */
+ public TCustomSocket(String host, int port, int timeout) {
+ host_ = host;
+ port_ = port;
+ timeout_ = timeout;
+ initSocket();
+ }
+
+ /**
+ * Initializes the socket object
+ */
+ private void initSocket() {
+ socket_ = new Socket();
+ try {
+ socket_.setSoLinger(false, 0);
+ socket_.setTcpNoDelay(true);
+ socket_.setSoTimeout(timeout_);
+ } catch (SocketException sx) {
+ LOGGER.error("Could not configure socket.", sx);
+ }
+ }
+
+ /**
+ * Sets the socket timeout
+ *
+ * @param timeout Milliseconds timeout
+ */
+ public void setTimeout(int timeout) {
+ timeout_ = timeout;
+ try {
+ socket_.setSoTimeout(timeout);
+ } catch (SocketException sx) {
+ LOGGER.warn("Could not set socket timeout.", sx);
+ }
+ }
+
+ /**
+ * Returns a reference to the underlying socket.
+ */
+ public Socket getSocket() {
+ if (socket_ == null) {
+ initSocket();
+ }
+ return socket_;
+ }
+
+ /**
+ * Checks whether the socket is connected.
+ */
+ public boolean isOpen() {
+ if (socket_ == null) {
+ return false;
+ }
+ return socket_.isConnected();
+ }
+
+ /**
+ * Connects the socket, creating a new socket object if necessary.
+ */
+ public void open() throws TTransportException {
+ if (isOpen()) {
+ throw new TTransportException(TTransportException.ALREADY_OPEN, "Socket already connected.");
+ }
+
+ if (host_.length() == 0) {
+ throw new TTransportException(TTransportException.NOT_OPEN, "Cannot open null host.");
+ }
+ if (port_ <= 0) {
+ throw new TTransportException(TTransportException.NOT_OPEN, "Cannot open without port.");
+ }
+
+ if (socket_ == null) {
+ initSocket();
+ }
+
+ try {
+ socket_.connect(new InetSocketAddress(host_, port_), timeout_);
+ inputStream_ = new BufferedInputStream(socket_.getInputStream(), 1024);
+ outputStream_ = new BufferedOutputStream(socket_.getOutputStream(), 1024);
+ } catch (IOException iox) {
+ close();
+ throw new TTransportException(TTransportException.NOT_OPEN, iox);
+ }
+ }
+
+ /**
+ * Closes the socket.
+ */
+ public void close() {
+ // Close the underlying streams
+ super.close();
+
+ // Close the socket
+ if (socket_ != null) {
+ try {
+ socket_.close();
+ } catch (IOException iox) {
+ LOGGER.warn("Could not close socket.", iox);
+ }
+ socket_ = null;
+ }
+ }
+
+}
\ No newline at end of file