You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by ee...@apache.org on 2011/01/05 18:24:07 UTC

svn commit: r1055538 - in /cassandra/trunk/drivers/java/src/org/apache/cassandra/cql/driver: Connection.java ConnectionPool.java IConnectionPool.java Utils.java

Author: eevans
Date: Wed Jan  5 17:24:07 2011
New Revision: 1055538

URL: http://svn.apache.org/viewvc?rev=1055538&view=rev
Log:
CASSANDRA-1710 basic connection pooling for java driver

Patch by eevans for CASSANDRA-1710

Added:
    cassandra/trunk/drivers/java/src/org/apache/cassandra/cql/driver/ConnectionPool.java
    cassandra/trunk/drivers/java/src/org/apache/cassandra/cql/driver/IConnectionPool.java
    cassandra/trunk/drivers/java/src/org/apache/cassandra/cql/driver/Utils.java
Modified:
    cassandra/trunk/drivers/java/src/org/apache/cassandra/cql/driver/Connection.java

Modified: cassandra/trunk/drivers/java/src/org/apache/cassandra/cql/driver/Connection.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/drivers/java/src/org/apache/cassandra/cql/driver/Connection.java?rev=1055538&r1=1055537&r2=1055538&view=diff
==============================================================================
--- cassandra/trunk/drivers/java/src/org/apache/cassandra/cql/driver/Connection.java (original)
+++ cassandra/trunk/drivers/java/src/org/apache/cassandra/cql/driver/Connection.java Wed Jan  5 17:24:07 2011
@@ -1,33 +1,8 @@
-/*
- * 
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *   http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- * 
- */
 package org.apache.cassandra.cql.driver;
 
-import java.io.ByteArrayOutputStream;
-import java.nio.ByteBuffer;
-import java.util.zip.Deflater;
-
 import org.apache.cassandra.thrift.Cassandra;
 import org.apache.cassandra.thrift.Compression;
 import org.apache.cassandra.thrift.CqlResult;
-import org.apache.cassandra.thrift.CqlRow;
 import org.apache.cassandra.thrift.InvalidRequestException;
 import org.apache.cassandra.thrift.TimedOutException;
 import org.apache.cassandra.thrift.UnavailableException;
@@ -37,105 +12,95 @@ import org.apache.thrift.protocol.TProto
 import org.apache.thrift.transport.TFramedTransport;
 import org.apache.thrift.transport.TSocket;
 import org.apache.thrift.transport.TTransport;
