You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by ds...@apache.org on 2019/04/04 17:49:26 UTC
[geode] branch develop updated: GEODE-6573: move methods from ConnectionManager to Connection (#3375)
This is an automated email from the ASF dual-hosted git repository.
dschneider pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/geode.git
The following commit(s) were added to refs/heads/develop by this push:
new a4ad661 GEODE-6573: move methods from ConnectionManager to Connection (#3375)
a4ad661 is described below
commit a4ad6617ecf0cc6139d4df483c5100fa50b0d53f
Author: Darrel Schneider <ds...@pivotal.io>
AuthorDate: Thu Apr 4 10:49:14 2019 -0700
GEODE-6573: move methods from ConnectionManager to Connection (#3375)
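Connection now has default implementations of getWrappedConnection,
activate, and passivate. ConnectionManager no longer has the corresponding
methods (getConnection, activate, and passivate); callers now invoke them
directly on the Connection instance.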
---
.../pooling/ConnectionManagerImplTest.java | 75 ----------------------
.../pooling/ConnectionManagerJUnitTest.java | 6 +-
.../geode/cache/client/internal/Connection.java | 31 +++++++++
.../cache/client/internal/OpExecutorImpl.java | 12 ++--
.../cache/client/internal/QueueConnectionImpl.java | 5 ++
.../client/internal/pooling/ConnectionManager.java | 17 -----
.../internal/pooling/ConnectionManagerImpl.java | 27 +-------
.../client/internal/pooling/PooledConnection.java | 10 ++-
.../client/internal/OpExecutorImplJUnitTest.java | 20 ------
9 files changed, 53 insertions(+), 150 deletions(-)
diff --git a/geode-core/src/integrationTest/java/org/apache/geode/cache/client/internal/pooling/ConnectionManagerImplTest.java b/geode-core/src/integrationTest/java/org/apache/geode/cache/client/internal/pooling/ConnectionManagerImplTest.java
index 35afea3..943854f 100644
--- a/geode-core/src/integrationTest/java/org/apache/geode/cache/client/internal/pooling/ConnectionManagerImplTest.java
+++ b/geode-core/src/integrationTest/java/org/apache/geode/cache/client/internal/pooling/ConnectionManagerImplTest.java
@@ -481,79 +481,4 @@ public class ConnectionManagerImplTest {
connectionManager.close(false);
}
-
- @Test
- public void activateActivatesConnection() {
- Connection connection = mock(Connection.class);
- ServerLocation serverLocation = mock(ServerLocation.class);
- when(connectionFactory.createClientToServerConnection(serverLocation, false))
- .thenReturn(connection);
-
- connectionManager = createDefaultConnectionManager();
- connectionManager.start(backgroundProcessor);
-
- Connection heldConnection = connectionManager.borrowConnection(serverLocation, timeout, false);
- connectionManager.passivate(heldConnection, false);
- connectionManager.activate(heldConnection);
-
- assertThat(((PooledConnection) heldConnection).isActive()).isTrue();
-
- connectionManager.close(false);
- }
-
- @Test
- public void activateThrowsIfConnectionIsAlreadyActive() {
- Connection connection = mock(Connection.class);
- ServerLocation serverLocation = mock(ServerLocation.class);
- when(connectionFactory.createClientToServerConnection(serverLocation, false))
- .thenReturn(connection);
-
- connectionManager = createDefaultConnectionManager();
- connectionManager.start(backgroundProcessor);
-
- Connection heldConnection = connectionManager.borrowConnection(serverLocation, timeout, false);
-
- assertThatThrownBy(() -> connectionManager.activate(heldConnection))
- .isInstanceOf(InternalGemFireException.class)
- .hasMessageContaining("Connection already active");
-
- connectionManager.close(false);
- }
-
- @Test
- public void passivatePassivatesConnection() {
- Connection connection = mock(Connection.class);
- ServerLocation serverLocation = mock(ServerLocation.class);
- when(connectionFactory.createClientToServerConnection(serverLocation, false))
- .thenReturn(connection);
-
- connectionManager = createDefaultConnectionManager();
- connectionManager.start(backgroundProcessor);
-
- Connection heldConnection = connectionManager.borrowConnection(serverLocation, timeout, false);
- connectionManager.passivate(heldConnection, false);
-
- assertThat(((PooledConnection) heldConnection).isActive()).isFalse();
-
- connectionManager.close(false);
- }
-
- @Test
- public void passivateThrowsWhenConnectionIsNotActive() {
- Connection connection = mock(Connection.class);
- ServerLocation serverLocation = mock(ServerLocation.class);
- when(connectionFactory.createClientToServerConnection(serverLocation, false))
- .thenReturn(connection);
-
- connectionManager = createDefaultConnectionManager();
- connectionManager.start(backgroundProcessor);
-
- Connection heldConnection = connectionManager.borrowConnection(serverLocation, timeout, false);
- connectionManager.passivate(heldConnection, false);
- assertThatThrownBy(() -> connectionManager.passivate(heldConnection, false))
- .isInstanceOf(InternalGemFireException.class)
- .hasMessageContaining("Connection not active");
-
- connectionManager.close(false);
- }
}
diff --git a/geode-core/src/integrationTest/java/org/apache/geode/cache/client/internal/pooling/ConnectionManagerJUnitTest.java b/geode-core/src/integrationTest/java/org/apache/geode/cache/client/internal/pooling/ConnectionManagerJUnitTest.java
index 2ae2ce4..a0e7eaf 100644
--- a/geode-core/src/integrationTest/java/org/apache/geode/cache/client/internal/pooling/ConnectionManagerJUnitTest.java
+++ b/geode-core/src/integrationTest/java/org/apache/geode/cache/client/internal/pooling/ConnectionManagerJUnitTest.java
@@ -270,7 +270,7 @@ public class ConnectionManagerJUnitTest {
{
// make sure a thread local connection that has been passivated can idle-expire
- manager.passivate(conn1, true);
+ conn1.passivate(true);
long elapsedMillis = factory.waitWhile(() -> factory.destroys < 1);
Assert.assertEquals(5, factory.creates);
@@ -761,7 +761,7 @@ public class ConnectionManagerJUnitTest {
conn = borrow(i);
} else {
if (i != 0) {
- manager.activate(conn);
+ conn.activate();
}
}
try {
@@ -775,7 +775,7 @@ public class ConnectionManagerJUnitTest {
if (!threadLocal) {
manager.returnConnection(conn);
} else {
- manager.passivate(conn, true);
+ conn.passivate(true);
}
}
}
diff --git a/geode-core/src/main/java/org/apache/geode/cache/client/internal/Connection.java b/geode-core/src/main/java/org/apache/geode/cache/client/internal/Connection.java
index 46794df..48c203d 100644
--- a/geode-core/src/main/java/org/apache/geode/cache/client/internal/Connection.java
+++ b/geode-core/src/main/java/org/apache/geode/cache/client/internal/Connection.java
@@ -20,6 +20,7 @@ import java.net.Socket;
import java.net.SocketException;
import java.nio.ByteBuffer;
+import org.apache.geode.InternalGemFireException;
import org.apache.geode.distributed.internal.ServerLocation;
import org.apache.geode.internal.cache.tier.sockets.ServerQueueStatus;
@@ -82,4 +83,34 @@ public interface Connection {
void setConnectionID(long id);
long getConnectionID();
+
+ /**
+ * If this connection wraps another connection then
+ * return the wrapped connection.
+ * If this connection does not wrap then return this.
+ *
+ * @return the wrapped connection or this connection
+ * @throws ConnectionDestroyedException if the wrapped connection no longer exists
+ */
+ default Connection getWrappedConnection() {
+ return this;
+ }
+
+ /**
+ * Mark the connection as being actively used.
+ *
+ * @return true if connection activated, false if could not be activated because it is destroyed
+ * @throws InternalGemFireException when the connection is already active
+ */
+ default boolean activate() {
+ return true;
+ }
+
+ /**
+ * Mark the connection as one that is not being used.
+ *
+ * @param accessed true if the connection was used while active
+ * @throws InternalGemFireException when the connection is already passive
+ */
+ default void passivate(boolean accessed) {}
}
diff --git a/geode-core/src/main/java/org/apache/geode/cache/client/internal/OpExecutorImpl.java b/geode-core/src/main/java/org/apache/geode/cache/client/internal/OpExecutorImpl.java
index cf128fd..3087cdd 100644
--- a/geode-core/src/main/java/org/apache/geode/cache/client/internal/OpExecutorImpl.java
+++ b/geode-core/src/main/java/org/apache/geode/cache/client/internal/OpExecutorImpl.java
@@ -153,7 +153,7 @@ public class OpExecutorImpl implements ExecutablePool {
// while we're performing the op. It will be reset
// if the op succeeds.
localConnection.set(null);
- if (!this.connectionManager.activate(conn)) {
+ if (!conn.activate()) {
conn = connectionManager.borrowConnection(serverTimeout);
}
}
@@ -204,7 +204,7 @@ public class OpExecutorImpl implements ExecutablePool {
}
} finally {
if (threadLocalConnections) {
- this.connectionManager.passivate(conn, success);
+ conn.passivate(success);
// Fix for 43718. If the thread local was set to a different
// connection deeper in the call stack, return that connection
// and set our connection on the thread local.
@@ -413,7 +413,7 @@ public class OpExecutorImpl implements ExecutablePool {
this.affinityServerLocation.set(conn.getServer());
}
if (useThreadLocalConnection(op, pingOp)) {
- this.connectionManager.passivate(conn, success);
+ conn.passivate(success);
setThreadLocalConnectionForSingleHop(server, conn);
}
if (returnCnx) {
@@ -443,7 +443,7 @@ public class OpExecutorImpl implements ExecutablePool {
}
boolean borrow = true;
if (conn != null) {
- if (this.connectionManager.activate(conn)) {
+ if (conn.activate()) {
borrow = false;
if (!conn.getServer().equals(server)) {
// poolLoadConditioningMonitor can replace the connection's
@@ -866,7 +866,7 @@ public class OpExecutorImpl implements ExecutablePool {
// This should not be reached, but keeping this code here in case it is
// reached.
if (conn.getServer().getUserId() == -1) {
- Connection connImpl = this.connectionManager.getConnection(conn);
+ Connection connImpl = conn.getWrappedConnection();
conn.getServer().setUserId((Long) AuthenticateUserOp.executeOn(connImpl, this.pool));
if (logger.isDebugEnabled()) {
logger.debug(
@@ -918,7 +918,7 @@ public class OpExecutorImpl implements ExecutablePool {
PoolImpl pool =
(PoolImpl) PoolManagerImpl.getPMI().find(this.endpointManager.getPoolName());
if (!pool.getMultiuserAuthentication()) {
- Connection connImpl = this.connectionManager.getConnection(conn);
+ Connection connImpl = conn.getWrappedConnection();
conn.getServer().setUserId((Long) AuthenticateUserOp.executeOn(connImpl, this));
return conn.execute(op);
} else {
diff --git a/geode-core/src/main/java/org/apache/geode/cache/client/internal/QueueConnectionImpl.java b/geode-core/src/main/java/org/apache/geode/cache/client/internal/QueueConnectionImpl.java
index 73b842e..c7c840f 100644
--- a/geode-core/src/main/java/org/apache/geode/cache/client/internal/QueueConnectionImpl.java
+++ b/geode-core/src/main/java/org/apache/geode/cache/client/internal/QueueConnectionImpl.java
@@ -184,6 +184,11 @@ public class QueueConnectionImpl implements Connection {
return result;
}
+ @Override
+ public Connection getWrappedConnection() {
+ return getConnection();
+ }
+
FailureTracker getFailureTracker() {
return failureTracker;
}
diff --git a/geode-core/src/main/java/org/apache/geode/cache/client/internal/pooling/ConnectionManager.java b/geode-core/src/main/java/org/apache/geode/cache/client/internal/pooling/ConnectionManager.java
index ea221f0..41c3d85 100644
--- a/geode-core/src/main/java/org/apache/geode/cache/client/internal/pooling/ConnectionManager.java
+++ b/geode-core/src/main/java/org/apache/geode/cache/client/internal/pooling/ConnectionManager.java
@@ -120,21 +120,4 @@ public interface ConnectionManager {
int getConnectionCount();
void emergencyClose();
-
- /**
- * Used to active a thread local connection
- *
- * @return true if connection activated, false if could not be activated because it is destroyed
- * @throws InternalGemFireException when the connection is already active
- */
- boolean activate(Connection conn);
-
- /**
- * Used to passivate a thread local connection
- *
- * @throws InternalGemFireException when the connection is already passive
- */
- void passivate(Connection conn, boolean accessed);
-
- Connection getConnection(Connection conn);
}
diff --git a/geode-core/src/main/java/org/apache/geode/cache/client/internal/pooling/ConnectionManagerImpl.java b/geode-core/src/main/java/org/apache/geode/cache/client/internal/pooling/ConnectionManagerImpl.java
index fff52e3..d6d2e95 100644
--- a/geode-core/src/main/java/org/apache/geode/cache/client/internal/pooling/ConnectionManagerImpl.java
+++ b/geode-core/src/main/java/org/apache/geode/cache/client/internal/pooling/ConnectionManagerImpl.java
@@ -50,7 +50,6 @@ import org.apache.geode.cache.client.internal.Endpoint;
import org.apache.geode.cache.client.internal.EndpointManager;
import org.apache.geode.cache.client.internal.PoolImpl;
import org.apache.geode.cache.client.internal.PoolImpl.PoolTask;
-import org.apache.geode.cache.client.internal.QueueConnectionImpl;
import org.apache.geode.distributed.PoolCancelledException;
import org.apache.geode.distributed.internal.ServerLocation;
import org.apache.geode.i18n.StringId;
@@ -310,7 +309,7 @@ public class ConnectionManagerImpl implements ConnectionManager {
* destroyed when returned to the pool.
*/
@Override
- public Connection borrowConnection(ServerLocation server, long acquireTimeout,
+ public PooledConnection borrowConnection(ServerLocation server, long acquireTimeout,
boolean onlyUseExistingCnx) throws AllConnectionsInUseException, NoAvailableServersException {
PooledConnection connection =
availableConnectionManager.useFirst((c) -> c.getServer().equals(server));
@@ -580,18 +579,6 @@ public class ConnectionManagerImpl implements ConnectionManager {
return this.poolStats;
}
- @Override
- public Connection getConnection(Connection conn) {
- if (conn instanceof PooledConnection) {
- return ((PooledConnection) conn).getConnection();
- } else if (conn instanceof QueueConnectionImpl) {
- return ((QueueConnectionImpl) conn).getConnection();
- } else {
- return conn;
- }
- }
-
-
private boolean prefillConnection() {
if (shuttingDown.get()) {
return false;
@@ -1223,18 +1210,6 @@ public class ConnectionManagerImpl implements ConnectionManager {
}
}
- @Override
- public boolean activate(Connection conn) {
- assert conn instanceof PooledConnection;
- return ((PooledConnection) conn).activate();
- }
-
- @Override
- public void passivate(Connection conn, boolean accessed) {
- assert conn instanceof PooledConnection;
- ((PooledConnection) conn).passivate(accessed);
- }
-
private static class ClosedPoolConnectionList extends ArrayList {
@Override
public Object set(int index, Object element) {
diff --git a/geode-core/src/main/java/org/apache/geode/cache/client/internal/pooling/PooledConnection.java b/geode-core/src/main/java/org/apache/geode/cache/client/internal/pooling/PooledConnection.java
index f0d3491..0a03048 100644
--- a/geode-core/src/main/java/org/apache/geode/cache/client/internal/pooling/PooledConnection.java
+++ b/geode-core/src/main/java/org/apache/geode/cache/client/internal/pooling/PooledConnection.java
@@ -131,6 +131,11 @@ public class PooledConnection implements Connection {
return result;
}
+ @Override
+ public Connection getWrappedConnection() {
+ return getConnection();
+ }
+
/**
* Set the destroy bit if it is not already set.
*
@@ -149,6 +154,7 @@ public class PooledConnection implements Connection {
return connection == null;
}
+ @Override
public void passivate(final boolean accessed) {
long now = 0L;
if (accessed) {
@@ -208,9 +214,7 @@ public class PooledConnection implements Connection {
return true;
}
- /**
- * @return true if connection activated, false if could not be activated because it is destroyed
- */
+ @Override
public boolean activate() {
synchronized (this) {
try {
diff --git a/geode-core/src/test/java/org/apache/geode/cache/client/internal/OpExecutorImplJUnitTest.java b/geode-core/src/test/java/org/apache/geode/cache/client/internal/OpExecutorImplJUnitTest.java
index 4b5b74d..e10c470 100644
--- a/geode-core/src/test/java/org/apache/geode/cache/client/internal/OpExecutorImplJUnitTest.java
+++ b/geode-core/src/test/java/org/apache/geode/cache/client/internal/OpExecutorImplJUnitTest.java
@@ -556,13 +556,6 @@ public class OpExecutorImplJUnitTest {
return new DummyConnection(new ServerLocation("localhost", currentServer++ % numServers));
}
- /*
- * (non-Javadoc)
- *
- * @see
- * org.apache.geode.cache.client.internal.pooling.ConnectionManager#borrowConnection(org.apache.
- * geode.distributed.internal.ServerLocation, long)
- */
@Override
public Connection borrowConnection(ServerLocation server, long aquireTimeout,
boolean onlyUseExistingCnx) {
@@ -602,19 +595,6 @@ public class OpExecutorImplJUnitTest {
public int getConnectionCount() {
return 0;
}
-
- @Override
- public Connection getConnection(Connection conn) {
- return conn;
- }
-
- @Override
- public boolean activate(Connection conn) {
- return true;
- }
-
- @Override
- public void passivate(Connection conn, boolean accessed) {}
}
private class DummyConnection implements Connection {