Posted to commits@geode.apache.org by rv...@apache.org on 2015/04/28 23:40:07 UTC
[02/51] [partial] incubator-geode git commit: Init
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/19459053/gemfire-core/src/main/java/com/gemstone/gemfire/cache/client/internal/pooling/ConnectionManagerImpl.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/cache/client/internal/pooling/ConnectionManagerImpl.java b/gemfire-core/src/main/java/com/gemstone/gemfire/cache/client/internal/pooling/ConnectionManagerImpl.java
new file mode 100644
index 0000000..5be50ec
--- /dev/null
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/cache/client/internal/pooling/ConnectionManagerImpl.java
@@ -0,0 +1,1573 @@
+/*=========================================================================
+ * Copyright (c) 2002-2014 Pivotal Software, Inc. All Rights Reserved.
+ * This product is protected by U.S. and international copyright
+ * and intellectual property laws. Pivotal products are covered by
+ * more patents listed at http://www.pivotal.io/patents.
+ *=========================================================================
+ */
+package com.gemstone.gemfire.cache.client.internal.pooling;
+
+import com.gemstone.gemfire.CancelCriterion;
+import com.gemstone.gemfire.CancelException;
+import com.gemstone.gemfire.SystemFailure;
+import com.gemstone.gemfire.cache.GatewayConfigurationException;
+import com.gemstone.gemfire.cache.client.AllConnectionsInUseException;
+import com.gemstone.gemfire.cache.client.NoAvailableServersException;
+import com.gemstone.gemfire.cache.client.ServerConnectivityException;
+import com.gemstone.gemfire.cache.client.ServerOperationException;
+import com.gemstone.gemfire.cache.client.ServerRefusedConnectionException;
+import com.gemstone.gemfire.cache.client.internal.Connection;
+import com.gemstone.gemfire.cache.client.internal.ConnectionFactory;
+import com.gemstone.gemfire.cache.client.internal.Endpoint;
+import com.gemstone.gemfire.cache.client.internal.EndpointManager;
+import com.gemstone.gemfire.cache.client.internal.PoolImpl;
+import com.gemstone.gemfire.cache.client.internal.PoolImpl.PoolTask;
+import com.gemstone.gemfire.cache.client.internal.QueueConnectionImpl;
+import com.gemstone.gemfire.distributed.PoolCancelledException;
+import com.gemstone.gemfire.distributed.internal.ServerLocation;
+import com.gemstone.gemfire.internal.cache.PoolManagerImpl;
+import com.gemstone.gemfire.internal.cache.PoolStats;
+import com.gemstone.gemfire.internal.i18n.LocalizedStrings;
+import com.gemstone.gemfire.internal.logging.InternalLogWriter;
+import com.gemstone.gemfire.internal.logging.LogService;
+import com.gemstone.gemfire.internal.logging.log4j.LocalizedMessage;
+import com.gemstone.gemfire.security.GemFireSecurityException;
+import com.gemstone.org.jgroups.util.StringId;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.ReentrantLock;
+
+import org.apache.logging.log4j.Logger;
+
+/**
+ * Manages client-to-server connections for the connection pool. This class contains
+ * all of the pooling logic to check out and check in connections.
+ * @author dsmith
+ *
+ * @since 5.7
+ *
+ */
+public class ConnectionManagerImpl implements ConnectionManager {
+ private static final Logger logger = LogService.getLogger();
+
+ static long AQUIRE_TIMEOUT = Long.getLong("gemfire.ConnectionManager.AQUIRE_TIMEOUT", 10 * 1000).longValue();
+ private final String poolName;
+ private final PoolStats poolStats;
+ protected final long prefillRetry; // ms // make this an int
+// private final long pingInterval; // ms // make this an int
+ private final LinkedList/*<PooledConnection>*/ availableConnections = new LinkedList/*<PooledConnection>*/();
+ protected final ConnectionMap allConnectionsMap = new ConnectionMap();
+ private final EndpointManager endpointManager;
+ private final int maxConnections;
+ protected final int minConnections;
+ private final long idleTimeout; // make this an int
+ protected final long idleTimeoutNanos;
+ final int lifetimeTimeout;
+ final long lifetimeTimeoutNanos;
+ private final InternalLogWriter securityLogWriter;
+ protected final CancelCriterion cancelCriterion;
+
+ protected volatile int connectionCount;
+ protected ScheduledExecutorService backgroundProcessor;
+ protected ScheduledThreadPoolExecutor loadConditioningProcessor;
+
+ protected ReentrantLock lock = new ReentrantLock();
+ protected Condition freeConnection = lock.newCondition();
+ private ConnectionFactory connectionFactory;
+ protected boolean haveIdleExpireConnectionsTask;
+ protected boolean havePrefillTask;
+ private boolean keepAlive=false;
+ protected volatile boolean shuttingDown;
+ private EndpointManager.EndpointListenerAdapter endpointListener;
+
+ private static final long NANOS_PER_MS = 1000000L;
+
+ /**
+ * Create a connection manager
+ * @param poolName the name of the pool that owns us
+ * @param factory the factory for new connections
+ * @param maxConnections The maximum number of connections that can exist
+ * @param minConnections The minimum number of connections that can exist
+ * @param idleTimeout The amount of time to wait to expire idle connections. -1 means that
+ * idle connections are never expired.
+ * @param lifetimeTimeout the lifetimeTimeout in ms.
+   * @param securityLogger the security log writer
+ */
+ public ConnectionManagerImpl(String poolName,
+ ConnectionFactory factory,
+ EndpointManager endpointManager,
+ int maxConnections, int minConnections,
+ long idleTimeout, int lifetimeTimeout,
+ InternalLogWriter securityLogger, long pingInterval,
+ CancelCriterion cancelCriterion, PoolStats poolStats) {
+ this.poolName = poolName;
+ this.poolStats = poolStats;
+ if(maxConnections < minConnections && maxConnections != -1) {
+ throw new IllegalArgumentException("Max connections " + maxConnections + " is less than minConnections " + minConnections);
+ }
+ if(maxConnections <= 0 && maxConnections != -1) {
+ throw new IllegalArgumentException("Max connections " + maxConnections + " must be greater than 0");
+ }
+ if(minConnections < 0) {
+ throw new IllegalArgumentException("Min connections " + minConnections + " must be greater than or equals to 0");
+ }
+
+ this.connectionFactory = factory;
+ this.endpointManager = endpointManager;
+ this.maxConnections = maxConnections == -1 ? Integer.MAX_VALUE : maxConnections;
+ this.minConnections = minConnections;
+ this.lifetimeTimeout = lifetimeTimeout;
+ this.lifetimeTimeoutNanos = lifetimeTimeout * NANOS_PER_MS;
+ if (lifetimeTimeout != -1) {
+ if (idleTimeout > lifetimeTimeout || idleTimeout == -1) {
+ // lifetimeTimeout takes precedence over longer idle timeouts
+ idleTimeout = lifetimeTimeout;
+ }
+ }
+ this.idleTimeout = idleTimeout;
+ this.idleTimeoutNanos = this.idleTimeout * NANOS_PER_MS;
+ this.securityLogWriter = securityLogger;
+ this.prefillRetry = pingInterval;
+// this.pingInterval = pingInterval;
+ this.cancelCriterion = cancelCriterion;
+ this.endpointListener = new EndpointManager.EndpointListenerAdapter() {
+ @Override
+ public void endpointCrashed(Endpoint endpoint) {
+ invalidateServer(endpoint);
+ }
+ };
+ }
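+
+  // A worked example of the idle/lifetime normalization in the constructor
+  // above: a non-negative lifetimeTimeout caps idleTimeout, and -1 means
+  // "never expire" (values in ms, purely illustrative):
+  //   idleTimeout=-1,    lifetimeTimeout=30000 -> idleTimeout becomes 30000
+  //   idleTimeout=60000, lifetimeTimeout=30000 -> idleTimeout becomes 30000
+  //   idleTimeout=10000, lifetimeTimeout=30000 -> idleTimeout stays 10000
+  //   idleTimeout=10000, lifetimeTimeout=-1    -> idleTimeout stays 10000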
+
+ /* (non-Javadoc)
+ * @see com.gemstone.gemfire.cache.client.internal.pooling.ConnectionManager#borrowConnection(long)
+ */
+ public Connection borrowConnection(long acquireTimeout) throws AllConnectionsInUseException, NoAvailableServersException {
+
+ long startTime = System.currentTimeMillis();
+ long remainingTime = acquireTimeout;
+
+ //wait for a connection to become free
+ lock.lock();
+ try {
+      while(connectionCount >= maxConnections && availableConnections.isEmpty() && remainingTime > 0 && !shuttingDown) {
+ final long start = getPoolStats().beginConnectionWait();
+ boolean interrupted = false;
+ try {
+ freeConnection.await(remainingTime, TimeUnit.MILLISECONDS);
+ }
+ catch (InterruptedException e) {
+ interrupted = true;
+ cancelCriterion.checkCancelInProgress(e);
+ throw new AllConnectionsInUseException();
+ }
+ finally {
+ if (interrupted) {
+ Thread.currentThread().interrupt();
+ }
+ getPoolStats().endConnectionWait(start);
+ }
+ remainingTime = acquireTimeout - (System.currentTimeMillis() - startTime);
+ }
+ if(shuttingDown) {
+ throw new PoolCancelledException();
+ }
+
+ while (!availableConnections.isEmpty()) {
+ PooledConnection connection = (PooledConnection) availableConnections.removeFirst();
+ try {
+ connection.activate();
+ return connection;
+ }
+ catch (ConnectionDestroyedException ex) {
+ // whoever destroyed it already decremented connectionCount
+ }
+ }
+ if (connectionCount >= maxConnections) {
+ throw new AllConnectionsInUseException();
+ }
+ else {
+ //We need to create a connection. Reserve space for it.
+ connectionCount++;
+// logger.info("DEBUG: borrowConnection conCount(+1)->" + connectionCount);
+// getPoolStats().incConCount(1);
+ }
+
+ }
+ finally {
+ lock.unlock();
+ }
+
+ PooledConnection connection = null;
+ try {
+ Connection plainConnection = connectionFactory.createClientToServerConnection(Collections.EMPTY_SET);
+
+ connection = addConnection(plainConnection);
+ }
+ catch(GemFireSecurityException e) {
+ throw new ServerOperationException(e);
+ }
+ catch(GatewayConfigurationException e) {
+ throw new ServerOperationException(e);
+ }
+ catch(ServerRefusedConnectionException srce) {
+ throw new NoAvailableServersException(srce);
+ }
+ finally {
+ //if we failed, release the space we reserved for our connection
+ if(connection == null) {
+ lock.lock();
+ try {
+// getPoolStats().incConCount(-1);
+ --connectionCount;
+// logger.info("DEBUG: borrowConnection conCount(-1)->" + connectionCount);
+ if(connectionCount < minConnections) {
+ startBackgroundPrefill();
+ }
+ }
+ finally {
+ lock.unlock();
+ }
+ }
+ }
+
+ if(connection == null) {
+ this.cancelCriterion.checkCancelInProgress(null);
+ throw new NoAvailableServersException();
+ }
+
+ return connection;
+ }
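+
+  // A minimal sketch of the checkout/checkin pattern this method supports.
+  // "manager" is a hypothetical reference to this ConnectionManager; the
+  // operation body is illustrative:
+  //
+  //   Connection conn = manager.borrowConnection(AQUIRE_TIMEOUT);
+  //   try {
+  //     // ... execute one or more operations on conn ...
+  //   } finally {
+  //     manager.returnConnection(conn); // passivates, or destroys if flagged
+  //   }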
+
+// public Connection borrowConnection(ServerLocation server, long acquireTimeout)
+// throws AllConnectionsInUseException, NoAvailableServersException {
+// return borrowConnection(server, acquireTimeout, false);
+// }
+
+// /**
+// * Used to tell a caller of borrowConnection that it did not find an existing connection.
+// */
+// public static final Connection NO_EXISTING_CONNECTION = new ConnectionImpl(null, null);
+
+ /**
+ * Borrow a connection to a specific server. This method currently
+ * allows callers to break the connection limit, because it is used by
+ * tasks from the background thread that shouldn't be constrained
+ * by the limit. Such callers will only exceed the limit by one connection, and
+ * that connection will be destroyed when returned to the pool.
+ */
+  public Connection borrowConnection(ServerLocation server, long acquireTimeout,
+      boolean onlyUseExistingCnx)
+ throws AllConnectionsInUseException, NoAvailableServersException {
+ lock.lock();
+ try {
+ if(shuttingDown) {
+ throw new PoolCancelledException();
+ }
+ for(Iterator itr = availableConnections.iterator(); itr.hasNext(); ) {
+ PooledConnection nextConnection = (PooledConnection) itr.next();
+ try {
+ nextConnection.activate();
+ if(nextConnection.getServer().equals(server)) {
+ itr.remove();
+ return nextConnection;
+ }
+ nextConnection.passivate(false);
+ } catch (ConnectionDestroyedException ex) {
+ // someone else already destroyed this connection so ignore it
+ // but remove it from availableConnections
+ }
+      //Fix for 41516. Before we let this method exceed the max connections
+      //by creating a new connection, we need to make sure that there
+      //aren't bogus connections sitting in the available connection list;
+      //otherwise, the length of that list might exceed max connections
+      //but include some bad connections. That can cause members to
+      //get a bad connection but have no permits to create a new one.
+ if(nextConnection.shouldDestroy()) {
+ itr.remove();
+ }
+ }
+
+ if (onlyUseExistingCnx) {
+ throw new AllConnectionsInUseException();
+ }
+
+ // We need to create a connection. Reserve space for it.
+ connectionCount++;
+// logger.info("DEBUG: borrowConnection conCount(+1)->" + connectionCount);
+// getPoolStats().incConCount(1);
+ } finally {
+ lock.unlock();
+ }
+
+ PooledConnection connection = null;
+ try {
+ Connection plainConnection = connectionFactory.createClientToServerConnection(server, false);
+ connection = addConnection(plainConnection);
+ } catch(GemFireSecurityException e) {
+ throw new ServerOperationException(e);
+ } finally {
+ //if we failed, release the space we reserved for our connection
+ if(connection == null) {
+ lock.lock();
+ try {
+// getPoolStats().incConCount(-1);
+ --connectionCount;
+// logger.info("DEBUG: borrowConnection conCount(-1)->" + connectionCount);
+ if(connectionCount < minConnections) {
+ startBackgroundPrefill();
+ }
+ } finally {
+ lock.unlock();
+ }
+ }
+ }
+ if(connection == null) {
+ throw new ServerConnectivityException("Could not create a new connection to server " + server);
+ }
+ return connection;
+ }
+
+ public Connection exchangeConnection(Connection oldConnection,
+ Set/* <ServerLocation> */excludedServers, long acquireTimeout)
+ throws AllConnectionsInUseException {
+ assert oldConnection instanceof PooledConnection;
+ PooledConnection newConnection = null;
+ PooledConnection oldPC = (PooledConnection) oldConnection;
+// while (!allConnectionsMap.containsConnection(oldPC)) {
+// // ok the connection has already been removed so we really can't do an
+// // exchange yet.
+// // As a quick hack lets just get a connection using borrow.
+// // If it turns out to be in our excludedServer set then
+// // we can loop and try to exchange it.
+// // But first make sure oldPC's socket gets closed.
+// oldPC.internalDestroy();
+// newConnection = (PooledConnection)borrowConnection(acquireTimeout);
+// if (excludedServers.contains(newConnection.getServer())) {
+// oldPC = newConnection; // loop and try to exchange it
+// newConnection = null;
+// } else {
+// // we found one so we can just return it
+// return newConnection;
+// }
+// }
+
+ boolean needToUndoEstimate = false;
+ lock.lock();
+ try {
+ if(shuttingDown) {
+ throw new PoolCancelledException();
+ }
+ for(Iterator itr = availableConnections.iterator(); itr.hasNext(); ) {
+ PooledConnection nextConnection = (PooledConnection) itr.next();
+ if(!excludedServers.contains(nextConnection.getServer())) {
+ itr.remove();
+ try {
+ nextConnection.activate();
+ newConnection = nextConnection;
+// logger.info("DEBUG: exchangeConnection removeCon(" + oldPC +")");
+ if (allConnectionsMap.removeConnection(oldPC)) {
+// getPoolStats().incConCount(-1);
+ --connectionCount;
+// logger.info("DEBUG: exchangeConnection conCount(-1)->" + connectionCount + " oldPC=" + oldPC);
+ if(connectionCount < minConnections) {
+ startBackgroundPrefill();
+ }
+ }
+ break;
+ }
+ catch (ConnectionDestroyedException ex) {
+ // someone else already destroyed this connection so ignore it
+ // but remove it from availableConnections
+ }
+ }
+ }
+ if (newConnection == null) {
+ if (!allConnectionsMap.removeConnection(oldPC)) {
+ // need to reserve space for the following create
+// if (connectionCount >= maxConnections) {
+// throw new AllConnectionsInUseException();
+// } else {
+ // WARNING: we may be going over maxConnections here
+ // @todo grid: this needs to be fixed
+ //We need to create a connection. Reserve space for it.
+ needToUndoEstimate = true;
+ connectionCount++;
+// logger.info("DEBUG: exchangeConnection conCount(+1)->" + connectionCount);
+// getPoolStats().incConCount(1);
+// }
+ }
+ }
+ }
+ finally {
+ lock.unlock();
+ }
+
+ if(newConnection == null) {
+ try {
+ Connection plainConnection = connectionFactory.createClientToServerConnection(excludedServers);
+ newConnection = addConnection(plainConnection);
+// logger.info("DEBUG: exchangeConnection newConnection=" + newConnection);
+ }
+ catch(GemFireSecurityException e) {
+ throw new ServerOperationException(e);
+ }
+ catch(ServerRefusedConnectionException srce) {
+ throw new NoAvailableServersException(srce);
+ }
+ finally {
+ if (needToUndoEstimate && newConnection == null) {
+ lock.lock();
+ try {
+// getPoolStats().incConCount(-1);
+ --connectionCount;
+// logger.info("DEBUG: exchangeConnection conCount(-1)->" + connectionCount);
+ if(connectionCount < minConnections) {
+ startBackgroundPrefill();
+ }
+ }
+ finally {
+ lock.unlock();
+ }
+ }
+ }
+ }
+
+ if(newConnection == null) {
+ throw new NoAvailableServersException();
+ }
+
+// logger.info("DEBUG: exchangeConnection internalDestroy(" + oldPC +")");
+ oldPC.internalDestroy();
+
+ return newConnection;
+ }
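+
+  // A minimal usage sketch (hypothetical caller): after an operation fails,
+  // trade the bad connection for one to a server outside the excluded set:
+  //
+  //   Set excluded = new HashSet();
+  //   excluded.add(conn.getServer());
+  //   conn = manager.exchangeConnection(conn, excluded, AQUIRE_TIMEOUT);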
+
+ protected/*GemStoneAddition*/ String getPoolName() {
+ return this.poolName;
+ }
+
+ private PooledConnection addConnection(Connection conn) {
+
+ if(conn == null) {
+ if (logger.isDebugEnabled()) {
+ logger.debug("Unable to create a connection in the allowed time");
+ }
+ return null;
+ }
+ PooledConnection pooledConn= new PooledConnection(this, conn);
+ allConnectionsMap.addConnection(pooledConn);
+ if(logger.isDebugEnabled()) {
+ logger.debug("Created a new connection. {} Connection count is now {}", pooledConn, connectionCount);
+ }
+ return pooledConn;
+ }
+
+ private void destroyConnection(PooledConnection connection) {
+ lock.lock();
+ try {
+ if (allConnectionsMap.removeConnection(connection)) {
+ if(logger.isDebugEnabled()) {
+ logger.debug("Invalidating connection {} connection count is now {}", connection, connectionCount);
+ }
+
+// getPoolStats().incConCount(-1);
+// logger.info("DEBUG: destroyConnection conCount(-1)->" + connectionCount);
+ if(connectionCount < minConnections) {
+ startBackgroundPrefill();
+ }
+ freeConnection.signalAll();
+ }
+ --connectionCount; // fix for bug #50333
+ }
+ finally {
+ lock.unlock();
+ }
+
+ connection.internalDestroy();
+ }
+
+
+ /* (non-Javadoc)
+ * @see com.gemstone.gemfire.cache.client.internal.pooling.ConnectionManager#invalidateServer(com.gemstone.gemfire.distributed.internal.ServerLocation)
+ */
+ protected void invalidateServer(Endpoint endpoint) {
+ Set badConnections = allConnectionsMap.removeEndpoint(endpoint);
+ if(badConnections == null) {
+ return;
+ }
+
+ lock.lock();
+ try {
+ if(shuttingDown) {
+ return;
+ }
+ if(logger.isDebugEnabled()) {
+ logger.debug("Invalidating {} connections to server {}", badConnections.size(), endpoint);
+ }
+
+ //mark connections for destruction now, so if anyone tries
+ //to return a connection they'll get an exception
+ for(Iterator itr = badConnections.iterator(); itr.hasNext(); ) {
+ PooledConnection conn = (PooledConnection) itr.next();
+ if (!conn.setShouldDestroy()) {
+          // this might not be true; it may have just had an exception
+// itr.remove(); // someone else is destroying it
+ }
+ }
+
+ for(Iterator itr = availableConnections.iterator(); itr.hasNext(); ) {
+ PooledConnection conn = (PooledConnection) itr.next();
+ if(badConnections.contains(conn)) {
+ itr.remove();
+ }
+ }
+
+// getPoolStats().incConCount(-badConnections.size());
+ connectionCount -= badConnections.size();
+// logger.info("DEBUG: invalidateServer conCount(" + (-badConnections.size()) + ")->" + connectionCount);
+
+ if(connectionCount < minConnections) {
+ startBackgroundPrefill();
+ }
+
+      // TODO (ashetkar) This for loop could well be moved outside the lock. But
+      // this change was tested thoroughly for #42185, and it may not impact perf
+      // because this method gets called only when a server goes down.
+ for(Iterator itr = badConnections.iterator(); itr.hasNext(); ) {
+ PooledConnection conn = (PooledConnection) itr.next();
+ conn.internalDestroy();
+ }
+
+ if(connectionCount < maxConnections) {
+ freeConnection.signalAll();
+ }
+ }
+ finally {
+ lock.unlock();
+ }
+ }
+
+ /* (non-Javadoc)
+ * @see com.gemstone.gemfire.cache.client.internal.pooling.ConnectionManager#returnConnection(com.gemstone.gemfire.cache.client.internal.Connection)
+ */
+ public void returnConnection(Connection connection) {
+ returnConnection(connection, true);
+ }
+
+ public void returnConnection(Connection connection, boolean accessed) {
+
+ assert connection instanceof PooledConnection;
+ PooledConnection pooledConn = (PooledConnection)connection;
+
+ boolean shouldClose = false;
+
+ lock.lock();
+ try {
+ if (pooledConn.isDestroyed()) {
+ return;
+ }
+
+ if (pooledConn.shouldDestroy()) {
+ destroyConnection(pooledConn);
+ } else {
+ // thread local connections are already passive at this point
+ if (pooledConn.isActive()) {
+ pooledConn.passivate(accessed);
+ }
+
+ // borrowConnection(ServerLocation, long) allows us to break the
+ // connection limit in order to get a connection to a server. So we need
+ // to get our pool back to size if we're above the limit
+ if (connectionCount > maxConnections) {
+ if (allConnectionsMap.removeConnection(pooledConn)) {
+ shouldClose = true;
+ // getPoolStats().incConCount(-1);
+ --connectionCount;
+ // logger.info("DEBUG: returnConnection conCount(-1)->" + connectionCount);
+ }
+ } else {
+ availableConnections.addFirst(pooledConn);
+ freeConnection.signalAll();
+ }
+ }
+ } finally {
+ lock.unlock();
+ }
+
+ if (shouldClose) {
+ try {
+        PoolImpl localpool = (PoolImpl) PoolManagerImpl.getPMI().find(poolName);
+        boolean durable = false;
+        if (localpool != null) {
+          durable = localpool.isDurableClient();
+        }
+        pooledConn.internalClose(durable || this.keepAlive);
+ } catch (Exception e) {
+ logger.warn(LocalizedMessage.create(
+ LocalizedStrings.ConnectionManagerImpl_ERROR_CLOSING_CONNECTION_0,
+ pooledConn), e);
+ }
+ }
+ }
+
+  /* (non-Javadoc)
+   * @see com.gemstone.gemfire.cache.client.internal.pooling.ConnectionManager#start(java.util.concurrent.ScheduledExecutorService)
+   */
+ public void start(ScheduledExecutorService backgroundProcessor) {
+ this.backgroundProcessor = backgroundProcessor;
+ this.loadConditioningProcessor = new ScheduledThreadPoolExecutor(1/*why not 0?*/, new ThreadFactory() {
+ public Thread newThread(final Runnable r) {
+ Thread result = new Thread(r, "poolLoadConditioningMonitor-" + getPoolName());
+ result.setDaemon(true);
+ return result;
+ }
+ });
+ this.loadConditioningProcessor.setExecuteExistingDelayedTasksAfterShutdownPolicy(false);
+
+ endpointManager.addListener(endpointListener);
+
+ lock.lock();
+ try {
+ startBackgroundPrefill();
+ }
+ finally {
+ lock.unlock();
+ }
+ }
+
+ /* (non-Javadoc)
+ * @see com.gemstone.gemfire.cache.client.internal.pooling.ConnectionManager#close(boolean, long)
+ */
+ public void close(boolean keepAlive) {
+ if(logger.isDebugEnabled()) {
+ logger.debug("Shutting down connection manager with keepAlive {}", keepAlive);
+ }
+ this.keepAlive=keepAlive;
+ endpointManager.removeListener(endpointListener);
+
+ lock.lock();
+ try {
+ if(shuttingDown) {
+ return;
+ }
+ shuttingDown = true;
+ }
+ finally {
+ lock.unlock();
+ }
+
+ // do this early as it might help lifetimeProcessor shutdown
+// closeReplacementConnection();
+ try {
+ if (this.loadConditioningProcessor != null) {
+ this.loadConditioningProcessor.shutdown();
+ if(!this.loadConditioningProcessor.awaitTermination(PoolImpl.SHUTDOWN_TIMEOUT, TimeUnit.MILLISECONDS)) {
+ logger.warn(LocalizedMessage.create(LocalizedStrings.ConnectionManagerImpl_TIMEOUT_WAITING_FOR_LOAD_CONDITIONING_TASKS_TO_COMPLETE));
+ }
+ }
+ }
+ catch (RuntimeException e) {
+ logger.error(LocalizedMessage.create(LocalizedStrings.ConnectionManagerImpl_ERROR_STOPPING_LOADCONDITIONINGPROCESSOR), e);
+ }
+ catch (InterruptedException e) {
+ logger.error(LocalizedMessage.create(LocalizedStrings.ConnectionManagerImpl_INTERRUPTED_STOPPING_LOADCONDITIONINGPROCESSOR), e);
+ }
+ // one more time in case of race with lifetimeProcessor
+// closeReplacementConnection();
+ allConnectionsMap.close(keepAlive);
+ }
+
+ public void emergencyClose() {
+ shuttingDown = true;
+ if (this.loadConditioningProcessor != null) {
+ this.loadConditioningProcessor.shutdown();
+ }
+// closeReplacementConnection();
+ allConnectionsMap.emergencyClose();
+ }
+
+ protected void startBackgroundExpiration() {
+ if (idleTimeout >= 0) {
+ synchronized (this.allConnectionsMap) {
+ if(!haveIdleExpireConnectionsTask) {
+ haveIdleExpireConnectionsTask = true;
+ try {
+ backgroundProcessor.schedule(new IdleExpireConnectionsTask(), idleTimeout, TimeUnit.MILLISECONDS);
+ }
+ catch (RejectedExecutionException e) {
+ // ignore, the timer has been cancelled, which means we're shutting
+ // down.
+ }
+ }
+ }
+ }
+ }
+
+ /** Always called with lock held */
+ protected void startBackgroundPrefill() {
+ if(!havePrefillTask) {
+ havePrefillTask = true;
+ try {
+ backgroundProcessor.execute(new PrefillConnectionsTask());
+ }
+ catch (RejectedExecutionException e) {
+ // ignore, the timer has been cancelled, which means we're shutting
+ // down.
+ }
+ }
+ }
+
+ protected boolean prefill() {
+ try {
+ while (connectionCount < minConnections) {
+ if (cancelCriterion.cancelInProgress() != null) {
+ return true;
+ }
+ boolean createdConnection= prefillConnection();
+ if (!createdConnection) {
+ return false;
+ }
+ }
+ }
+ catch(Throwable t) {
+ cancelCriterion.checkCancelInProgress(t);
+ if(t.getCause()!=null) {
+ t = t.getCause();
+ }
+ logInfo(LocalizedStrings.ConnectionManagerImpl_ERROR_PREFILLING_CONNECTIONS, t);
+ return false;
+ }
+
+ return true;
+ }
+
+ public int getConnectionCount() {
+ return this.connectionCount;
+ }
+
+ protected PoolStats getPoolStats() {
+ return this.poolStats;
+ }
+
+ public Connection getConnection(Connection conn) {
+ if (conn instanceof PooledConnection) {
+ return ((PooledConnection)conn).getConnection();
+ } else if (conn instanceof QueueConnectionImpl) {
+ return ((QueueConnectionImpl)conn).getConnection();
+ } else {
+ return conn;
+ }
+ }
+
+ private boolean prefillConnection() {
+ boolean createConnection = false;
+ lock.lock();
+ try {
+ if (shuttingDown) {
+ return false;
+ }
+ if (connectionCount < minConnections) {
+// getPoolStats().incConCount(1);
+ connectionCount++;
+// logger.info("DEBUG: prefillConnection conCount(+1)->" + connectionCount);
+ createConnection = true;
+ }
+ }
+ finally {
+ lock.unlock();
+ }
+
+ if (createConnection) {
+ PooledConnection connection= null;
+ try {
+ Connection plainConnection = connectionFactory.createClientToServerConnection(Collections.EMPTY_SET);
+ if(plainConnection == null) {
+ return false;
+ }
+ connection = addConnection(plainConnection);
+ connection.passivate(false);
+ getPoolStats().incPrefillConnect();
+ }
+ catch (ServerConnectivityException ex) {
+ logger.info(LocalizedMessage.create(LocalizedStrings.ConnectionManagerImpl_UNABLE_TO_PREFILL_POOL_TO_MINIMUM_BECAUSE_0, ex), null);
+ return false;
+ }
+ finally {
+ lock.lock();
+ try {
+ if(connection == null) {
+// getPoolStats().incConCount(-1);
+ connectionCount--;
+// logger.info("DEBUG: prefillConnection conCount(-1)->" + connectionCount);
+ if(logger.isDebugEnabled()) {
+ logger.debug("Unable to prefill pool to minimum, connection count is now {}", connectionCount);
+ }
+ }
+ else {
+ availableConnections.addFirst(connection);
+ freeConnection.signalAll();
+ if(logger.isDebugEnabled()) {
+ logger.debug("Prefilled connection {} connection count is now {}", connection, connectionCount);
+ }
+ }
+ }
+ finally {
+ lock.unlock();
+ }
+ }
+ }
+
+ return true;
+ }
+
+ public static void loadEmergencyClasses() {
+ PooledConnection.loadEmergencyClasses();
+ }
+
+ protected class LifetimeExpireConnectionsTask implements Runnable {
+ public void run() {
+ try {
+// logger.info("DEBUG: lifetimeTask=" + this);
+ allConnectionsMap.checkLifetimes();
+ }
+ catch (CancelException ignore) {
+ }
+ catch (VirtualMachineError e) {
+ SystemFailure.initiateFailure(e);
+ // NOTREACHED
+ throw e; // for safety
+ }
+ catch (Throwable t) {
+ SystemFailure.checkFailure();
+ logger.warn(LocalizedMessage.create(LocalizedStrings.ConnectionManagerImpl_LOADCONDITIONINGTASK_0_ENCOUNTERED_EXCEPTION, this), t);
+ // Don't rethrow, it will just get eaten and kill the timer
+ }
+ }
+ }
+
+ protected class IdleExpireConnectionsTask implements Runnable {
+ public void run() {
+ try {
+ getPoolStats().incIdleCheck();
+ allConnectionsMap.checkIdleExpiration();
+ }
+ catch (CancelException ignore) {
+ }
+ catch (VirtualMachineError e) {
+ SystemFailure.initiateFailure(e);
+ // NOTREACHED
+ throw e; // for safety
+ }
+ catch (Throwable t) {
+ SystemFailure.checkFailure();
+ logger.warn(LocalizedMessage.create(LocalizedStrings.ConnectionManagerImpl_IDLEEXPIRECONNECTIONSTASK_0_ENCOUNTERED_EXCEPTION, this), t);
+ // Don't rethrow, it will just get eaten and kill the timer
+ }
+ }
+ }
+
+ protected class PrefillConnectionsTask extends PoolTask {
+
+ @Override
+ public void run2() {
+ if (logger.isTraceEnabled()) {
+ logger.trace("Prefill Connections task running");
+ }
+
+ prefill();
+ lock.lock();
+ try {
+ if(connectionCount < minConnections && cancelCriterion.cancelInProgress() == null) {
+ try {
+ backgroundProcessor.schedule(new PrefillConnectionsTask(), prefillRetry, TimeUnit.MILLISECONDS);
+ } catch(RejectedExecutionException e) {
+ //ignore, the timer has been cancelled, which means we're shutting down.
+ }
+ }
+ else {
+ havePrefillTask = false;
+ }
+ }
+ finally {
+ lock.unlock();
+ }
+ }
+ }
+
+// private final AR/*<ReplacementConnection>*/ replacement = CFactory.createAR();
+
+// private void closeReplacementConnection() {
+// ReplacementConnection rc = (ReplacementConnection)this.replacement.getAndSet(null);
+// if (rc != null) {
+// rc.getConnection().destroy();
+// }
+// }
+
+ /**
+ * Offer the replacement "con" to any cnx currently connected to "currentServer".
+ * @return true if someone takes our offer; false if not
+ */
+ private boolean offerReplacementConnection(Connection con, ServerLocation currentServer) {
+ boolean retry;
+ do {
+ retry = false;
+ PooledConnection target = this.allConnectionsMap.findReplacementTarget(currentServer);
+ if (target != null) {
+ final Endpoint targetEP = target.getEndpoint();
+ boolean interrupted = false;
+ try {
+ if (target.switchConnection(con)) {
+ getPoolStats().incLoadConditioningDisconnect();
+ this.allConnectionsMap.addReplacedCnx(target, targetEP);
+ return true;
+ }
+ else {
+// // target was destroyed; we have already removed it from
+// // allConnectionsMap but didn't dec the stat
+// getPoolStats().incPoolConnections(-1);
+// logger.info("DEBUG: offerReplacementConnection incPoolConnections(-1)->" + getPoolStats().getPoolConnections());
+ retry = true;
+ }
+ }
+ catch (InterruptedException e) {
+ // thrown by switchConnection
+ interrupted = true;
+ cancelCriterion.checkCancelInProgress(e);
+ retry = false;
+ }
+ finally {
+ if (interrupted) {
+ Thread.currentThread().interrupt();
+ }
+ }
+ }
+ } while (retry);
+ getPoolStats().incLoadConditioningReplaceTimeouts();
+ con.destroy();
+ return false;
+ }
+
+ /**
+   * An existing connection's lifetime has expired.
+   * We only want to create one replacement connection at a time,
+   * so this method should block until the new connection replaces an existing one.
+ * Note that if a connection is created here it must not count against
+ * the pool max and its idle time and lifetime must not begin until
+ * it actually replaces the existing one.
+ * @param currentServer the server the candidate connection is connected to
+ * @param idlePossible true if we have more cnxs than minPoolSize
+ * @return true if caller should recheck for expired lifetimes;
+ * false if a background check was scheduled or no expirations are possible.
+ */
+ public boolean createLifetimeReplacementConnection(ServerLocation currentServer,
+ boolean idlePossible) {
+ HashSet excludedServers = new HashSet();
+ ServerLocation sl = this.connectionFactory.findBestServer(currentServer, excludedServers);
+
+// boolean replacementConsumed = false;
+ while (sl != null) {
+ if (sl.equals(currentServer)) {
+ this.allConnectionsMap.extendLifeOfCnxToServer(currentServer);
+ break;
+ }
+ else {
+ if (!this.allConnectionsMap.hasExpiredCnxToServer(currentServer)) {
+ break;
+ }
+ Connection con = null;
+ try {
+ // logger.fine("DEBUG: creating replacement connection to " + sl);
+ con = this.connectionFactory.createClientToServerConnection(sl, false);
+ // logger.fine("DEBUG: created replacement connection: " + con);
+ }
+ catch (GemFireSecurityException e) {
+ securityLogWriter.warning(
+ LocalizedStrings.ConnectionManagerImpl_SECURITY_EXCEPTION_CONNECTING_TO_SERVER_0_1,
+ new Object[] {sl, e});
+ }
+ catch (ServerRefusedConnectionException srce) {
+ logger.warn(LocalizedMessage.create(
+ LocalizedStrings.ConnectionManagerImpl_SERVER_0_REFUSED_NEW_CONNECTION_1,
+ new Object[] {sl, srce}));
+ }
+ if (con == null) {
+ excludedServers.add(sl);
+ sl = this.connectionFactory.findBestServer(currentServer, excludedServers);
+ }
+ else {
+ getPoolStats().incLoadConditioningConnect();
+ if (!this.allConnectionsMap.hasExpiredCnxToServer(currentServer)) {
+ getPoolStats().incLoadConditioningReplaceTimeouts();
+ con.destroy();
+ break;
+ }
+ offerReplacementConnection(con, currentServer);
+ break;
+ }
+ }
+ }
+ if (sl == null) {
+      // we didn't find a server to create a replacement cnx on, so
+      // extend the life of the cnx to currentServer
+ this.allConnectionsMap.extendLifeOfCnxToServer(currentServer);
+ }
+ return this.allConnectionsMap.checkForReschedule(true);
+ }
+
+ protected class ConnectionMap {
+    private final HashMap/*<Endpoint, HashSet<PooledConnection>>*/ map = new HashMap();
+ private final LinkedList/*<PooledConnection>*/ allConnections = new LinkedList/*<PooledConnection>*/(); // in the order they were created
+ private boolean haveLifetimeExpireConnectionsTask;
+
+ public synchronized boolean isIdleExpirePossible() {
+ return this.allConnections.size() > minConnections;
+ }
+
+ @Override
+ public synchronized String toString() {
+ final long now = System.nanoTime();
+ StringBuffer sb = new StringBuffer();
+ sb.append("<");
+ for (Iterator it = this.allConnections.iterator(); it.hasNext();) {
+ PooledConnection pc = (PooledConnection)it.next();
+ sb.append(pc.getServer());
+ if (pc.shouldDestroy()) {
+ sb.append("-DESTROYED");
+ }
+ else if ( pc.hasIdleExpired(now, idleTimeoutNanos) ) {
+ sb.append("-IDLE");
+ }
+ else if ( pc.remainingLife(now, lifetimeTimeoutNanos) <= 0 ) {
+ sb.append("-EOL");
+ }
+ if (it.hasNext()) {
+ sb.append(",");
+ }
+ }
+ sb.append(">");
+ return sb.toString();
+ }
+
+ public synchronized void addConnection(PooledConnection connection) {
+ addToEndpointMap(connection);
+
+      // we want the smallest birthDate (i.e. the oldest cnx) at the front of the list
+ getPoolStats().incPoolConnections(1);
+// logger.info("DEBUG: addConnection incPoolConnections(1)->" + getPoolStats().getPoolConnections() + " con="+connection,
+// new RuntimeException("STACK"));
+ this.allConnections.addLast(connection);
+ if (isIdleExpirePossible()) {
+ startBackgroundExpiration();
+ }
+ if (lifetimeTimeout != -1 && !haveLifetimeExpireConnectionsTask) {
+ if (checkForReschedule(true)) {
+ // something has already expired so start processing with no delay
+// logger.info("DEBUG: rescheduling lifetime expire to be now");
+ startBackgroundLifetimeExpiration(0);
+ }
+ else {
+ // either no possible lifetime expires or we scheduled one
+ }
+ }
+ }
+
+ public synchronized void addReplacedCnx(PooledConnection con, Endpoint oldEndpoint) {
+ if (this.allConnections.remove(con)) {
+ // otherwise someone else has removed it and closed it
+ removeFromEndpointMap(oldEndpoint, con);
+ addToEndpointMap(con);
+ this.allConnections.addLast(con);
+ if (isIdleExpirePossible()) {
+ startBackgroundExpiration();
+ }
+ }
+ }
+
+ public synchronized Set removeEndpoint(Endpoint endpoint) {
+ final Set endpointConnections = (Set) this.map.remove(endpoint);
+ if(endpointConnections != null) {
+ int count = 0;
+ for (Iterator it = this.allConnections.iterator(); it.hasNext();) {
+ if (endpointConnections.contains(it.next())) {
+ count++;
+ it.remove();
+ }
+ }
+ if (count != 0) {
+ getPoolStats().incPoolConnections(-count);
+// logger.info("DEBUG: removedEndpoint incPoolConnections(" + (-count) + ")->" + getPoolStats().getPoolConnections() + " cons.size=" + endpointConnections.size() + " cons=" + endpointConnections);
+ }
+ }
+ return endpointConnections;
+ }
+
+ public synchronized boolean containsConnection(PooledConnection connection) {
+ return this.allConnections.contains(connection);
+ }
+
+ public synchronized boolean removeConnection(PooledConnection connection) {
+ // @todo darrel: allConnections.remove could be optimized by making
+ // allConnections a linkedHashSet
+ boolean result = this.allConnections.remove(connection);
+ if (result) {
+ getPoolStats().incPoolConnections(-1);
+// logger.info("DEBUG: removedConnection incPoolConnections(-1)->" + getPoolStats().getPoolConnections() + " con="+connection);
+ }
+
+ removeFromEndpointMap(connection);
+ return result;
+ }
+
+ private synchronized void addToEndpointMap(PooledConnection connection) {
+ Set endpointConnections = (Set) map.get(connection.getEndpoint());
+ if(endpointConnections == null) {
+ endpointConnections = new HashSet();
+ map.put(connection.getEndpoint(), endpointConnections);
+ }
+ endpointConnections.add(connection);
+ }
+
+ private void removeFromEndpointMap(PooledConnection connection) {
+ removeFromEndpointMap(connection.getEndpoint(), connection);
+ }
+
+ private synchronized void removeFromEndpointMap(Endpoint endpoint, PooledConnection connection) {
+ Set endpointConnections = (Set) this.map.get(endpoint);
+ if (endpointConnections != null) {
+ endpointConnections.remove(connection);
+ if(endpointConnections.size() == 0) {
+ this.map.remove(endpoint);
+ }
+ }
+ }
+
+ public synchronized void close(boolean keepAlive) {
+ map.clear();
+ int count = 0;
+ while (!this.allConnections.isEmpty()) {
+ PooledConnection pc = (PooledConnection)this.allConnections.removeFirst();
+ count++;
+ if (!pc.isDestroyed()) {
+ try {
+ pc.internalClose(keepAlive);
+ } catch(Exception e) {
+ logger.warn(LocalizedMessage.create(
+ LocalizedStrings.ConnectionManagerImpl_ERROR_CLOSING_CONNECTION_TO_SERVER_0,
+ pc.getServer()), e);
+ }
+ }
+ }
+ if (count != 0) {
+ getPoolStats().incPoolConnections(-count);
+// logger.info("DEBUG: close incPoolConnections(" + (-count) + ")->" + getPoolStats().getPoolConnections());
+ }
+ }
+
+ public synchronized void emergencyClose() {
+ map.clear();
+ while (!this.allConnections.isEmpty()) {
+ PooledConnection pc = (PooledConnection)this.allConnections.removeFirst();
+ pc.emergencyClose();
+ }
+ }
+
+ /**
+ * Returns a pooled connection that can have its underlying cnx
+ * to currentServer replaced by a new connection.
+ * @return null if a target could not be found
+ */
+ public synchronized PooledConnection findReplacementTarget(ServerLocation currentServer) {
+ final long now = System.nanoTime();
+ for (Iterator it = this.allConnections.iterator(); it.hasNext();) {
+ PooledConnection pc = (PooledConnection)it.next();
+ if (currentServer.equals(pc.getServer())) {
+ if (!pc.shouldDestroy()
+ && pc.remainingLife(now, lifetimeTimeoutNanos) <= 0) {
+ removeFromEndpointMap(pc);
+ return pc;
+ }
+ }
+ }
+ return null;
+ }
+
+ /**
+     * Return true if we have a connection to the currentServer whose
+     * lifetime has expired; otherwise return false.
+ */
+ public synchronized boolean hasExpiredCnxToServer(ServerLocation currentServer) {
+ if (!this.allConnections.isEmpty()) {
+ //boolean idlePossible = isIdleExpirePossible();
+ final long now = System.nanoTime();
+ for (Iterator it = this.allConnections.iterator(); it.hasNext();) {
+ PooledConnection pc = (PooledConnection)it.next();
+ if (pc.shouldDestroy()) {
+ // this con has already been destroyed so ignore it
+ continue;
+ }
+ else if ( currentServer.equals(pc.getServer()) ) {
+ /*if (idlePossible && pc.hasIdleExpired(now, idleTimeoutNanos)) {
+ // this con has already idle expired so ignore it
+ continue;
+ } else*/ {
+ long life = pc.remainingLife(now, lifetimeTimeoutNanos);
+ if (life <= 0) {
+ return true;
+ }
+ }
+ }
+ }
+ }
+ return false;
+    }
+
+ /**
+ * Returns true if caller should recheck for expired lifetimes
+ * Returns false if a background check was scheduled or no expirations are possible.
+ */
+ public synchronized boolean checkForReschedule(boolean rescheduleOk) {
+ if (!this.allConnections.isEmpty()) {
+ final long now = System.nanoTime();
+ for (Iterator it = this.allConnections.iterator(); it.hasNext();) {
+ PooledConnection pc = (PooledConnection)it.next();
+ if (pc.hasIdleExpired(now, idleTimeoutNanos)) {
+ // this con has already idle expired so ignore it
+ continue;
+ }
+ else if ( pc.shouldDestroy() ) {
+ // this con has already been destroyed so ignore it
+ continue;
+ }
+ else {
+ long life = pc.remainingLife(now, lifetimeTimeoutNanos);
+ if (life > 0) {
+ if (rescheduleOk) {
+// logger.info("DEBUG: 2 rescheduling lifetime expire to be in: "
+// + life + " nanos");
+ startBackgroundLifetimeExpiration(life);
+ return false;
+ }
+ else {
+ return false;
+ }
+ }
+ else {
+ return true;
+ }
+ }
+ }
+ }
+ return false;
+ }
+
+ /**
+     * See if any of the lifetime-expired connections (that have not idle
+     * expired) are already connected to this sl.
+     * If so then just update one in-place to simulate a replace.
+     * @param sl the location of the server we should see if we are connected to
+     * @return true if we were able to extend an existing connection's lifetime
+     * or if we have no connections whose lifetime has expired;
+     * false if we need to create a replacement connection.
+ */
+ public synchronized boolean tryToExtendLifeTime(ServerLocation sl) {
+ // a better approach might be to get the most loaded server
+ // (if they are not balanced) and then scan through and extend the lifetime
+ // of everyone not connected to that server and do a replace on just one
+ // of the guys who has lifetime expired to the most loaded server
+ boolean result = true;
+ if (!this.allConnections.isEmpty()) {
+ final long now = System.nanoTime();
+ for (Iterator it = this.allConnections.iterator(); it.hasNext();) {
+ PooledConnection pc = (PooledConnection)it.next();
+ if (pc.remainingLife(now, lifetimeTimeoutNanos) > 0) {
+ // no more connections whose lifetime could have expired
+ break;
+ // note don't ignore idle guys because they are still connected
+// } else if (pc.remainingIdle(now, idleTimeoutNanos) <= 0) {
+// // this con has already idle expired so ignore it
+ }
+ else if ( pc.shouldDestroy() ) {
+ // this con has already been destroyed so ignore it
+ }
+ else if ( sl.equals(pc.getEndpoint().getLocation()) ) {
+          // we found a guy whose lifetime we can extend
+ it.remove();
+// logger.fine("DEBUG: tryToExtendLifeTime extending life of: " + pc);
+ pc.setBirthDate(now);
+ getPoolStats().incLoadConditioningExtensions();
+ this.allConnections.addLast(pc);
+ return true;
+ }
+ else {
+ // the current pc is a candidate for reconnection to another server
+ // so set result to false which will stick unless we find another con
+ // whose life can be extended.
+ result = false;
+ }
+ }
+ }
+// if (result) {
+// logger.fine("DEBUG: tryToExtendLifeTime found no one to extend");
+// }
+ return result;
+ }
+
+ /**
+ * Extend the life of the first expired connection to sl.
+ */
+ public synchronized void extendLifeOfCnxToServer(ServerLocation sl) {
+ if (!this.allConnections.isEmpty()) {
+ final long now = System.nanoTime();
+ for (Iterator it = this.allConnections.iterator(); it.hasNext() ;) {
+ PooledConnection pc = (PooledConnection)it.next();
+ if (pc.remainingLife(now, lifetimeTimeoutNanos) > 0) {
+ // no more connections whose lifetime could have expired
+ break;
+ // note don't ignore idle guys because they are still connected
+// } else if (pc.remainingIdle(now, idleTimeoutNanos) <= 0) {
+// // this con has already idle expired so ignore it
+ }
+ else if ( pc.shouldDestroy() ) {
+ // this con has already been destroyed so ignore it
+ }
+ else if ( sl.equals(pc.getEndpoint().getLocation()) ) {
+          // we found a guy whose lifetime we can extend
+ it.remove();
+// logger.fine("DEBUG: tryToExtendLifeTime extending life of: " + pc);
+ pc.setBirthDate(now);
+ getPoolStats().incLoadConditioningExtensions();
+ this.allConnections.addLast(pc);
+ // break so we only do this to the oldest guy
+ break;
+ }
+ }
+ }
+ }
+
+ public synchronized void startBackgroundLifetimeExpiration(long delay) {
+ if(!this.haveLifetimeExpireConnectionsTask) {
+ this.haveLifetimeExpireConnectionsTask = true;
+ try {
+// logger.info("DEBUG: scheduling lifetime expire check in: " + delay + " ns");
+ LifetimeExpireConnectionsTask task = new LifetimeExpireConnectionsTask();
+// logger.info("DEBUG: scheduling lifetimeTask=" + task);
+ loadConditioningProcessor.schedule(task, delay, TimeUnit.NANOSECONDS);
+ }
+ catch (RejectedExecutionException e) {
+ //ignore, the timer has been cancelled, which means we're shutting down.
+ }
+ }
+ }
+
+ public void checkIdleExpiration() {
+ int expireCount = 0;
+ List<PooledConnection> toClose = null;
+ synchronized (this) {
+ haveIdleExpireConnectionsTask = false;
+ if(shuttingDown) {
+ return;
+ }
+ if (logger.isTraceEnabled()) {
+ logger.trace("Looking for connections to expire");
+ }
+
+ // because we expire thread local connections we need to scan allConnections
+
+ //find connections which have idle expired
+ int conCount = this.allConnections.size();
+ if (conCount <= minConnections) {
+ return;
+ }
+ final long now = System.nanoTime();
+ long minRemainingIdle = Long.MAX_VALUE;
+ toClose = new ArrayList<PooledConnection>(conCount-minConnections);
+ for (Iterator it = this.allConnections.iterator();
+ it.hasNext() && conCount > minConnections;) {
+ PooledConnection pc = (PooledConnection)it.next();
+ if (pc.shouldDestroy()) {
+ // ignore these connections
+ conCount--;
+ }
+ else {
+ long remainingIdle = pc.doIdleTimeout(now, idleTimeoutNanos);
+ if (remainingIdle >= 0) {
+ if (remainingIdle == 0) {
+ // someone else already destroyed pc so ignore it
+ conCount--;
+ }
+ else if ( remainingIdle < minRemainingIdle ) {
+ minRemainingIdle = remainingIdle;
+ }
+ }
+ else /* (remainingIdle < 0) */{
+ // this means that we idleExpired the connection
+ expireCount++;
+ conCount--;
+ removeFromEndpointMap(pc);
+ toClose.add(pc);
+ it.remove();
+ }
+ }
+ }
+ if (conCount > minConnections && minRemainingIdle < Long.MAX_VALUE) {
+ try {
+ backgroundProcessor.schedule(new IdleExpireConnectionsTask(),
+ minRemainingIdle,
+ TimeUnit.NANOSECONDS);
+ }
+ catch (RejectedExecutionException e) {
+ //ignore, the timer has been cancelled, which means we're shutting down.
+ }
+ haveIdleExpireConnectionsTask = true;
+ }
+ }
+
+ if (expireCount > 0) {
+ getPoolStats().incIdleExpire(expireCount);
+ getPoolStats().incPoolConnections(-expireCount);
+// logger.info("DEBUG: checkIdleExpiration incPoolConnections(" + (-expireCount) + ")->" + getPoolStats().getPoolConnections());
+ // do this outside the above sync
+ lock.lock();
+ try {
+// getPoolStats().incConCount(-expireCount);
+ connectionCount -= expireCount;
+// logger.info("DEBUG: checkIdleExpiration conCount(" + (-expireCount) + ")->" + connectionCount);
+ freeConnection.signalAll();
+ if(connectionCount < minConnections) {
+ startBackgroundPrefill();
+ }
+ }
+ finally {
+ lock.unlock();
+ }
+ }
+ //now destroy all of the connections, outside the sync
+// if (toClose != null) (cannot be null)
+ final boolean isDebugEnabled = logger.isDebugEnabled();
+ {
+ for (Iterator itr = toClose.iterator(); itr.hasNext(); ) {
+ PooledConnection connection = (PooledConnection) itr.next();
+ if (isDebugEnabled) {
+ logger.debug("Idle connection detected. Expiring connection {}", connection);
+ }
+ try {
+ connection.internalClose(false);
+ }
+ catch (Exception e) {
+ logger.warn(LocalizedMessage.create(LocalizedStrings.ConnectionManagerImpl_ERROR_EXPIRING_CONNECTION_0,
+ connection));
+ }
+ }
+ }
+ }
+
+ public void checkLifetimes() {
+ // logger.info("DEBUG: Looking for connections whose lifetime has expired");
+ boolean done;
+ synchronized (this) {
+ this.haveLifetimeExpireConnectionsTask = false;
+ if(shuttingDown) {
+ return;
+ }
+ }
+ do {
+ getPoolStats().incLoadConditioningCheck();
+ long firstLife = -1;
+ done = true;
+ ServerLocation candidate = null;
+ boolean idlePossible = true;
+
+ synchronized (this) {
+ if(shuttingDown) {
+ return;
+ }
+ // find a connection whose lifetime has expired
+ // and who is not already being replaced
+ long now = System.nanoTime();
+ long life = 0;
+ idlePossible = isIdleExpirePossible();
+ for (Iterator it = this.allConnections.iterator();
+ it.hasNext() && life <= 0 && (candidate == null);) {
+ PooledConnection pc = (PooledConnection)it.next();
+ // skip over idle expired and destroyed
+ life = pc.remainingLife(now, lifetimeTimeoutNanos);
+// logger.fine("DEBUG: life remaining in " + pc + " is: " + life);
+ if (life <= 0) {
+ boolean idleTimedOut = idlePossible
+ ? pc.hasIdleExpired(now, idleTimeoutNanos)
+ : false;
+ boolean destroyed = pc.shouldDestroy();
+// logger.fine("DEBUG: idleTimedOut=" + idleTimedOut
+// + " destroyed=" + destroyed);
+ if (!idleTimedOut && !destroyed) {
+ candidate = pc.getServer();
+ }
+ }
+ else if ( firstLife == -1 ) {
+ firstLife = life;
+ }
+ }
+ }
+ if (candidate != null) {
+// logger.fine("DEBUG: calling createLifetimeReplacementConnection");
+ done = !createLifetimeReplacementConnection(candidate, idlePossible);
+// logger.fine("DEBUG: createLifetimeReplacementConnection returned " + !done);
+ }
+ else {
+// logger.fine("DEBUG: reschedule " + firstLife);
+ if (firstLife >= 0) {
+ // reschedule
+// logger.info("DEBUG: rescheduling lifetime expire to be in: "
+// + firstLife + " nanos");
+ startBackgroundLifetimeExpiration(firstLife);
+ }
+ done = true; // just to be clear
+ }
+ } while (!done);
+ // If a lifetimeExpire task is not scheduled at this point then
+ // schedule one that will do a check in our configured lifetimeExpire.
+ // this should not be needed but seems to currently help.
+ startBackgroundLifetimeExpiration(lifetimeTimeoutNanos);
+ }
+ }
+
+ private void logInfo(StringId message, Throwable t) {
+ if(t instanceof GemFireSecurityException) {
+ securityLogWriter.info(LocalizedStrings.TWO_ARG_COLON,
+ new Object[] {message.toLocalizedString(), t}, t);
+ } else {
+ logger.info(LocalizedMessage.create(LocalizedStrings.TWO_ARG_COLON,
+ new Object[] {message.toLocalizedString(), t}), t);
+ }
+ }
+
+ private void logError(StringId message, Throwable t) {
+ if(t instanceof GemFireSecurityException) {
+ securityLogWriter.error(message, t);
+ }
+ else {
+ logger.error(message, t);
+ }
+ }
+
+ public void activate(Connection conn) {
+ assert conn instanceof PooledConnection;
+ ((PooledConnection)conn).activate();
+ }
+ public void passivate(Connection conn, boolean accessed) {
+ assert conn instanceof PooledConnection;
+ ((PooledConnection)conn).passivate(accessed);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/19459053/gemfire-core/src/main/java/com/gemstone/gemfire/cache/client/internal/pooling/PooledConnection.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/cache/client/internal/pooling/PooledConnection.java b/gemfire-core/src/main/java/com/gemstone/gemfire/cache/client/internal/pooling/PooledConnection.java
new file mode 100644
index 0000000..db38b8b
--- /dev/null
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/cache/client/internal/pooling/PooledConnection.java
@@ -0,0 +1,344 @@
+/*=========================================================================
+ * Copyright (c) 2002-2014 Pivotal Software, Inc. All Rights Reserved.
+ * This product is protected by U.S. and international copyright
+ * and intellectual property laws. Pivotal products are covered by
+ * more patents listed at http://www.pivotal.io/patents.
+ *=========================================================================
+ */
+package com.gemstone.gemfire.cache.client.internal.pooling;
+
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.net.Socket;
+import java.nio.ByteBuffer;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import com.gemstone.gemfire.InternalGemFireException;
+import com.gemstone.gemfire.cache.client.internal.Connection;
+import com.gemstone.gemfire.cache.client.internal.ConnectionImpl;
+import com.gemstone.gemfire.cache.client.internal.ConnectionStats;
+import com.gemstone.gemfire.cache.client.internal.Endpoint;
+import com.gemstone.gemfire.cache.client.internal.Op;
+import com.gemstone.gemfire.distributed.internal.ServerLocation;
+import com.gemstone.gemfire.internal.cache.tier.sockets.ServerQueueStatus;
+
+/**
+ * A connection managed by the connection manager. Keeps track
+ * of the current state of the connection.
+ * @author dsmith
+ * @since 5.7
+ *
+ */
+class PooledConnection implements Connection {
+
+ /* connection is volatile because we may asynchronously
+ * destroy the pooled connection while shutting down. */
+ private volatile Connection connection;
+ private volatile Endpoint endpoint;
+ private volatile long birthDate;
+ private long lastAccessed; // read & written while synchronized
+ private boolean active = true; // read and write while synchronized on this
+ private final AtomicBoolean shouldDestroy = new AtomicBoolean();
+ private boolean waitingToSwitch = false;
+// private final ConnectionManagerImpl manager;
+
+ public PooledConnection(ConnectionManagerImpl manager, Connection connection) {
+// this.manager = manager;
+ this.connection = connection;
+ this.endpoint = connection.getEndpoint();
+ this.birthDate = System.nanoTime();
+ this.lastAccessed = this.birthDate;
+ }
+
+ public ServerLocation getServer() {
+ return getEndpoint().getLocation();
+ }
+
+ public boolean isActive() {
+ synchronized (this) {
+ return this.active;
+ }
+ }
+
+ public void internalDestroy() {
+ this.shouldDestroy.set(true); // probably already set but make sure
+ synchronized (this) {
+ this.active = false;
+ notifyAll();
+ Connection myCon = connection;
+ if (myCon != null) {
+ myCon.destroy();
+ connection = null;
+ }
+ }
+ }
+
+  /** When a pooled connection is destroyed, it is not destroyed
+   * right away; the destroy happens when it is returned to the pool.
+ */
+ public void destroy() {
+ this.shouldDestroy.set(true);
+ }
+
+ public void internalClose(boolean keepAlive) throws Exception {
+ try {
+ Connection con = this.connection;
+ if (con != null) {
+ con.close(keepAlive);
+ }
+ } finally {
+ internalDestroy();
+ }
+ }
+
+ public void close(boolean keepAlive) throws Exception {
+    // needed for junit tests
+ internalClose(keepAlive);
+// throw new UnsupportedOperationException(
+// "Pooled connections should only be closed by the connection manager");
+ }
+
+  public void emergencyClose() {
+    Connection con = this.connection;
+    if (con != null) {
+      // use the local copy; this.connection may be nulled concurrently
+      con.emergencyClose();
+    }
+    this.connection = null;
+  }
+
+ Connection getConnection() {
+ Connection result = this.connection;
+ if (result == null) {
+ throw new ConnectionDestroyedException();
+ }
+ return result;
+ }
+
+ /**
+ * Set the destroy bit if it is not already set.
+   * @return true if we were able to set the bit; false if someone else already did
+ */
+ public boolean setShouldDestroy() {
+ return this.shouldDestroy.compareAndSet(false, true);
+ }
+
+ public boolean shouldDestroy() {
+ return this.shouldDestroy.get();
+ }
+
+ public boolean isDestroyed() {
+ return connection == null;
+ }
+
+ public void passivate(final boolean accessed) {
+ long now = 0L;
+ if (accessed) {
+ // do this outside the sync
+ now = System.nanoTime();
+ }
+ synchronized (this) {
+ if(isDestroyed()) {
+ return;
+ }
+ if(!this.active) {
+ throw new InternalGemFireException("Connection not active");
+ }
+ this.active = false;
+ notifyAll();
+ if (accessed) {
+ this.lastAccessed = now; // do this while synchronized
+ }
+ }
+ }
+
+
+  public boolean switchConnection(Connection newCon)
+ throws InterruptedException {
+ Connection oldCon = null;
+ synchronized (this) {
+ if (shouldDestroy()) return false;
+
+ if (this.active && !shouldDestroy()) {
+ this.waitingToSwitch = true;
+ try {
+ while (this.active && !shouldDestroy()) {
+ wait();
+ }
+ } finally {
+ this.waitingToSwitch = false;
+ notifyAll();
+ }
+ }
+ if (shouldDestroy()) return false;
+ assert !this.active;
+ final long now = System.nanoTime();
+ oldCon = this.connection;
+ this.connection = newCon;
+ this.endpoint = newCon.getEndpoint();
+ this.birthDate = now;
+ }
+ if (oldCon != null) {
+ try {
+ // do this outside of sync
+ oldCon.close(false);
+ } catch (Exception e) {
+ // ignore
+ }
+ }
+ return true;
+ }
+
+ public void activate() {
+ synchronized (this) {
+ try {
+ while (this.waitingToSwitch) {
+ wait();
+ }
+ } catch (InterruptedException ex) {
+ Thread.currentThread().interrupt();
+ }
+ getConnection(); // it checks if we are destroyed
+ if (active) {
+ throw new InternalGemFireException("Connection already active");
+ }
+ if (shouldDestroy()) {
+ throw new ConnectionDestroyedException();
+ }
+ active = true;
+ }
+ }
+
+ private synchronized long getLastAccessed() {
+ return lastAccessed;
+ }
+
+ public long getBirthDate() {
+ return this.birthDate;
+ }
+
+ public void setBirthDate(long ts) {
+ this.birthDate = ts;
+ }
+
+ /**
+ * Returns the number of nanoseconds remaining in this connection's life.
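+ * For example (illustrative numbers): a connection created at t = 100ns,
+ * checked at now = 150ns with timeoutNanos = 80, has (100 - 150) + 80 = 30ns
+ * of life remaining.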
+ */
+ public long remainingLife(long now, long timeoutNanos) {
+ return (getBirthDate() - now) + timeoutNanos;
+ }
+
+ private long remainingIdle(long now, long timeoutNanos) {
+ return (getLastAccessed() - now) + timeoutNanos;
+ }
+
+ /**
+ * Attempts to idle-timeout this connection.
+ * Returns -1 if this call marked the connection for destruction
+ * because its idle time expired.
+ * Returns 0 if the connection has already been destroyed.
+ * Otherwise returns the amount of idle time the connection has
+ * remaining; if it is active it cannot be timed out now, and the
+ * returned value is a hint for when it should be checked next.
+ */
+ public long doIdleTimeout(long now, long timeoutNanos) {
+ if (shouldDestroy()) return 0;
+ synchronized (this) {
+ if (isActive()) {
+ // this is a reasonable value to return since odds are that
+ // when the connection goes inactive it will reset its access time.
+ return timeoutNanos;
+ } else {
+ long idleRemaining = remainingIdle(now, timeoutNanos);
+ if (idleRemaining <= 0) {
+ if (setShouldDestroy()) {
+ // we were able to set the destroy bit
+ return -1;
+ } else {
+ // someone else already destroyed it
+ return 0;
+ }
+ } else {
+ return idleRemaining;
+ }
+ }
+ }
+ }
+
+ /**
+ * Return true if the connection has been idle long enough to expire.
+ */
+ public boolean hasIdleExpired(long now, long timeoutNanos) {
+ synchronized (this) {
+ if (isActive()) {
+ return false;
+ } else {
+ return remainingIdle(now, timeoutNanos) <= 0;
+ }
+ }
+ }
+
+ public ByteBuffer getCommBuffer() {
+ return getConnection().getCommBuffer();
+ }
+
+ public Socket getSocket() {
+ return getConnection().getSocket();
+ }
+
+ public OutputStream getOutputStream() {
+ return getConnection().getOutputStream();
+ }
+
+ public InputStream getInputStream() {
+ return getConnection().getInputStream();
+ }
+
+ public ConnectionStats getStats() {
+ return getEndpoint().getStats();
+ }
+
+ public Endpoint getEndpoint() {
+ return this.endpoint;
+ }
+
+ public ServerQueueStatus getQueueStatus() {
+ return getConnection().getQueueStatus();
+ }
+
+ @Override
+ public String toString() {
+ Connection myCon = connection;
+ if (myCon != null) {
+ return "Pooled Connection to " + this.endpoint + ": " + myCon.toString();
+ } else {
+ return "Pooled Connection to " + this.endpoint + ": Connection[DESTROYED]";
+ }
+ }
+
+ public Object execute(Op op) throws Exception {
+ return getConnection().execute(op);
+ }
+
+ public static void loadEmergencyClasses() {
+ ConnectionImpl.loadEmergencyClasses();
+ }
+
+ public short getWanSiteVersion() {
+ return getConnection().getWanSiteVersion();
+ }
+
+ public int getDistributedSystemId() {
+ return getConnection().getDistributedSystemId();
+ }
+
+ public void setWanSiteVersion(short wanSiteVersion) {
+ getConnection().setWanSiteVersion(wanSiteVersion);
+ }
+
+ public void setConnectionID(long id) {
+ getConnection().setConnectionID(id); // fails with ConnectionDestroyedException rather than NPE
+ }
+
+ public long getConnectionID() {
+ return getConnection().getConnectionID();
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/19459053/gemfire-core/src/main/java/com/gemstone/gemfire/cache/client/package.html
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/cache/client/package.html b/gemfire-core/src/main/java/com/gemstone/gemfire/cache/client/package.html
new file mode 100644
index 0000000..e2f7bce
--- /dev/null
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/cache/client/package.html
@@ -0,0 +1,52 @@
+<!DOCTYPE HTML PUBLIC "-//W3C//DTD HTML 4.01 Transitional//EN">
+
+<HTML>
+ <HEAD>
+ <TITLE>com.gemstone.gemfire.cache.client package</TITLE>
+ </HEAD>
+ <BODY>
+ The <code>com.gemstone.gemfire.cache.client</code> package provides APIs used
+ for client connectivity and caching.
+<p>
+Most clients will only need to create a
+{@link com.gemstone.gemfire.cache.client.ClientCache}
+using a
+{@link com.gemstone.gemfire.cache.client.ClientCacheFactory}.
+<p>
+A client configures the servers it will connect to by creating one or more
+{@link com.gemstone.gemfire.cache.client.Pool pools}.
+For most use cases a single pool per client is all you need; the easiest
+way to get one is to use
+{@link com.gemstone.gemfire.cache.client.ClientCacheFactory}.
+If you do need more than one pool use a
+{@link com.gemstone.gemfire.cache.client.PoolFactory pool factory} obtained from the
+{@link com.gemstone.gemfire.cache.client.PoolManager pool manager} before you
+create the cache using
+{@link com.gemstone.gemfire.cache.client.ClientCacheFactory}.
+<p>
+An alternative to these APIs is to use the <code>pool</code> XML element
+as described in the <code>cache6_5.dtd</code>.
+<p>
+If you create more than one pool, then you need to configure the pool name
+on each region that will use a pool.
+This can be done by setting the
+pool name on the region using the
+{@link com.gemstone.gemfire.cache.client.ClientRegionFactory#setPoolName API}
+or using the <code>pool-name</code> attribute on the <code>region-attributes</code>
+as described in the <code>cache6_5.dtd</code>.
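+<p>
+As a sketch (the pool, locator, and region names here are illustrative), a
+client that needs a second pool for one of its regions could be set up
+like this:
+<pre>
+  PoolFactory pf = PoolManager.createFactory();
+  pf.addLocator("localhost", 10334);
+  pf.create("auxPool"); // the extra pool, created before the cache
+  ClientCache cache = new ClientCacheFactory()
+      .addPoolLocator("localhost", 10334)
+      .create();
+  Region&lt;String, String&gt; region =
+      cache.&lt;String, String&gt;createClientRegionFactory(ClientRegionShortcut.PROXY)
+          .setPoolName("auxPool")
+          .create("exampleRegion");
+</pre>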
+
+<a name="declarative"><h2>Client Declarative Caching</h2>
+
+<p>A "caching XML file" declares regions, entries, and attributes. When
+a <code>ClientCache</code> is created its contents can be initialized
+according to a caching XML file.
+The top level element must be a client-cache element.
+</p>
+
+<p>The Document Type Definition for a declarative cache XML file can
+be found in <code>"doc-files/cache6_5.dtd"</code>.
+For examples of declarative cache XML files see <A
+href="doc-files/example-client-cache.xml">example1</A>.
+
+ </BODY>
+</HTML>
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/19459053/gemfire-core/src/main/java/com/gemstone/gemfire/cache/control/RebalanceFactory.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/cache/control/RebalanceFactory.java b/gemfire-core/src/main/java/com/gemstone/gemfire/cache/control/RebalanceFactory.java
new file mode 100644
index 0000000..0db0b3a
--- /dev/null
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/cache/control/RebalanceFactory.java
@@ -0,0 +1,57 @@
+/*=========================================================================
+ * Copyright (c) 2002-2014 Pivotal Software, Inc. All Rights Reserved.
+ * This product is protected by U.S. and international copyright
+ * and intellectual property laws. Pivotal products are covered by
+ * more patents listed at http://www.pivotal.io/patents.
+ *========================================================================
+ */
+
+package com.gemstone.gemfire.cache.control;
+
+import java.util.Set;
+
+/**
+ * Factory for defining and starting a {@link RebalanceOperation}.
+ *
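+ * A typical use, sketched assuming <code>cache</code> is an existing
+ * <code>Cache</code>:
+ * <pre>
+ *   RebalanceOperation op = cache.getResourceManager()
+ *       .createRebalanceFactory()
+ *       .start();
+ * </pre>
+ *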
+ * @since 6.0
+ */
+public interface RebalanceFactory {
+
+ /**
+ * Specify which regions to include in the rebalance operation. The default,
+ * <code>null</code>, means all regions should be rebalanced. Includes take
+ * precedence over excludes.
+ *
+ * @param regions
+ * A set containing the names of regions to include.
+ * @since 6.5
+ */
+ RebalanceFactory includeRegions(Set<String> regions);
+
+ /**
+ * Exclude specific regions from the rebalancing operation. The default,
+ * <code>null</code>, means don't exclude any regions.
+ *
+ * @param regions
+ * A set containing the names of regions to exclude.
+ * @since 6.5
+ */
+ RebalanceFactory excludeRegions(Set<String> regions);
+
+ /**
+ * Asynchronously starts a new rebalance operation. Only the GemFire
+ * controlled cache resources used by this member will be rebalanced.
+ * The operation may queue as needed while waiting for resources that
+ * are held by other active rebalance operations.
+ */
+ public RebalanceOperation start();
+
+ /**
+ * Simulates a rebalance of the GemFire controlled cache resources on this
+ * member. This operation will not make any actual changes. It will only
+ * produce a report of what the results would have been had this been a real
+ * rebalance operation.
+ */
+ public RebalanceOperation simulate();
+}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/19459053/gemfire-core/src/main/java/com/gemstone/gemfire/cache/control/RebalanceOperation.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/cache/control/RebalanceOperation.java b/gemfire-core/src/main/java/com/gemstone/gemfire/cache/control/RebalanceOperation.java
new file mode 100644
index 0000000..a7fc69a
--- /dev/null
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/cache/control/RebalanceOperation.java
@@ -0,0 +1,66 @@
+/*=========================================================================
+ * Copyright (c) 2002-2014 Pivotal Software, Inc. All Rights Reserved.
+ * This product is protected by U.S. and international copyright
+ * and intellectual property laws. Pivotal products are covered by
+ * more patents listed at http://www.pivotal.io/patents.
+ *========================================================================
+ */
+
+package com.gemstone.gemfire.cache.control;
+
+import java.util.concurrent.CancellationException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+/**
+ * Operation for rebalancing resources used by the {@link
+ * com.gemstone.gemfire.cache GemFire Cache}.
+ *
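+ * A waiting sketch, assuming <code>op</code> is a started operation:
+ * <pre>
+ *   try {
+ *     RebalanceResults results = op.getResults(60, TimeUnit.SECONDS);
+ *   } catch (TimeoutException e) {
+ *     op.cancel(); // ask the operation to stop at a safe point
+ *   } catch (InterruptedException e) {
+ *     Thread.currentThread().interrupt(); // preserve the interrupt
+ *   }
+ * </pre>
+ *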
+ * @since 6.0
+ */
+public interface RebalanceOperation {
+
+ // NOTE: cancelled is the spelling used in java.util.concurrent.Future
+
+ /**
+ * Returns true if this operation was cancelled before it completed.
+ */
+ public boolean isCancelled();
+
+ /**
+ * Returns true if this operation completed.
+ */
+ public boolean isDone();
+
+ /**
+ * Cancels this rebalance operation. The rebalance operation will find a
+ * safe point and then stop.
+ *
+ * @return false if this operation could not be cancelled, typically because
+ * it has already completed; true otherwise
+ */
+ public boolean cancel();
+
+ /**
+ * Wait for this operation to complete and return the results.
+ *
+ * @return the rebalance results
+ * @throws CancellationException if the operation was cancelled
+ * @throws InterruptedException if the wait was interrupted
+ */
+ public RebalanceResults getResults()
+ throws CancellationException, InterruptedException;
+
+ /**
+ * Wait for this operation to complete and return the results.
+ *
+ * @param timeout the maximum time to wait
+ * @param unit the time unit of the timeout argument
+ * @return the rebalance results
+ * @throws CancellationException if the operation was cancelled
+ * @throws TimeoutException if the wait timed out
+ * @throws InterruptedException if the wait was interrupted
+ */
+ public RebalanceResults getResults(long timeout, TimeUnit unit)
+ throws CancellationException, TimeoutException, InterruptedException;
+}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/19459053/gemfire-core/src/main/java/com/gemstone/gemfire/cache/control/RebalanceResults.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/cache/control/RebalanceResults.java b/gemfire-core/src/main/java/com/gemstone/gemfire/cache/control/RebalanceResults.java
new file mode 100644
index 0000000..987618a
--- /dev/null
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/cache/control/RebalanceResults.java
@@ -0,0 +1,97 @@
+/*=========================================================================
+ * Copyright (c) 2002-2014 Pivotal Software, Inc. All Rights Reserved.
+ * This product is protected by U.S. and international copyright
+ * and intellectual property laws. Pivotal products are covered by
+ * more patents listed at http://www.pivotal.io/patents.
+ *========================================================================
+ */
+
+package com.gemstone.gemfire.cache.control;
+
+import java.util.Set;
+
+import com.gemstone.gemfire.cache.partition.PartitionRebalanceInfo;
+
+/**
+ * The results of rebalancing <code>Cache</code> resources.
+ *
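+ * A reporting sketch, assuming <code>results</code> came from a completed
+ * rebalance (the output format is illustrative):
+ * <pre>
+ *   for (PartitionRebalanceInfo info : results.getPartitionRebalanceDetails()) {
+ *     System.out.println(info.getRegionPath() + " rebalanced in "
+ *         + info.getTime() + " ms");
+ *   }
+ * </pre>
+ *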
+ * @since 6.0
+ */
+public interface RebalanceResults {
+
+ /**
+ * Returns a <code>Set</code> of detailed information about each partitioned
+ * region that was rebalanced.
+ *
+ * @return a <code>Set</code> of detailed information about each partitioned
+ * region that was rebalanced
+ */
+ public Set<PartitionRebalanceInfo> getPartitionRebalanceDetails();
+
+ /**
+ * Returns the total time, in milliseconds, that the rebalance operation took.
+ *
+ * @return the total time, in milliseconds, that the rebalance operation took
+ */
+ public long getTotalTime();
+
+ /**
+ * Returns the total number of buckets created during the rebalance operation.
+ *
+ * @return the total number of buckets created during the rebalance operation
+ */
+ public int getTotalBucketCreatesCompleted();
+
+ /**
+ * Returns the total size, in bytes, of all of the buckets that were created
+ * as part of the rebalance operation.
+ *
+ * @return the total size, in bytes, of all of the buckets that were created
+ * as part of the rebalance operation
+ */
+ public long getTotalBucketCreateBytes();
+
+ /**
+ * Returns the total time, in milliseconds, taken to create buckets.
+ *
+ * @return the total time, in milliseconds, taken to create buckets
+ */
+ public long getTotalBucketCreateTime();
+
+ /**
+ * Returns the total number of buckets transferred.
+ *
+ * @return the total number of buckets transferred
+ */
+ public int getTotalBucketTransfersCompleted();
+
+ /**
+ * Returns the total size, in bytes, of buckets that were transferred.
+ *
+ * @return the total size, in bytes, of buckets that were transferred
+ */
+ public long getTotalBucketTransferBytes();
+
+ /**
+ * Returns the total amount of time, in milliseconds, it took to transfer
+ * buckets.
+ *
+ * @return the total amount of time, in milliseconds, it took to transfer
+ * buckets
+ */
+ public long getTotalBucketTransferTime();
+
+ /**
+ * Returns the total number of primaries that were transferred.
+ *
+ * @return the total number of primaries that were transferred
+ */
+ public int getTotalPrimaryTransfersCompleted();
+
+ /**
+ * Returns the total time, in milliseconds, spent transferring primaries.
+ *
+ * @return the total time, in milliseconds, spent transferring primaries
+ */
+ public long getTotalPrimaryTransferTime();
+}