+import org.apache.thrift.transport.TTransportException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+/** CQL connection object. */
 public class Connection
 {
-    private static final Logger logger = LoggerFactory.getLogger(Connection.class);
+    public static Compression defaultCompression = Compression.GZIP;
+    public final String hostName;
+    public final int portNo;
     
-    public String hostName;
-    public int port;
+    private static final Logger logger = LoggerFactory.getLogger(Connection.class);
+    protected long timeOfLastFailure = 0;
+    protected int numFailures = 0;
     private Cassandra.Client client;
     private TTransport transport;
-    private Compression defaultCompression = Compression.GZIP;
     
-    public Connection(String keyspaceName, String...hosts) throws InvalidRequestException, TException
+    /**
+     * Create a new <code>Connection</code> instance.
+     * 
+     * @param hostName hostname or IP address of the remote host
+     * @param portNo TCP port number
+     * @throws TTransportException if unable to connect
+     */
+    public Connection(String hostName, int portNo) throws TTransportException
     {
-        assert hosts.length > 0;
+        this.hostName = hostName;
+        this.portNo = portNo;
         
-        for (String hostSpec : hosts)
-        {
-            String[] parts = hostSpec.split(":", 2);
-            this.hostName = parts[0];
-            this.port = Integer.parseInt(parts[1]);
-            
-            // TODO: This will need to do connection pooling.
-            break;
-        }
-        
-        TSocket socket = new TSocket(hostName, port);
+        TSocket socket = new TSocket(hostName, portNo);
         transport = new TFramedTransport(socket);
         TProtocol protocol = new TBinaryProtocol(transport);
         client = new Cassandra.Client(protocol);
         socket.open();
         
-        client.set_keyspace(keyspaceName);
-    }
-    
-    private ByteBuffer compressQuery(String queryStr, Compression compression)
-    {
-        byte[] data = queryStr.getBytes();
-        Deflater compressor = new Deflater();
-        compressor.setInput(data);
-        compressor.finish();
-        
-        ByteArrayOutputStream byteArray = new ByteArrayOutputStream();
-        byte[] buffer = new byte[1024];
-        
-        while (!compressor.finished())
-        {
-            int size = compressor.deflate(buffer);
-            byteArray.write(buffer, 0, size);
-        }
-        
-        logger.trace("Compressed query statement {} bytes in length to {} bytes",
-                     data.length,
-                     byteArray.size());
-        
-        return ByteBuffer.wrap(byteArray.toByteArray());
+        logger.info("Connected to {}:{}", hostName, portNo);
     }
     
+    /**
+     * Execute a CQL query.
+     * 
+     * @param queryStr a CQL query string
+     * @return the query results encoded as a CqlResult struct
+     * @throws InvalidRequestException on poorly constructed or illegal requests
+     * @throws UnavailableException when not all required replicas could be created/read
+     * @throws TimedOutException when a cluster operation timed out
+     * @throws TException
+     */
     public CqlResult execute(String queryStr)
     throws InvalidRequestException, UnavailableException, TimedOutException, TException
     {
-        return execute(queryStr, getDefaultCompression());
+        return execute(queryStr, defaultCompression);
     }
     
-    public CqlResult execute(String queryStr, Compression compression)
+    /**
+     * Execute a CQL query.
+     * 
+     * @param queryStr a CQL query string
+     * @param compress query compression to use
+     * @return the query results encoded as a CqlResult struct
+     * @throws InvalidRequestException on poorly constructed or illegal requests
+     * @throws UnavailableException when not all required replicas could be created/read
+     * @throws TimedOutException when a cluster operation timed out
+     * @throws TException
+     */
+    public CqlResult execute(String queryStr, Compression compress)
     throws InvalidRequestException, UnavailableException, TimedOutException, TException
     {
-        logger.trace("Executing CQL Query: {}", queryStr);
-        return client.execute_cql_query(compressQuery(queryStr, compression), compression);
+        try
+        {
+            return client.execute_cql_query(Utils.compressQuery(queryStr, compress), compress);
+        }
+        catch (TException error)
+        {
+            numFailures++;
+            timeOfLastFailure = System.currentTimeMillis();
+            throw error;
+        }
     }
     
-    public Compression getDefaultCompression()
+    /** Shut down the remote connection. */
+    public void close()
     {
-        return defaultCompression;
+        transport.close();
     }
-
-    public void setDefaultCompression(Compression defaultCompression)
-    {
-        this.defaultCompression = defaultCompression;
-    }
-
-    public static void main(String[] args) throws Exception
+    
+    /** Returns whether the underlying transport is currently open. */
+    public boolean isOpen()
     {
-        Connection conn = new Connection("Keyspace1", "localhost:9160");
-        CqlResult result = conn.execute("UPDATE Standard2 USING CONSISTENCY.ONE WITH ROW(\"mango\", COL(\"disposition\", \"fruit\"));");
-        String selectQ = "SELECT FROM Standard2 WHERE KEY > \"apple\" AND KEY < \"carrot\" ROWLIMIT 5 DESC;";
-        result = conn.execute(selectQ);
-        switch (result.type)
-        {
-            case ROWS:
-                for (CqlRow row : result.rows)
-                {
-                    System.out.println("KEY: " + new String(row.key.array()));
-                    for (org.apache.cassandra.thrift.Column col : row.columns)
-                    {
-                        System.out.println("  COL: " + new String(col.name.array()) + ":" + new String(col.value.array()));
-                    }
-                }
-        }
+        return transport.isOpen();
     }
 }

