You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by ja...@apache.org on 2011/09/30 19:18:51 UTC

svn commit: r1177737 - in /cassandra/branches/cassandra-1.0: CHANGES.txt src/java/org/apache/cassandra/thrift/TCustomServerSocket.java src/java/org/apache/cassandra/thrift/TCustomSocket.java

Author: jake
Date: Fri Sep 30 17:18:50 2011
New Revision: 1177737

URL: http://svn.apache.org/viewvc?rev=1177737&view=rev
Log:
Thrift sockets are not properly buffered
Patch my tjake; reviewed by jfarrell for CASSANDRA-3261

Added:
    cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/thrift/TCustomSocket.java
Modified:
    cassandra/branches/cassandra-1.0/CHANGES.txt
    cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/thrift/TCustomServerSocket.java

Modified: cassandra/branches/cassandra-1.0/CHANGES.txt
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-1.0/CHANGES.txt?rev=1177737&r1=1177736&r2=1177737&view=diff
==============================================================================
--- cassandra/branches/cassandra-1.0/CHANGES.txt (original)
+++ cassandra/branches/cassandra-1.0/CHANGES.txt Fri Sep 30 17:18:50 2011
@@ -1,5 +1,6 @@
 1.0.1
  * describe_ring should include datacenter/topology information (CASSANDRA-2882)
+ * Thrift sockets are not properly buffered (CASSANDRA-3261)
 
 
 1.0.0-final

Modified: cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/thrift/TCustomServerSocket.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/thrift/TCustomServerSocket.java?rev=1177737&r1=1177736&r2=1177737&view=diff
==============================================================================
--- cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/thrift/TCustomServerSocket.java (original)
+++ cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/thrift/TCustomServerSocket.java Fri Sep 30 17:18:50 2011
@@ -1,4 +1,5 @@
 package org.apache.cassandra.thrift;
+
 /*
  * 
  * Licensed to the Apache Software Foundation (ASF) under one
@@ -20,8 +21,9 @@ package org.apache.cassandra.thrift;
  * 
  */
 
-
+import java.io.IOException;
 import java.net.InetSocketAddress;
+import java.net.ServerSocket;
 import java.net.Socket;
 import java.net.SocketException;
 
@@ -29,44 +31,79 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import org.apache.thrift.transport.TServerSocket;
+import org.apache.thrift.transport.TServerTransport;
 import org.apache.thrift.transport.TSocket;
 import org.apache.thrift.transport.TTransportException;
 
 /**
- * Extends Thrift's TServerSocket to allow customization of various desirable
- * TCP properties.
+ * Extends Thrift's TServerSocket to allow customization of various desirable TCP properties.
  */
