You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@phoenix.apache.org by gj...@apache.org on 2020/11/20 14:54:01 UTC

[phoenix] branch 4.x updated: PHOENIX-6078 Remove Internal Phoenix Connections from parent LinkedQueue when closed

This is an automated email from the ASF dual-hosted git repository.

gjacoby pushed a commit to branch 4.x
in repository https://gitbox.apache.org/repos/asf/phoenix.git


The following commit(s) were added to refs/heads/4.x by this push:
     new 6600c07  PHOENIX-6078 Remove Internal Phoenix Connections from parent LinkedQueue when closed
6600c07 is described below

commit 6600c072b49c2acc8ced3a411caca2f295b84eda
Author: Saksham Gangwar <sa...@Sakshams-iMac.local>
AuthorDate: Fri Aug 14 16:33:58 2020 -0700

    PHOENIX-6078 Remove Internal Phoenix Connections from parent LinkedQueue when closed
---
 .../phoenix/query/MaxConcurrentConnectionsIT.java  | 41 +++++++++++++++++++---
 .../compile/MutatingParallelIteratorFactory.java   |  4 +++
 .../org/apache/phoenix/jdbc/PhoenixConnection.java | 24 +++++++++++++
 3 files changed, 64 insertions(+), 5 deletions(-)

diff --git a/phoenix-core/src/it/java/org/apache/phoenix/query/MaxConcurrentConnectionsIT.java b/phoenix-core/src/it/java/org/apache/phoenix/query/MaxConcurrentConnectionsIT.java
index 7da276c..611ef89 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/query/MaxConcurrentConnectionsIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/query/MaxConcurrentConnectionsIT.java
@@ -20,6 +20,7 @@ package org.apache.phoenix.query;
 
 import org.apache.hadoop.hbase.HBaseTestingUtility;
 import org.apache.phoenix.end2end.BaseUniqueNamesOwnClusterIT;
+import org.apache.phoenix.jdbc.PhoenixConnection;
 import org.apache.phoenix.jdbc.PhoenixDriver;
 import org.apache.phoenix.util.DelayedRegionServer;
 import org.apache.phoenix.util.PhoenixRuntime;
@@ -31,22 +32,17 @@ import java.sql.Connection;
 import java.sql.DriverManager;
 import java.sql.SQLException;
 import java.sql.Statement;
-import java.util.ArrayList;
-import java.util.List;
 import java.util.Properties;
 
-import static org.apache.phoenix.exception.SQLExceptionCode.NEW_CONNECTION_THROTTLED;
 import static org.apache.phoenix.exception.SQLExceptionCode.NEW_INTERNAL_CONNECTION_THROTTLED;
 import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_OPEN_INTERNAL_PHOENIX_CONNECTIONS;
 import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_OPEN_PHOENIX_CONNECTIONS;
 import static org.apache.phoenix.query.QueryServices.CLIENT_CONNECTION_MAX_ALLOWED_CONNECTIONS;
 import static org.apache.phoenix.query.QueryServices.INTERNAL_CONNECTION_MAX_ALLOWED_CONNECTIONS;
-import static org.apache.phoenix.query.QueryServices.RENEW_LEASE_ENABLED;
 import static org.apache.phoenix.query.QueryServices.TASK_HANDLING_INITIAL_DELAY_MS_ATTRIB;
 import static org.apache.phoenix.query.QueryServices.TASK_HANDLING_INTERVAL_MS_ATTRIB;
 import static org.apache.phoenix.util.PhoenixRuntime.JDBC_PROTOCOL_SEPARATOR;
 import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
 /**
@@ -129,4 +125,39 @@ public class MaxConcurrentConnectionsIT extends BaseUniqueNamesOwnClusterIT {
         }
 
     }
+
+    @Test public void testClosedChildConnectionsRemovedFromParentQueue() throws SQLException {
+        String tableName = generateUniqueName();
+        String connectionUrl = getUniqueUrl();
+        int NUMBER_OF_ROWS = 10;
+        String ddl = "CREATE TABLE " + tableName + " (V BIGINT PRIMARY KEY, K BIGINT)";
+        Properties props = new Properties();
+        props.setProperty(CLIENT_CONNECTION_MAX_ALLOWED_CONNECTIONS, String.valueOf(10));
+        props.setProperty(INTERNAL_CONNECTION_MAX_ALLOWED_CONNECTIONS, String.valueOf(10));
+        try (Connection conn = DriverManager.getConnection(connectionUrl, props);
+                Statement statement = conn.createStatement()) {
+            statement.execute(ddl);
+        }
+        PhoenixConnection
+                connection =
+                (PhoenixConnection) DriverManager.getConnection(connectionUrl, props);
+        for (int i = 0; i < NUMBER_OF_ROWS; i++) {
+            connection.createStatement()
+                    .execute("UPSERT INTO " + tableName + " VALUES (" + i + ", " + i + ")");
+            connection.commit();
+        }
+        connection.setAutoCommit(false);
+        try {
+            for (int i = 0; i < NUMBER_OF_ROWS; i++) {
+                connection.createStatement()
+                        .execute("DELETE FROM " + tableName + " WHERE K = " + i);
+            }
+        } catch (SQLException e) {
+            fail();
+        } finally {
+            connection.close();
+        }
+        // All 10 child connections should be removed successfully from the queue
+        assertEquals(0, connection.getChildConnectionsCount());
+    }
 }
\ No newline at end of file
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/MutatingParallelIteratorFactory.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/MutatingParallelIteratorFactory.java
index 755f127..213ff0a 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/MutatingParallelIteratorFactory.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/MutatingParallelIteratorFactory.java
@@ -112,6 +112,8 @@ public abstract class MutatingParallelIteratorFactory implements ParallelIterato
                         MutatingParallelIteratorFactory.this.connection.getMutationState()
                                 .join(finalState);
                     } finally {
+                        //Removing to be closed connection from the parent connection queue.
+                        connection.removeChildConnection(clonedConnection);
                         clonedConnection.close();
                     }
                 }
@@ -124,6 +126,8 @@ public abstract class MutatingParallelIteratorFactory implements ParallelIterato
         } catch (Throwable ex) {
             // Catch just to make sure we close the cloned connection and then rethrow
             try {
+                //Removing to be closed connection from the parent connection queue.
+                connection.removeChildConnection(clonedConnection);
                 // closeQuietly only handles IOException
                 clonedConnection.close();
             } catch (SQLException sqlEx) {
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixConnection.java b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixConnection.java
index 81992ef..d7adcf4 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixConnection.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixConnection.java
@@ -475,6 +475,30 @@ public class PhoenixConnection implements Connection, MetaDataMutated, SQLClosea
         childConnections.add(connection);
     }
 
+    /**
+     * Method to remove child connection from childConnections Queue
+     *
+     * @param connection
+     */
+    public void removeChildConnection(PhoenixConnection connection) {
+        if (childConnections != null) {
+            childConnections.remove(connection);
+        }
+    }
+
+    /**
+     * Method to fetch child connections count from childConnections Queue
+     *
+     * @return int count
+     */
+    @VisibleForTesting
+    public int getChildConnectionsCount() {
+        if (childConnections != null) {
+            return childConnections.size();
+        }
+        return 0;
+    }
+
     public Sampler<?> getSampler() {
         return this.sampler;
     }