Added: cassandra/trunk/drivers/java/src/org/apache/cassandra/cql/driver/ConnectionPool.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/drivers/java/src/org/apache/cassandra/cql/driver/ConnectionPool.java?rev=1055538&view=auto
==============================================================================
--- cassandra/trunk/drivers/java/src/org/apache/cassandra/cql/driver/ConnectionPool.java (added)
+++ cassandra/trunk/drivers/java/src/org/apache/cassandra/cql/driver/ConnectionPool.java Wed Jan  5 17:24:07 2011
@@ -0,0 +1,156 @@
+
+package org.apache.cassandra.cql.driver;
+
+import java.util.Date;
+import java.util.Timer;
+import java.util.TimerTask;
+import java.util.concurrent.ConcurrentLinkedQueue;
+
+import org.apache.thrift.transport.TTransportException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Simple connection-caching pool implementation.
+ * 
+ * <p>A <code>ConnectionPool</code> provides the simplest possible connection
+ * pooling, lazily creating new connections if needed as
+ * <code>borrowClient</code> is called.  Connections are re-added to the pool
+ * by <code>returnConnection</code>, unless doing so would exceed the maximum
+ * pool size.</p>
+ * 
+ * <p>Example usage:</p>
+ * 
+ * <code>
+ * IConnectionPool pool = new ConnectionPool("localhost", 9160);<br />
+ * Connection conn = pool.borrowConnection();<br />
+ * conn.execute(...);<br />
+ * pool.returnConnection(pool);<br />
+ * </code>
+ */
+public class ConnectionPool implements IConnectionPool
+{
+    public static final int DEFAULT_MAX_CONNECTIONS = 25;
+    public static final int DEFAULT_PORT = 9160;
+    public static final int DEFAULT_MAX_IDLE = 5;
+    public static final long DEFAULT_EVICTION_DELAY_MILLIS = 10000; // 10 seconds
+    private static final Logger logger = LoggerFactory.getLogger(ConnectionPool.class);
+    
+    private ConcurrentLinkedQueue<Connection> connections = new ConcurrentLinkedQueue<Connection>();
+    private Timer eviction;
+    private String hostName;
+    private int portNo;
+    private int maxConns, maxIdle;
+    
+    /**
+     * Create a new <code>ConnectionPool</code> for a given hostname.
+     * 
+     * @param hostName hostname or IP address to open connections to
+     * @throws TTransportException if unable to connect
+     */
+    public ConnectionPool(String hostName) throws TTransportException
+    {
+        this(hostName, DEFAULT_PORT, DEFAULT_MAX_CONNECTIONS, DEFAULT_MAX_IDLE, DEFAULT_EVICTION_DELAY_MILLIS);
+    }
+    
+    /**
+     * Create a new <code>ConnectionPool</code> for the given hostname and
+     * port number.
+     * 
+     * @param hostName hostname or IP address to open connections to
+     * @param portNo port number to connect to
+     * @throws TTransportException if unable to connect
+     */
+    public ConnectionPool(String hostName, int portNo) throws TTransportException
+    {
+        this(hostName, portNo, DEFAULT_MAX_CONNECTIONS, DEFAULT_MAX_IDLE, DEFAULT_EVICTION_DELAY_MILLIS);
+    }
+    
+    /**
+     * Create a new <code>ConnectionPool</code>.
+     * 
+     * @param hostName hostname or IP address to open connections to
+     * @param portNo portNo port number to connect to
+     * @param maxConns the maximum number of connections to save in the pool
+     * @param maxIdle the max number of connections allowed in the pool after an eviction
+     * @param evictionDelay the number milliseconds between eviction runs
+     * @throws TTransportException if unable to connect
+     */
+    public ConnectionPool(String hostName, int portNo, int maxConns, int maxIdle, long evictionDelay)
+    throws TTransportException
+    {
+        this.hostName = hostName;
+        this.portNo = portNo;
+        this.maxConns = maxConns;
+        this.maxIdle = maxIdle;
+        
+        eviction = new Timer("EVICTION-THREAD", true);
+        eviction.schedule(new EvictionTask(), new Date(), evictionDelay);
+        
+        connections.add(new Connection(hostName, portNo));
+    }
+    
+    /**
+     * Check a <code>Connection</code> instance out from the pool, creating a
+     * new one if the pool is exhausted.
+     */
+    public Connection borrowConnection()
+    {
+        Connection conn = null;
+        
+        if ((conn = connections.poll()) == null)
+        {
+            try
+            {
+                conn = new Connection(hostName, portNo);
+            }
+            catch (TTransportException error)
+            {
+                logger.error(String.format("Error connecting to %s:%s", hostName, portNo), error);
+            }
+        }
+        
+        return conn;
+    }
+    
+    /**
+     * Returns an <code>Connection</code> instance to the pool.  If the pool
+     * already contains the maximum number of allowed connections, then the
+     * instance's <code>close</code> method is called and it is discarded.
+     */
+    public void returnConnection(Connection connection)
+    {
+        if (connections.size() >= maxConns)
+        {
+            if (connection.isOpen()) connection.close();
+            logger.warn("Max pool size reached; Connection discarded.");
+            return;
+        }
+        
+        if (!connection.isOpen())
+        {
+            logger.warn("Stubbornly refusing to return a closed connection to the pool (discarded instead).");
+            return;
+        }
+        
+        connections.add(connection);
+    }
+    
+    private class EvictionTask extends TimerTask
+    {
+        public void run()
+        {
+            int count = 0;
+            
+            while (connections.size() > maxIdle)
+            {
+                Connection conn = connections.poll();
+                if (conn.isOpen()) conn.close();
+                count++;
+            }
+            
+            if (count > 0)
+                logger.debug("Eviction run complete: {} connections evicted.", count);
+        }
+    }
+}