-public class TCustomServerSocket extends TServerSocket
+public class TCustomServerSocket extends TServerTransport
 {
 
     private static final Logger logger = LoggerFactory.getLogger(TCustomServerSocket.class);
 
+    /**
+     * Underlying serversocket object
+     */
+    private ServerSocket serverSocket_ = null;
+
     private final boolean keepAlive;
     private final Integer sendBufferSize;
     private final Integer recvBufferSize;
 
     /**
      * Allows fine-tuning of the server socket including keep-alive, reuse of addresses, send and receive buffer sizes.
+     * 
      * @param bindAddr
      * @param keepAlive
      * @param sendBufferSize
      * @param recvBufferSize
      * @throws TTransportException
      */
-    public TCustomServerSocket(InetSocketAddress bindAddr, boolean keepAlive, Integer sendBufferSize, Integer recvBufferSize)
-    throws TTransportException
+    public TCustomServerSocket(InetSocketAddress bindAddr, boolean keepAlive, Integer sendBufferSize,
+            Integer recvBufferSize)
+            throws TTransportException
     {
-        super(bindAddr);
+        try
+        {
+            // Make server socket
+            serverSocket_ = new ServerSocket();
+            // Prevent 2MSL delay problem on server restarts
+            serverSocket_.setReuseAddress(true);
+            // Bind to listening port
+            serverSocket_.bind(bindAddr);
+        }
+        catch (IOException ioe)
+        {
+            serverSocket_ = null;
+            throw new TTransportException("Could not create ServerSocket on address " + bindAddr.toString() + ".");
+        }
+
         this.keepAlive = keepAlive;
         this.sendBufferSize = sendBufferSize;
         this.recvBufferSize = recvBufferSize;
     }
 
     @Override
-    protected TSocket acceptImpl() throws TTransportException
+    protected TCustomSocket acceptImpl() throws TTransportException
     {
-        TSocket tsocket = super.acceptImpl();
-        Socket socket = tsocket.getSocket();
+
+        if (serverSocket_ == null)
+            throw new TTransportException(TTransportException.NOT_OPEN, "No underlying server socket.");
+
+        TCustomSocket tsocket = null;
+        Socket socket = null;
+        try
+        {
+            socket = serverSocket_.accept();
+            tsocket = new TCustomSocket(socket);
+            tsocket.setTimeout(0);
+        }
+        catch (IOException iox)
+        {
+            throw new TTransportException(iox);
+        }
 
         try
         {
@@ -103,4 +140,38 @@ public class TCustomServerSocket extends
 
         return tsocket;
     }
+
+    @Override
+    public void listen() throws TTransportException
+    {
+        // Make sure not to block on accept
+        if (serverSocket_ != null)
+        {
+            try
+            {
+                serverSocket_.setSoTimeout(0);
+            }
+            catch (SocketException sx)
+            {
+                logger.error("Could not set socket timeout.", sx);
+            }
+        }
+    }
+
+    @Override
+    public void close()
+    {
+        if (serverSocket_ != null)
+        {
+            try
+            {
+                serverSocket_.close();
+            }
+            catch (IOException iox)
+            {
+                logger.warn("Could not close server socket.", iox);
+            }
+            serverSocket_ = null;
+        }
+    }
 }

Added: cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/thrift/TCustomSocket.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/thrift/TCustomSocket.java?rev=1177737&view=auto
==============================================================================
--- cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/thrift/TCustomSocket.java (added)
+++ cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/thrift/TCustomSocket.java Fri Sep 30 17:18:50 2011
@@ -0,0 +1,211 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.cassandra.thrift;
+
+ 
+import java.io.BufferedInputStream;
+import java.io.BufferedOutputStream;
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.net.Socket;
+import java.net.SocketException;
+
+import org.apache.thrift.transport.TIOStreamTransport;
+import org.apache.thrift.transport.TTransportException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+ 
+/**
+ * Socket implementation of the TTransport interface. 
+ * 
+ * Adds socket buffering
+ *
+ */
+public class TCustomSocket extends TIOStreamTransport {
+ 
+  private static final Logger LOGGER = LoggerFactory.getLogger(TCustomSocket.class.getName());
+ 
+  /**
+   * Wrapped Socket object
+   */
+  private Socket socket_ = null;
+ 
+  /**
+   * Remote host
+   */
+  private String host_  = null;
+ 
+  /**
+   * Remote port
+   */
+  private int port_ = 0;
+ 
+  /**
+   * Socket timeout
+   */
+  private int timeout_ = 0;
+ 
+  /**
+   * Constructor that takes an already created socket.
+   *
+   * @param socket Already created socket object
+   * @throws TTransportException if there is an error setting up the streams
+   */
+  public TCustomSocket(Socket socket) throws TTransportException {
+    socket_ = socket;
+    try {
+      socket_.setSoLinger(false, 0);
+      socket_.setTcpNoDelay(true);
+    } catch (SocketException sx) {
+      LOGGER.warn("Could not configure socket.", sx);
+    }
+ 
+    if (isOpen()) {
+      try {
+        inputStream_ = new BufferedInputStream(socket_.getInputStream(), 1024);
+        outputStream_ = new BufferedOutputStream(socket_.getOutputStream(), 1024);
+      } catch (IOException iox) {
+        close();
+        throw new TTransportException(TTransportException.NOT_OPEN, iox);
+      }
+    }
+  }
+ 
+  /**
+   * Creates a new unconnected socket that will connect to the given host
+   * on the given port.
+   *
+   * @param host Remote host
+   * @param port Remote port
+   */
+  public TCustomSocket(String host, int port) {
+    this(host, port, 0);
+  }
+ 
+  /**
+   * Creates a new unconnected socket that will connect to the given host
+   * on the given port.
+   *
+   * @param host    Remote host
+   * @param port    Remote port
+   * @param timeout Socket timeout
+   */
+  public TCustomSocket(String host, int port, int timeout) {
+    host_ = host;
+    port_ = port;
+    timeout_ = timeout;
+    initSocket();
+  }
+ 
+  /**
+   * Initializes the socket object
+   */
+  private void initSocket() {
+    socket_ = new Socket();
+    try {
+      socket_.setSoLinger(false, 0);
+      socket_.setTcpNoDelay(true);
+      socket_.setSoTimeout(timeout_);
+    } catch (SocketException sx) {
+      LOGGER.error("Could not configure socket.", sx);
+    }
+  }
+ 
+  /**
+   * Sets the socket timeout
+   *
+   * @param timeout Milliseconds timeout
+   */
+  public void setTimeout(int timeout) {
+    timeout_ = timeout;
+    try {
+      socket_.setSoTimeout(timeout);
+    } catch (SocketException sx) {
+      LOGGER.warn("Could not set socket timeout.", sx);
+    }
+  }
+ 
+  /**
+   * Returns a reference to the underlying socket.
+   */
+  public Socket getSocket() {
+    if (socket_ == null) {
+      initSocket();
+    }
+    return socket_;
+  }
+ 
+  /**
+   * Checks whether the socket is connected.
+   */
+  public boolean isOpen() {
+    if (socket_ == null) {
+      return false;
+    }
+    return socket_.isConnected();
+  }
+ 
+  /**
+   * Connects the socket, creating a new socket object if necessary.
+   */
+  public void open() throws TTransportException {
+    if (isOpen()) {
+      throw new TTransportException(TTransportException.ALREADY_OPEN, "Socket already connected.");
+    }
+ 
+    if (host_.length() == 0) {
+      throw new TTransportException(TTransportException.NOT_OPEN, "Cannot open null host.");
+    }
+    if (port_ <= 0) {
+      throw new TTransportException(TTransportException.NOT_OPEN, "Cannot open without port.");
+    }
+ 
+    if (socket_ == null) {
+      initSocket();
+    }
+ 
+    try {
+      socket_.connect(new InetSocketAddress(host_, port_), timeout_);
+      inputStream_ = new BufferedInputStream(socket_.getInputStream(), 1024);
+      outputStream_ = new BufferedOutputStream(socket_.getOutputStream(), 1024);
+    } catch (IOException iox) {
+      close();
+      throw new TTransportException(TTransportException.NOT_OPEN, iox);
+    }
+  }
+ 
+  /**
+   * Closes the socket.
+   */
+  public void close() {
+    // Close the underlying streams
+    super.close();
+ 
+    // Close the socket
+    if (socket_ != null) {
+      try {
+        socket_.close();
+      } catch (IOException iox) {
+        LOGGER.warn("Could not close socket.", iox);
+      }
+      socket_ = null;
+    }
+  }
+ 
+}
\ No newline at end of file