You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by ds...@apache.org on 2019/04/04 17:49:26 UTC

[geode] branch develop updated: GEODE-6573: move methods from ConnectionManager to Connection (#3375)

This is an automated email from the ASF dual-hosted git repository.

dschneider pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/geode.git


The following commit(s) were added to refs/heads/develop by this push:
     new a4ad661  GEODE-6573: move methods from ConnectionManager to Connection (#3375)
a4ad661 is described below

commit a4ad6617ecf0cc6139d4df483c5100fa50b0d53f
Author: Darrel Schneider <ds...@pivotal.io>
AuthorDate: Thu Apr 4 10:49:14 2019 -0700

    GEODE-6573: move methods from ConnectionManager to Connection (#3375)
    
    Connection now has default implementations of getWrappedConnection,
    activate, and passivate. ConnectionManager no longer has these methods.
---
 .../pooling/ConnectionManagerImplTest.java         | 75 ----------------------
 .../pooling/ConnectionManagerJUnitTest.java        |  6 +-
 .../geode/cache/client/internal/Connection.java    | 31 +++++++++
 .../cache/client/internal/OpExecutorImpl.java      | 12 ++--
 .../cache/client/internal/QueueConnectionImpl.java |  5 ++
 .../client/internal/pooling/ConnectionManager.java | 17 -----
 .../internal/pooling/ConnectionManagerImpl.java    | 27 +-------
 .../client/internal/pooling/PooledConnection.java  | 10 ++-
 .../client/internal/OpExecutorImplJUnitTest.java   | 20 ------
 9 files changed, 53 insertions(+), 150 deletions(-)

diff --git a/geode-core/src/integrationTest/java/org/apache/geode/cache/client/internal/pooling/ConnectionManagerImplTest.java b/geode-core/src/integrationTest/java/org/apache/geode/cache/client/internal/pooling/ConnectionManagerImplTest.java
index 35afea3..943854f 100644
--- a/geode-core/src/integrationTest/java/org/apache/geode/cache/client/internal/pooling/ConnectionManagerImplTest.java
+++ b/geode-core/src/integrationTest/java/org/apache/geode/cache/client/internal/pooling/ConnectionManagerImplTest.java
@@ -481,79 +481,4 @@ public class ConnectionManagerImplTest {
 
     connectionManager.close(false);
   }
-
-  @Test
-  public void activateActivatesConnection() {
-    Connection connection = mock(Connection.class);
-    ServerLocation serverLocation = mock(ServerLocation.class);
-    when(connectionFactory.createClientToServerConnection(serverLocation, false))
-        .thenReturn(connection);
-
-    connectionManager = createDefaultConnectionManager();
-    connectionManager.start(backgroundProcessor);
-
-    Connection heldConnection = connectionManager.borrowConnection(serverLocation, timeout, false);
-    connectionManager.passivate(heldConnection, false);
-    connectionManager.activate(heldConnection);
-
-    assertThat(((PooledConnection) heldConnection).isActive()).isTrue();
-
-    connectionManager.close(false);
-  }
-
-  @Test
-  public void activateThrowsIfConnectionIsAlreadyActive() {
-    Connection connection = mock(Connection.class);
-    ServerLocation serverLocation = mock(ServerLocation.class);
-    when(connectionFactory.createClientToServerConnection(serverLocation, false))
-        .thenReturn(connection);
-
-    connectionManager = createDefaultConnectionManager();
-    connectionManager.start(backgroundProcessor);
-
-    Connection heldConnection = connectionManager.borrowConnection(serverLocation, timeout, false);
-
-    assertThatThrownBy(() -> connectionManager.activate(heldConnection))
-        .isInstanceOf(InternalGemFireException.class)
-        .hasMessageContaining("Connection already active");
-
-    connectionManager.close(false);
-  }
-
-  @Test
-  public void passivatePassivatesConnection() {
-    Connection connection = mock(Connection.class);
-    ServerLocation serverLocation = mock(ServerLocation.class);
-    when(connectionFactory.createClientToServerConnection(serverLocation, false))
-        .thenReturn(connection);
-
-    connectionManager = createDefaultConnectionManager();
-    connectionManager.start(backgroundProcessor);
-
-    Connection heldConnection = connectionManager.borrowConnection(serverLocation, timeout, false);
-    connectionManager.passivate(heldConnection, false);
-
-    assertThat(((PooledConnection) heldConnection).isActive()).isFalse();
-
-    connectionManager.close(false);
-  }
-
-  @Test
-  public void passivateThrowsWhenConnectionIsNotActive() {
-    Connection connection = mock(Connection.class);
-    ServerLocation serverLocation = mock(ServerLocation.class);
-    when(connectionFactory.createClientToServerConnection(serverLocation, false))
-        .thenReturn(connection);
-
-    connectionManager = createDefaultConnectionManager();
-    connectionManager.start(backgroundProcessor);
-
-    Connection heldConnection = connectionManager.borrowConnection(serverLocation, timeout, false);
-    connectionManager.passivate(heldConnection, false);
-    assertThatThrownBy(() -> connectionManager.passivate(heldConnection, false))
-        .isInstanceOf(InternalGemFireException.class)
-        .hasMessageContaining("Connection not active");
-
-    connectionManager.close(false);
-  }
 }
diff --git a/geode-core/src/integrationTest/java/org/apache/geode/cache/client/internal/pooling/ConnectionManagerJUnitTest.java b/geode-core/src/integrationTest/java/org/apache/geode/cache/client/internal/pooling/ConnectionManagerJUnitTest.java
index 2ae2ce4..a0e7eaf 100644
--- a/geode-core/src/integrationTest/java/org/apache/geode/cache/client/internal/pooling/ConnectionManagerJUnitTest.java
+++ b/geode-core/src/integrationTest/java/org/apache/geode/cache/client/internal/pooling/ConnectionManagerJUnitTest.java
@@ -270,7 +270,7 @@ public class ConnectionManagerJUnitTest {
 
     {
       // make sure a thread local connection that has been passivated can idle-expire
-      manager.passivate(conn1, true);
+      conn1.passivate(true);
 
       long elapsedMillis = factory.waitWhile(() -> factory.destroys < 1);
       Assert.assertEquals(5, factory.creates);
@@ -761,7 +761,7 @@ public class ConnectionManagerJUnitTest {
             conn = borrow(i);
           } else {
             if (i != 0) {
-              manager.activate(conn);
+              conn.activate();
             }
           }
           try {
@@ -775,7 +775,7 @@ public class ConnectionManagerJUnitTest {
             if (!threadLocal) {
               manager.returnConnection(conn);
             } else {
-              manager.passivate(conn, true);
+              conn.passivate(true);
             }
           }
         }
diff --git a/geode-core/src/main/java/org/apache/geode/cache/client/internal/Connection.java b/geode-core/src/main/java/org/apache/geode/cache/client/internal/Connection.java
index 46794df..48c203d 100644
--- a/geode-core/src/main/java/org/apache/geode/cache/client/internal/Connection.java
+++ b/geode-core/src/main/java/org/apache/geode/cache/client/internal/Connection.java
@@ -20,6 +20,7 @@ import java.net.Socket;
 import java.net.SocketException;
 import java.nio.ByteBuffer;
 
+import org.apache.geode.InternalGemFireException;
 import org.apache.geode.distributed.internal.ServerLocation;
 import org.apache.geode.internal.cache.tier.sockets.ServerQueueStatus;
 
@@ -82,4 +83,34 @@ public interface Connection {
   void setConnectionID(long id);
 
   long getConnectionID();
+
+  /**
+   * If this connection wraps another connection then
+   * return the wrapped connection.
+   * If this connection does not wrap another connection then return this.
+   *
+   * @return the wrapped connection or this connection
+   * @throws ConnectionDestroyedException if the wrapped connection no longer exists
+   */
+  default Connection getWrappedConnection() {
+    return this;
+  }
+
+  /**
+   * Mark the connection as being actively used.
+   *
+   * @return true if connection activated, false if could not be activated because it is destroyed
+   * @throws InternalGemFireException when the connection is already active
+   */
+  default boolean activate() {
+    return true;
+  }
+
+  /**
+   * Mark the connection as one that is not being used.
+   *
+   * @param accessed true if the connection was used while active
+   * @throws InternalGemFireException when the connection is already passive
+   */
+  default void passivate(boolean accessed) {}
 }
diff --git a/geode-core/src/main/java/org/apache/geode/cache/client/internal/OpExecutorImpl.java b/geode-core/src/main/java/org/apache/geode/cache/client/internal/OpExecutorImpl.java
index cf128fd..3087cdd 100644
--- a/geode-core/src/main/java/org/apache/geode/cache/client/internal/OpExecutorImpl.java
+++ b/geode-core/src/main/java/org/apache/geode/cache/client/internal/OpExecutorImpl.java
@@ -153,7 +153,7 @@ public class OpExecutorImpl implements ExecutablePool {
       // while we're performing the op. It will be reset
       // if the op succeeds.
       localConnection.set(null);
-      if (!this.connectionManager.activate(conn)) {
+      if (!conn.activate()) {
         conn = connectionManager.borrowConnection(serverTimeout);
       }
     }
@@ -204,7 +204,7 @@ public class OpExecutorImpl implements ExecutablePool {
       }
     } finally {
       if (threadLocalConnections) {
-        this.connectionManager.passivate(conn, success);
+        conn.passivate(success);
         // Fix for 43718. If the thread local was set to a different
         // connection deeper in the call stack, return that connection
         // and set our connection on the thread local.
@@ -413,7 +413,7 @@ public class OpExecutorImpl implements ExecutablePool {
         this.affinityServerLocation.set(conn.getServer());
       }
       if (useThreadLocalConnection(op, pingOp)) {
-        this.connectionManager.passivate(conn, success);
+        conn.passivate(success);
         setThreadLocalConnectionForSingleHop(server, conn);
       }
       if (returnCnx) {
@@ -443,7 +443,7 @@ public class OpExecutorImpl implements ExecutablePool {
     }
     boolean borrow = true;
     if (conn != null) {
-      if (this.connectionManager.activate(conn)) {
+      if (conn.activate()) {
         borrow = false;
         if (!conn.getServer().equals(server)) {
           // poolLoadConditioningMonitor can replace the connection's
@@ -866,7 +866,7 @@ public class OpExecutorImpl implements ExecutablePool {
       // This should not be reached, but keeping this code here in case it is
       // reached.
       if (conn.getServer().getUserId() == -1) {
-        Connection connImpl = this.connectionManager.getConnection(conn);
+        Connection connImpl = conn.getWrappedConnection();
         conn.getServer().setUserId((Long) AuthenticateUserOp.executeOn(connImpl, this.pool));
         if (logger.isDebugEnabled()) {
           logger.debug(
@@ -918,7 +918,7 @@ public class OpExecutorImpl implements ExecutablePool {
         PoolImpl pool =
             (PoolImpl) PoolManagerImpl.getPMI().find(this.endpointManager.getPoolName());
         if (!pool.getMultiuserAuthentication()) {
-          Connection connImpl = this.connectionManager.getConnection(conn);
+          Connection connImpl = conn.getWrappedConnection();
           conn.getServer().setUserId((Long) AuthenticateUserOp.executeOn(connImpl, this));
           return conn.execute(op);
         } else {
diff --git a/geode-core/src/main/java/org/apache/geode/cache/client/internal/QueueConnectionImpl.java b/geode-core/src/main/java/org/apache/geode/cache/client/internal/QueueConnectionImpl.java
index 73b842e..c7c840f 100644
--- a/geode-core/src/main/java/org/apache/geode/cache/client/internal/QueueConnectionImpl.java
+++ b/geode-core/src/main/java/org/apache/geode/cache/client/internal/QueueConnectionImpl.java
@@ -184,6 +184,11 @@ public class QueueConnectionImpl implements Connection {
     return result;
   }
 
+  @Override
+  public Connection getWrappedConnection() {
+    return getConnection();
+  }
+
   FailureTracker getFailureTracker() {
     return failureTracker;
   }
diff --git a/geode-core/src/main/java/org/apache/geode/cache/client/internal/pooling/ConnectionManager.java b/geode-core/src/main/java/org/apache/geode/cache/client/internal/pooling/ConnectionManager.java
index ea221f0..41c3d85 100644
--- a/geode-core/src/main/java/org/apache/geode/cache/client/internal/pooling/ConnectionManager.java
+++ b/geode-core/src/main/java/org/apache/geode/cache/client/internal/pooling/ConnectionManager.java
@@ -120,21 +120,4 @@ public interface ConnectionManager {
   int getConnectionCount();
 
   void emergencyClose();
-
-  /**
-   * Used to active a thread local connection
-   *
-   * @return true if connection activated, false if could not be activated because it is destroyed
-   * @throws InternalGemFireException when the connection is already active
-   */
-  boolean activate(Connection conn);
-
-  /**
-   * Used to passivate a thread local connection
-   *
-   * @throws InternalGemFireException when the connection is already passive
-   */
-  void passivate(Connection conn, boolean accessed);
-
-  Connection getConnection(Connection conn);
 }
diff --git a/geode-core/src/main/java/org/apache/geode/cache/client/internal/pooling/ConnectionManagerImpl.java b/geode-core/src/main/java/org/apache/geode/cache/client/internal/pooling/ConnectionManagerImpl.java
index fff52e3..d6d2e95 100644
--- a/geode-core/src/main/java/org/apache/geode/cache/client/internal/pooling/ConnectionManagerImpl.java
+++ b/geode-core/src/main/java/org/apache/geode/cache/client/internal/pooling/ConnectionManagerImpl.java
@@ -50,7 +50,6 @@ import org.apache.geode.cache.client.internal.Endpoint;
 import org.apache.geode.cache.client.internal.EndpointManager;
 import org.apache.geode.cache.client.internal.PoolImpl;
 import org.apache.geode.cache.client.internal.PoolImpl.PoolTask;
-import org.apache.geode.cache.client.internal.QueueConnectionImpl;
 import org.apache.geode.distributed.PoolCancelledException;
 import org.apache.geode.distributed.internal.ServerLocation;
 import org.apache.geode.i18n.StringId;
@@ -310,7 +309,7 @@ public class ConnectionManagerImpl implements ConnectionManager {
    * destroyed when returned to the pool.
    */
   @Override
-  public Connection borrowConnection(ServerLocation server, long acquireTimeout,
+  public PooledConnection borrowConnection(ServerLocation server, long acquireTimeout,
       boolean onlyUseExistingCnx) throws AllConnectionsInUseException, NoAvailableServersException {
     PooledConnection connection =
         availableConnectionManager.useFirst((c) -> c.getServer().equals(server));
@@ -580,18 +579,6 @@ public class ConnectionManagerImpl implements ConnectionManager {
     return this.poolStats;
   }
 
-  @Override
-  public Connection getConnection(Connection conn) {
-    if (conn instanceof PooledConnection) {
-      return ((PooledConnection) conn).getConnection();
-    } else if (conn instanceof QueueConnectionImpl) {
-      return ((QueueConnectionImpl) conn).getConnection();
-    } else {
-      return conn;
-    }
-  }
-
-
   private boolean prefillConnection() {
     if (shuttingDown.get()) {
       return false;
@@ -1223,18 +1210,6 @@ public class ConnectionManagerImpl implements ConnectionManager {
     }
   }
 
-  @Override
-  public boolean activate(Connection conn) {
-    assert conn instanceof PooledConnection;
-    return ((PooledConnection) conn).activate();
-  }
-
-  @Override
-  public void passivate(Connection conn, boolean accessed) {
-    assert conn instanceof PooledConnection;
-    ((PooledConnection) conn).passivate(accessed);
-  }
-
   private static class ClosedPoolConnectionList extends ArrayList {
     @Override
     public Object set(int index, Object element) {
diff --git a/geode-core/src/main/java/org/apache/geode/cache/client/internal/pooling/PooledConnection.java b/geode-core/src/main/java/org/apache/geode/cache/client/internal/pooling/PooledConnection.java
index f0d3491..0a03048 100644
--- a/geode-core/src/main/java/org/apache/geode/cache/client/internal/pooling/PooledConnection.java
+++ b/geode-core/src/main/java/org/apache/geode/cache/client/internal/pooling/PooledConnection.java
@@ -131,6 +131,11 @@ public class PooledConnection implements Connection {
     return result;
   }
 
+  @Override
+  public Connection getWrappedConnection() {
+    return getConnection();
+  }
+
   /**
    * Set the destroy bit if it is not already set.
    *
@@ -149,6 +154,7 @@ public class PooledConnection implements Connection {
     return connection == null;
   }
 
+  @Override
   public void passivate(final boolean accessed) {
     long now = 0L;
     if (accessed) {
@@ -208,9 +214,7 @@ public class PooledConnection implements Connection {
     return true;
   }
 
-  /**
-   * @return true if connection activated, false if could not be activated because it is destroyed
-   */
+  @Override
   public boolean activate() {
     synchronized (this) {
       try {
diff --git a/geode-core/src/test/java/org/apache/geode/cache/client/internal/OpExecutorImplJUnitTest.java b/geode-core/src/test/java/org/apache/geode/cache/client/internal/OpExecutorImplJUnitTest.java
index 4b5b74d..e10c470 100644
--- a/geode-core/src/test/java/org/apache/geode/cache/client/internal/OpExecutorImplJUnitTest.java
+++ b/geode-core/src/test/java/org/apache/geode/cache/client/internal/OpExecutorImplJUnitTest.java
@@ -556,13 +556,6 @@ public class OpExecutorImplJUnitTest {
       return new DummyConnection(new ServerLocation("localhost", currentServer++ % numServers));
     }
 
-    /*
-     * (non-Javadoc)
-     *
-     * @see
-     * org.apache.geode.cache.client.internal.pooling.ConnectionManager#borrowConnection(org.apache.
-     * geode.distributed.internal.ServerLocation, long)
-     */
     @Override
     public Connection borrowConnection(ServerLocation server, long aquireTimeout,
         boolean onlyUseExistingCnx) {
@@ -602,19 +595,6 @@ public class OpExecutorImplJUnitTest {
     public int getConnectionCount() {
       return 0;
     }
-
-    @Override
-    public Connection getConnection(Connection conn) {
-      return conn;
-    }
-
-    @Override
-    public boolean activate(Connection conn) {
-      return true;
-    }
-
-    @Override
-    public void passivate(Connection conn, boolean accessed) {}
   }
 
   private class DummyConnection implements Connection {