Added: cassandra/trunk/drivers/java/src/org/apache/cassandra/cql/driver/IConnectionPool.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/drivers/java/src/org/apache/cassandra/cql/driver/IConnectionPool.java?rev=1055538&view=auto
==============================================================================
--- cassandra/trunk/drivers/java/src/org/apache/cassandra/cql/driver/IConnectionPool.java (added)
+++ cassandra/trunk/drivers/java/src/org/apache/cassandra/cql/driver/IConnectionPool.java Wed Jan  5 17:24:07 2011
@@ -0,0 +1,17 @@
+package org.apache.cassandra.cql.driver;
+
+public interface IConnectionPool
+{
+    /**
+     * Check a <code>Connection</code> instance out from the pool, creating a
+     * new one if the pool is exhausted.
+     */
+    public Connection borrowConnection();
+    
+    /**
+     * Returns a <code>Connection</code> instance to the pool.  If the pool
+     * already contains the maximum number of allowed connections, then the
+     * instance's <code>close</code> method is called and it is discarded.
+     */
+    public void returnConnection(Connection connection);
+}

Added: cassandra/trunk/drivers/java/src/org/apache/cassandra/cql/driver/Utils.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/drivers/java/src/org/apache/cassandra/cql/driver/Utils.java?rev=1055538&view=auto
==============================================================================
--- cassandra/trunk/drivers/java/src/org/apache/cassandra/cql/driver/Utils.java (added)
+++ cassandra/trunk/drivers/java/src/org/apache/cassandra/cql/driver/Utils.java Wed Jan  5 17:24:07 2011
@@ -0,0 +1,37 @@
+package org.apache.cassandra.cql.driver;
+
+import java.io.ByteArrayOutputStream;
+import java.nio.ByteBuffer;
+import java.util.zip.Deflater;
+
+import org.apache.cassandra.thrift.Compression;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class Utils
+{
+    private static final Logger logger = LoggerFactory.getLogger(Utils.class);
+    
+    public static ByteBuffer compressQuery(String queryStr, Compression compression)
+    {
+        byte[] data = queryStr.getBytes();
+        Deflater compressor = new Deflater();
+        compressor.setInput(data);
+        compressor.finish();
+        
+        ByteArrayOutputStream byteArray = new ByteArrayOutputStream();
+        byte[] buffer = new byte[1024];
+        
+        while (!compressor.finished())
+        {
+            int size = compressor.deflate(buffer);
+            byteArray.write(buffer, 0, size);
+        }
+        
+        logger.trace("Compressed query statement {} bytes in length to {} bytes",
+                     data.length,
+                     byteArray.size());
+        
+        return ByteBuffer.wrap(byteArray.toByteArray());
+    }
+}