You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by ee...@apache.org on 2011/01/05 18:24:07 UTC
svn commit: r1055538 - in
/cassandra/trunk/drivers/java/src/org/apache/cassandra/cql/driver:
Connection.java ConnectionPool.java IConnectionPool.java Utils.java
Author: eevans
Date: Wed Jan 5 17:24:07 2011
New Revision: 1055538
URL: http://svn.apache.org/viewvc?rev=1055538&view=rev
Log:
CASSANDRA-1710 basic connection pooling for java driver
Patch by eevans for CASSANDRA-1710
Added:
cassandra/trunk/drivers/java/src/org/apache/cassandra/cql/driver/ConnectionPool.java
cassandra/trunk/drivers/java/src/org/apache/cassandra/cql/driver/IConnectionPool.java
cassandra/trunk/drivers/java/src/org/apache/cassandra/cql/driver/Utils.java
Modified:
cassandra/trunk/drivers/java/src/org/apache/cassandra/cql/driver/Connection.java
Modified: cassandra/trunk/drivers/java/src/org/apache/cassandra/cql/driver/Connection.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/drivers/java/src/org/apache/cassandra/cql/driver/Connection.java?rev=1055538&r1=1055537&r2=1055538&view=diff
==============================================================================
--- cassandra/trunk/drivers/java/src/org/apache/cassandra/cql/driver/Connection.java (original)
+++ cassandra/trunk/drivers/java/src/org/apache/cassandra/cql/driver/Connection.java Wed Jan 5 17:24:07 2011
@@ -1,33 +1,8 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
package org.apache.cassandra.cql.driver;
-import java.io.ByteArrayOutputStream;
-import java.nio.ByteBuffer;
-import java.util.zip.Deflater;
-
import org.apache.cassandra.thrift.Cassandra;
import org.apache.cassandra.thrift.Compression;
import org.apache.cassandra.thrift.CqlResult;
-import org.apache.cassandra.thrift.CqlRow;
import org.apache.cassandra.thrift.InvalidRequestException;
import org.apache.cassandra.thrift.TimedOutException;
import org.apache.cassandra.thrift.UnavailableException;
@@ -37,105 +12,95 @@ import org.apache.thrift.protocol.TProto
import org.apache.thrift.transport.TFramedTransport;
import org.apache.thrift.transport.TSocket;
import org.apache.thrift.transport.TTransport;
+import org.apache.thrift.transport.TTransportException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+/** CQL connection object. */
public class Connection
{
- private static final Logger logger = LoggerFactory.getLogger(Connection.class);
+ public static Compression defaultCompression = Compression.GZIP;
+ public final String hostName;
+ public final int portNo;
- public String hostName;
- public int port;
+ private static final Logger logger = LoggerFactory.getLogger(Connection.class);
+ protected long timeOfLastFailure = 0;
+ protected int numFailures = 0;
private Cassandra.Client client;
private TTransport transport;
- private Compression defaultCompression = Compression.GZIP;
- public Connection(String keyspaceName, String...hosts) throws InvalidRequestException, TException
+ /**
+ * Create a new <code>Connection</code> instance.
+ *
+ * @param hostName hostname or IP address of the remote host
+ * @param portNo TCP port number
+ * @throws TTransportException if unable to connect
+ */
+ public Connection(String hostName, int portNo) throws TTransportException
{
- assert hosts.length > 0;
+ this.hostName = hostName;
+ this.portNo = portNo;
- for (String hostSpec : hosts)
- {
- String[] parts = hostSpec.split(":", 2);
- this.hostName = parts[0];
- this.port = Integer.parseInt(parts[1]);
-
- // TODO: This will need to do connection pooling.
- break;
- }
-
- TSocket socket = new TSocket(hostName, port);
+ TSocket socket = new TSocket(hostName, portNo);
transport = new TFramedTransport(socket);
TProtocol protocol = new TBinaryProtocol(transport);
client = new Cassandra.Client(protocol);
socket.open();
- client.set_keyspace(keyspaceName);
- }
-
- private ByteBuffer compressQuery(String queryStr, Compression compression)
- {
- byte[] data = queryStr.getBytes();
- Deflater compressor = new Deflater();
- compressor.setInput(data);
- compressor.finish();
-
- ByteArrayOutputStream byteArray = new ByteArrayOutputStream();
- byte[] buffer = new byte[1024];
-
- while (!compressor.finished())
- {
- int size = compressor.deflate(buffer);
- byteArray.write(buffer, 0, size);
- }
-
- logger.trace("Compressed query statement {} bytes in length to {} bytes",
- data.length,
- byteArray.size());
-
- return ByteBuffer.wrap(byteArray.toByteArray());
+ logger.info("Connected to {}:{}", hostName, portNo);
}
+ /**
+ * Execute a CQL query.
+ *
+ * @param queryStr a CQL query string
+ * @return the query results encoded as a CqlResult struct
+ * @throws InvalidRequestException on poorly constructed or illegal requests
+ * @throws UnavailableException when not all required replicas could be created/read
+ * @throws TimedOutException when a cluster operation timed out
+ * @throws TException
+ */
public CqlResult execute(String queryStr)
throws InvalidRequestException, UnavailableException, TimedOutException, TException
{
- return execute(queryStr, getDefaultCompression());
+ return execute(queryStr, defaultCompression);
}
- public CqlResult execute(String queryStr, Compression compression)
+ /**
+ * Execute a CQL query.
+ *
+ * @param queryStr a CQL query string
+ * @param compress query compression to use
+ * @return the query results encoded as a CqlResult struct
+ * @throws InvalidRequestException on poorly constructed or illegal requests
+ * @throws UnavailableException when not all required replicas could be created/read
+ * @throws TimedOutException when a cluster operation timed out
+ * @throws TException
+ */
+ public CqlResult execute(String queryStr, Compression compress)
throws InvalidRequestException, UnavailableException, TimedOutException, TException
{
- logger.trace("Executing CQL Query: {}", queryStr);
- return client.execute_cql_query(compressQuery(queryStr, compression), compression);
+ try
+ {
+ return client.execute_cql_query(Utils.compressQuery(queryStr, compress), compress);
+ }
+ catch (TException error)
+ {
+ numFailures++;
+ timeOfLastFailure = System.currentTimeMillis();
+ throw error;
+ }
}
- public Compression getDefaultCompression()
+ /** Shutdown the remote connection */
+ public void close()
{
- return defaultCompression;
+ transport.close();
}
-
- public void setDefaultCompression(Compression defaultCompression)
- {
- this.defaultCompression = defaultCompression;
- }
-
- public static void main(String[] args) throws Exception
+
+ /** Connection state. */
+ public boolean isOpen()
{
- Connection conn = new Connection("Keyspace1", "localhost:9160");
- CqlResult result = conn.execute("UPDATE Standard2 USING CONSISTENCY.ONE WITH ROW(\"mango\", COL(\"disposition\", \"fruit\"));");
- String selectQ = "SELECT FROM Standard2 WHERE KEY > \"apple\" AND KEY < \"carrot\" ROWLIMIT 5 DESC;";
- result = conn.execute(selectQ);
- switch (result.type)
- {
- case ROWS:
- for (CqlRow row : result.rows)
- {
- System.out.println("KEY: " + new String(row.key.array()));
- for (org.apache.cassandra.thrift.Column col : row.columns)
- {
- System.out.println(" COL: " + new String(col.name.array()) + ":" + new String(col.value.array()));
- }
- }
- }
+ return transport.isOpen();
}
}
Added: cassandra/trunk/drivers/java/src/org/apache/cassandra/cql/driver/ConnectionPool.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/drivers/java/src/org/apache/cassandra/cql/driver/ConnectionPool.java?rev=1055538&view=auto
==============================================================================
--- cassandra/trunk/drivers/java/src/org/apache/cassandra/cql/driver/ConnectionPool.java (added)
+++ cassandra/trunk/drivers/java/src/org/apache/cassandra/cql/driver/ConnectionPool.java Wed Jan 5 17:24:07 2011
@@ -0,0 +1,156 @@
+
+package org.apache.cassandra.cql.driver;
+
+import java.util.Date;
+import java.util.Timer;
+import java.util.TimerTask;
+import java.util.concurrent.ConcurrentLinkedQueue;
+
+import org.apache.thrift.transport.TTransportException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Simple connection-caching pool implementation.
+ *
+ * <p>A <code>ConnectionPool</code> provides the simplest possible connection
+ * pooling, lazily creating new connections if needed as
+ * <code>borrowClient</code> is called. Connections are re-added to the pool
+ * by <code>returnConnection</code>, unless doing so would exceed the maximum
+ * pool size.</p>
+ *
+ * <p>Example usage:</p>
+ *
+ * <code>
+ * IConnectionPool pool = new ConnectionPool("localhost", 9160);<br />
+ * Connection conn = pool.borrowConnection();<br />
+ * conn.execute(...);<br />
+ * pool.returnConnection(pool);<br />
+ * </code>
+ */
+public class ConnectionPool implements IConnectionPool
+{
+ public static final int DEFAULT_MAX_CONNECTIONS = 25;
+ public static final int DEFAULT_PORT = 9160;
+ public static final int DEFAULT_MAX_IDLE = 5;
+ public static final long DEFAULT_EVICTION_DELAY_MILLIS = 10000; // 10 seconds
+ private static final Logger logger = LoggerFactory.getLogger(ConnectionPool.class);
+
+ private ConcurrentLinkedQueue<Connection> connections = new ConcurrentLinkedQueue<Connection>();
+ private Timer eviction;
+ private String hostName;
+ private int portNo;
+ private int maxConns, maxIdle;
+
+ /**
+ * Create a new <code>ConnectionPool</code> for a given hostname.
+ *
+ * @param hostName hostname or IP address to open connections to
+ * @throws TTransportException if unable to connect
+ */
+ public ConnectionPool(String hostName) throws TTransportException
+ {
+ this(hostName, DEFAULT_PORT, DEFAULT_MAX_CONNECTIONS, DEFAULT_MAX_IDLE, DEFAULT_EVICTION_DELAY_MILLIS);
+ }
+
+ /**
+ * Create a new <code>ConnectionPool</code> for the given hostname and
+ * port number.
+ *
+ * @param hostName hostname or IP address to open connections to
+ * @param portNo port number to connect to
+ * @throws TTransportException if unable to connect
+ */
+ public ConnectionPool(String hostName, int portNo) throws TTransportException
+ {
+ this(hostName, portNo, DEFAULT_MAX_CONNECTIONS, DEFAULT_MAX_IDLE, DEFAULT_EVICTION_DELAY_MILLIS);
+ }
+
+ /**
+ * Create a new <code>ConnectionPool</code>.
+ *
+ * @param hostName hostname or IP address to open connections to
+ * @param portNo portNo port number to connect to
+ * @param maxConns the maximum number of connections to save in the pool
+ * @param maxIdle the max number of connections allowed in the pool after an eviction
+ * @param evictionDelay the number milliseconds between eviction runs
+ * @throws TTransportException if unable to connect
+ */
+ public ConnectionPool(String hostName, int portNo, int maxConns, int maxIdle, long evictionDelay)
+ throws TTransportException
+ {
+ this.hostName = hostName;
+ this.portNo = portNo;
+ this.maxConns = maxConns;
+ this.maxIdle = maxIdle;
+
+ eviction = new Timer("EVICTION-THREAD", true);
+ eviction.schedule(new EvictionTask(), new Date(), evictionDelay);
+
+ connections.add(new Connection(hostName, portNo));
+ }
+
+ /**
+ * Check a <code>Connection</code> instance out from the pool, creating a
+ * new one if the pool is exhausted.
+ */
+ public Connection borrowConnection()
+ {
+ Connection conn = null;
+
+ if ((conn = connections.poll()) == null)
+ {
+ try
+ {
+ conn = new Connection(hostName, portNo);
+ }
+ catch (TTransportException error)
+ {
+ logger.error(String.format("Error connecting to %s:%s", hostName, portNo), error);
+ }
+ }
+
+ return conn;
+ }
+
+ /**
+ * Returns an <code>Connection</code> instance to the pool. If the pool
+ * already contains the maximum number of allowed connections, then the
+ * instance's <code>close</code> method is called and it is discarded.
+ */
+ public void returnConnection(Connection connection)
+ {
+ if (connections.size() >= maxConns)
+ {
+ if (connection.isOpen()) connection.close();
+ logger.warn("Max pool size reached; Connection discarded.");
+ return;
+ }
+
+ if (!connection.isOpen())
+ {
+ logger.warn("Stubbornly refusing to return a closed connection to the pool (discarded instead).");
+ return;
+ }
+
+ connections.add(connection);
+ }
+
+ private class EvictionTask extends TimerTask
+ {
+ public void run()
+ {
+ int count = 0;
+
+ while (connections.size() > maxIdle)
+ {
+ Connection conn = connections.poll();
+ if (conn.isOpen()) conn.close();
+ count++;
+ }
+
+ if (count > 0)
+ logger.debug("Eviction run complete: {} connections evicted.", count);
+ }
+ }
+}
Added: cassandra/trunk/drivers/java/src/org/apache/cassandra/cql/driver/IConnectionPool.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/drivers/java/src/org/apache/cassandra/cql/driver/IConnectionPool.java?rev=1055538&view=auto
==============================================================================
--- cassandra/trunk/drivers/java/src/org/apache/cassandra/cql/driver/IConnectionPool.java (added)
+++ cassandra/trunk/drivers/java/src/org/apache/cassandra/cql/driver/IConnectionPool.java Wed Jan 5 17:24:07 2011
@@ -0,0 +1,17 @@
+package org.apache.cassandra.cql.driver;
+
+public interface IConnectionPool
+{
+ /**
+ * Check a <code>Connection</code> instance out from the pool, creating a
+ * new one if the pool is exhausted.
+ */
+ public Connection borrowConnection();
+
+ /**
+ * Returns an <code>Connection</code> instance to the pool. If the pool
+ * already contains the maximum number of allowed connections, then the
+ * instance's <code>close</code> method is called and it is discarded.
+ */
+ public void returnConnection(Connection connection);
+}
Added: cassandra/trunk/drivers/java/src/org/apache/cassandra/cql/driver/Utils.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/drivers/java/src/org/apache/cassandra/cql/driver/Utils.java?rev=1055538&view=auto
==============================================================================
--- cassandra/trunk/drivers/java/src/org/apache/cassandra/cql/driver/Utils.java (added)
+++ cassandra/trunk/drivers/java/src/org/apache/cassandra/cql/driver/Utils.java Wed Jan 5 17:24:07 2011
@@ -0,0 +1,37 @@
+package org.apache.cassandra.cql.driver;
+
+import java.io.ByteArrayOutputStream;
+import java.nio.ByteBuffer;
+import java.util.zip.Deflater;
+
+import org.apache.cassandra.thrift.Compression;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class Utils
+{
+ private static final Logger logger = LoggerFactory.getLogger(Utils.class);
+
+ public static ByteBuffer compressQuery(String queryStr, Compression compression)
+ {
+ byte[] data = queryStr.getBytes();
+ Deflater compressor = new Deflater();
+ compressor.setInput(data);
+ compressor.finish();
+
+ ByteArrayOutputStream byteArray = new ByteArrayOutputStream();
+ byte[] buffer = new byte[1024];
+
+ while (!compressor.finished())
+ {
+ int size = compressor.deflate(buffer);
+ byteArray.write(buffer, 0, size);
+ }
+
+ logger.trace("Compressed query statement {} bytes in length to {} bytes",
+ data.length,
+ byteArray.size());
+
+ return ByteBuffer.wrap(byteArray.toByteArray());
+ }
+}