You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by br...@apache.org on 2020/06/26 15:14:41 UTC

[hadoop] branch trunk updated: YARN-6526. Refactoring SQLFederationStateStore by avoiding to recreate a connection at every call. COntributed by Bilwa S T.

This is an automated email from the ASF dual-hosted git repository.

brahma pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/hadoop.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 2c03524  YARN-6526. Refactoring SQLFederationStateStore by avoiding to recreate a connection at every call. COntributed by Bilwa S T.
2c03524 is described below

commit 2c03524fa4be754aa95889d4ac0f5d57dca8cda8
Author: Brahma Reddy Battula <br...@apache.org>
AuthorDate: Fri Jun 26 20:43:27 2020 +0530

    YARN-6526. Refactoring SQLFederationStateStore by avoiding to recreate a connection at every call. COntributed by Bilwa S T.
---
 .../store/impl/SQLFederationStateStore.java        | 124 ++++++++++-----------
 .../metrics/FederationStateStoreClientMetrics.java |  18 +++
 .../store/utils/FederationStateStoreUtils.java     |  14 +++
 .../store/impl/FederationStateStoreBaseTest.java   |  15 ++-
 .../store/impl/HSQLDBFederationStateStore.java     |   3 +-
 .../store/impl/TestSQLFederationStateStore.java    |  28 +++++
 .../impl/TestZookeeperFederationStateStore.java    |   4 +-
 7 files changed, 130 insertions(+), 76 deletions(-)

diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/impl/SQLFederationStateStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/impl/SQLFederationStateStore.java
index 07dc7e4..8ceef43 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/impl/SQLFederationStateStore.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/impl/SQLFederationStateStore.java
@@ -78,6 +78,7 @@ import org.apache.hadoop.yarn.util.MonotonicClock;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.google.common.annotations.VisibleForTesting;
 import com.zaxxer.hikari.HikariDataSource;
 
 /**
@@ -141,6 +142,8 @@ public class SQLFederationStateStore implements FederationStateStore {
   private int maximumPoolSize;
   private HikariDataSource dataSource = null;
   private final Clock clock = new MonotonicClock();
+  @VisibleForTesting
+  Connection conn = null;
 
   @Override
   public void init(Configuration conf) throws YarnException {
@@ -173,6 +176,13 @@ public class SQLFederationStateStore implements FederationStateStore {
     dataSource.setMaximumPoolSize(maximumPoolSize);
     LOG.info("Initialized connection pool to the Federation StateStore "
         + "database at address: " + url);
+    try {
+      conn = getConnection();
+      LOG.debug("Connection created");
+    } catch (SQLException e) {
+      FederationStateStoreUtils.logAndThrowRetriableException(LOG,
+          "Not able to get Connection", e);
+    }
   }
 
   @Override
@@ -185,15 +195,13 @@ public class SQLFederationStateStore implements FederationStateStore {
         .validate(registerSubClusterRequest);
 
     CallableStatement cstmt = null;
-    Connection conn = null;
 
     SubClusterInfo subClusterInfo =
         registerSubClusterRequest.getSubClusterInfo();
     SubClusterId subClusterId = subClusterInfo.getSubClusterId();
 
     try {
-      conn = getConnection();
-      cstmt = conn.prepareCall(CALL_SP_REGISTER_SUBCLUSTER);
+      cstmt = getCallableStatement(CALL_SP_REGISTER_SUBCLUSTER);
 
       // Set the parameters for the stored procedure
       cstmt.setString(1, subClusterId.getId());
@@ -238,9 +246,10 @@ public class SQLFederationStateStore implements FederationStateStore {
               + " into the StateStore",
           e);
     } finally {
-      // Return to the pool the CallableStatement and the Connection
-      FederationStateStoreUtils.returnToPool(LOG, cstmt, conn);
+      // Return to the pool the CallableStatement
+      FederationStateStoreUtils.returnToPool(LOG, cstmt);
     }
+
     return SubClusterRegisterResponse.newInstance();
   }
 
@@ -254,14 +263,12 @@ public class SQLFederationStateStore implements FederationStateStore {
         .validate(subClusterDeregisterRequest);
 
     CallableStatement cstmt = null;
-    Connection conn = null;
 
     SubClusterId subClusterId = subClusterDeregisterRequest.getSubClusterId();
     SubClusterState state = subClusterDeregisterRequest.getState();
 
     try {
-      conn = getConnection();
-      cstmt = conn.prepareCall(CALL_SP_DEREGISTER_SUBCLUSTER);
+      cstmt = getCallableStatement(CALL_SP_DEREGISTER_SUBCLUSTER);
 
       // Set the parameters for the stored procedure
       cstmt.setString(1, subClusterId.getId());
@@ -299,8 +306,8 @@ public class SQLFederationStateStore implements FederationStateStore {
               + state.toString(),
           e);
     } finally {
-      // Return to the pool the CallableStatement and the Connection
-      FederationStateStoreUtils.returnToPool(LOG, cstmt, conn);
+      // Return to the pool the CallableStatement
+      FederationStateStoreUtils.returnToPool(LOG, cstmt);
     }
     return SubClusterDeregisterResponse.newInstance();
   }
@@ -315,14 +322,12 @@ public class SQLFederationStateStore implements FederationStateStore {
         .validate(subClusterHeartbeatRequest);
 
     CallableStatement cstmt = null;
-    Connection conn = null;
 
     SubClusterId subClusterId = subClusterHeartbeatRequest.getSubClusterId();
     SubClusterState state = subClusterHeartbeatRequest.getState();
 
     try {
-      conn = getConnection();
-      cstmt = conn.prepareCall(CALL_SP_SUBCLUSTER_HEARTBEAT);
+      cstmt = getCallableStatement(CALL_SP_SUBCLUSTER_HEARTBEAT);
 
       // Set the parameters for the stored procedure
       cstmt.setString(1, subClusterId.getId());
@@ -362,8 +367,8 @@ public class SQLFederationStateStore implements FederationStateStore {
               + subClusterId,
           e);
     } finally {
-      // Return to the pool the CallableStatement and the Connection
-      FederationStateStoreUtils.returnToPool(LOG, cstmt, conn);
+      // Return to the pool the CallableStatement
+      FederationStateStoreUtils.returnToPool(LOG, cstmt);
     }
     return SubClusterHeartbeatResponse.newInstance();
   }
@@ -376,14 +381,12 @@ public class SQLFederationStateStore implements FederationStateStore {
     FederationMembershipStateStoreInputValidator.validate(subClusterRequest);
 
     CallableStatement cstmt = null;
-    Connection conn = null;
 
     SubClusterInfo subClusterInfo = null;
     SubClusterId subClusterId = subClusterRequest.getSubClusterId();
 
     try {
-      conn = getConnection();
-      cstmt = conn.prepareCall(CALL_SP_GET_SUBCLUSTER);
+      cstmt = getCallableStatement(CALL_SP_GET_SUBCLUSTER);
       cstmt.setString(1, subClusterId.getId());
 
       // Set the parameters for the stored procedure
@@ -443,8 +446,8 @@ public class SQLFederationStateStore implements FederationStateStore {
       FederationStateStoreUtils.logAndThrowRetriableException(LOG,
           "Unable to obtain the SubCluster information for " + subClusterId, e);
     } finally {
-      // Return to the pool the CallableStatement and the Connection
-      FederationStateStoreUtils.returnToPool(LOG, cstmt, conn);
+      // Return to the pool the CallableStatement
+      FederationStateStoreUtils.returnToPool(LOG, cstmt);
     }
     return GetSubClusterInfoResponse.newInstance(subClusterInfo);
   }
@@ -453,13 +456,11 @@ public class SQLFederationStateStore implements FederationStateStore {
   public GetSubClustersInfoResponse getSubClusters(
       GetSubClustersInfoRequest subClustersRequest) throws YarnException {
     CallableStatement cstmt = null;
-    Connection conn = null;
     ResultSet rs = null;
     List<SubClusterInfo> subClusters = new ArrayList<SubClusterInfo>();
 
     try {
-      conn = getConnection();
-      cstmt = conn.prepareCall(CALL_SP_GET_SUBCLUSTERS);
+      cstmt = getCallableStatement(CALL_SP_GET_SUBCLUSTERS);
 
       // Execute the query
       long startTime = clock.getTime();
@@ -510,8 +511,8 @@ public class SQLFederationStateStore implements FederationStateStore {
       FederationStateStoreUtils.logAndThrowRetriableException(LOG,
           "Unable to obtain the information for all the SubClusters ", e);
     } finally {
-      // Return to the pool the CallableStatement and the Connection
-      FederationStateStoreUtils.returnToPool(LOG, cstmt, conn, rs);
+      // Return to the pool the CallableStatement
+      FederationStateStoreUtils.returnToPool(LOG, cstmt, null, rs);
     }
     return GetSubClustersInfoResponse.newInstance(subClusters);
   }
@@ -524,7 +525,6 @@ public class SQLFederationStateStore implements FederationStateStore {
     FederationApplicationHomeSubClusterStoreInputValidator.validate(request);
 
     CallableStatement cstmt = null;
-    Connection conn = null;
 
     String subClusterHome = null;
     ApplicationId appId =
@@ -533,8 +533,7 @@ public class SQLFederationStateStore implements FederationStateStore {
         request.getApplicationHomeSubCluster().getHomeSubCluster();
 
     try {
-      conn = getConnection();
-      cstmt = conn.prepareCall(CALL_SP_ADD_APPLICATION_HOME_SUBCLUSTER);
+      cstmt = getCallableStatement(CALL_SP_ADD_APPLICATION_HOME_SUBCLUSTER);
 
       // Set the parameters for the stored procedure
       cstmt.setString(1, appId.toString());
@@ -596,8 +595,8 @@ public class SQLFederationStateStore implements FederationStateStore {
                   + request.getApplicationHomeSubCluster().getApplicationId(),
               e);
     } finally {
-      // Return to the pool the CallableStatement and the Connection
-      FederationStateStoreUtils.returnToPool(LOG, cstmt, conn);
+      // Return to the pool the CallableStatement
+      FederationStateStoreUtils.returnToPool(LOG, cstmt);
     }
     return AddApplicationHomeSubClusterResponse
         .newInstance(SubClusterId.newInstance(subClusterHome));
@@ -611,7 +610,6 @@ public class SQLFederationStateStore implements FederationStateStore {
     FederationApplicationHomeSubClusterStoreInputValidator.validate(request);
 
     CallableStatement cstmt = null;
-    Connection conn = null;
 
     ApplicationId appId =
         request.getApplicationHomeSubCluster().getApplicationId();
@@ -619,8 +617,7 @@ public class SQLFederationStateStore implements FederationStateStore {
         request.getApplicationHomeSubCluster().getHomeSubCluster();
 
     try {
-      conn = getConnection();
-      cstmt = conn.prepareCall(CALL_SP_UPDATE_APPLICATION_HOME_SUBCLUSTER);
+      cstmt = getCallableStatement(CALL_SP_UPDATE_APPLICATION_HOME_SUBCLUSTER);
 
       // Set the parameters for the stored procedure
       cstmt.setString(1, appId.toString());
@@ -660,8 +657,8 @@ public class SQLFederationStateStore implements FederationStateStore {
                   + request.getApplicationHomeSubCluster().getApplicationId(),
               e);
     } finally {
-      // Return to the pool the CallableStatement and the Connection
-      FederationStateStoreUtils.returnToPool(LOG, cstmt, conn);
+      // Return to the pool the CallableStatement
+      FederationStateStoreUtils.returnToPool(LOG, cstmt);
     }
     return UpdateApplicationHomeSubClusterResponse.newInstance();
   }
@@ -673,13 +670,11 @@ public class SQLFederationStateStore implements FederationStateStore {
     FederationApplicationHomeSubClusterStoreInputValidator.validate(request);
 
     CallableStatement cstmt = null;
-    Connection conn = null;
 
     SubClusterId homeRM = null;
 
     try {
-      conn = getConnection();
-      cstmt = conn.prepareCall(CALL_SP_GET_APPLICATION_HOME_SUBCLUSTER);
+      cstmt = getCallableStatement(CALL_SP_GET_APPLICATION_HOME_SUBCLUSTER);
 
       // Set the parameters for the stored procedure
       cstmt.setString(1, request.getApplicationId().toString());
@@ -711,9 +706,8 @@ public class SQLFederationStateStore implements FederationStateStore {
               + "for the specified application " + request.getApplicationId(),
           e);
     } finally {
-
-      // Return to the pool the CallableStatement and the Connection
-      FederationStateStoreUtils.returnToPool(LOG, cstmt, conn);
+      // Return to the pool the CallableStatement
+      FederationStateStoreUtils.returnToPool(LOG, cstmt);
     }
     return GetApplicationHomeSubClusterResponse
         .newInstance(ApplicationHomeSubCluster
@@ -724,14 +718,12 @@ public class SQLFederationStateStore implements FederationStateStore {
   public GetApplicationsHomeSubClusterResponse getApplicationsHomeSubCluster(
       GetApplicationsHomeSubClusterRequest request) throws YarnException {
     CallableStatement cstmt = null;
-    Connection conn = null;
     ResultSet rs = null;
     List<ApplicationHomeSubCluster> appsHomeSubClusters =
         new ArrayList<ApplicationHomeSubCluster>();
 
     try {
-      conn = getConnection();
-      cstmt = conn.prepareCall(CALL_SP_GET_APPLICATIONS_HOME_SUBCLUSTER);
+      cstmt = getCallableStatement(CALL_SP_GET_APPLICATIONS_HOME_SUBCLUSTER);
 
       // Execute the query
       long startTime = clock.getTime();
@@ -757,8 +749,8 @@ public class SQLFederationStateStore implements FederationStateStore {
       FederationStateStoreUtils.logAndThrowRetriableException(LOG,
           "Unable to obtain the information for all the applications ", e);
     } finally {
-      // Return to the pool the CallableStatement and the Connection
-      FederationStateStoreUtils.returnToPool(LOG, cstmt, conn, rs);
+      // Return to the pool the CallableStatement
+      FederationStateStoreUtils.returnToPool(LOG, cstmt, null, rs);
     }
     return GetApplicationsHomeSubClusterResponse
         .newInstance(appsHomeSubClusters);
@@ -772,11 +764,9 @@ public class SQLFederationStateStore implements FederationStateStore {
     FederationApplicationHomeSubClusterStoreInputValidator.validate(request);
 
     CallableStatement cstmt = null;
-    Connection conn = null;
 
     try {
-      conn = getConnection();
-      cstmt = conn.prepareCall(CALL_SP_DELETE_APPLICATION_HOME_SUBCLUSTER);
+      cstmt = getCallableStatement(CALL_SP_DELETE_APPLICATION_HOME_SUBCLUSTER);
 
       // Set the parameters for the stored procedure
       cstmt.setString(1, request.getApplicationId().toString());
@@ -812,8 +802,8 @@ public class SQLFederationStateStore implements FederationStateStore {
       FederationStateStoreUtils.logAndThrowRetriableException(LOG,
           "Unable to delete the application " + request.getApplicationId(), e);
     } finally {
-      // Return to the pool the CallableStatement and the Connection
-      FederationStateStoreUtils.returnToPool(LOG, cstmt, conn);
+      // Return to the pool the CallableStatement
+      FederationStateStoreUtils.returnToPool(LOG, cstmt);
     }
     return DeleteApplicationHomeSubClusterResponse.newInstance();
   }
@@ -826,12 +816,10 @@ public class SQLFederationStateStore implements FederationStateStore {
     FederationPolicyStoreInputValidator.validate(request);
 
     CallableStatement cstmt = null;
-    Connection conn = null;
     SubClusterPolicyConfiguration subClusterPolicyConfiguration = null;
 
     try {
-      conn = getConnection();
-      cstmt = conn.prepareCall(CALL_SP_GET_POLICY_CONFIGURATION);
+      cstmt = getCallableStatement(CALL_SP_GET_POLICY_CONFIGURATION);
 
       // Set the parameters for the stored procedure
       cstmt.setString(1, request.getQueue());
@@ -864,8 +852,8 @@ public class SQLFederationStateStore implements FederationStateStore {
           "Unable to select the policy for the queue :" + request.getQueue(),
           e);
     } finally {
-      // Return to the pool the CallableStatement and the Connection
-      FederationStateStoreUtils.returnToPool(LOG, cstmt, conn);
+      // Return to the pool the CallableStatement
+      FederationStateStoreUtils.returnToPool(LOG, cstmt);
     }
     return GetSubClusterPolicyConfigurationResponse
         .newInstance(subClusterPolicyConfiguration);
@@ -879,13 +867,11 @@ public class SQLFederationStateStore implements FederationStateStore {
     FederationPolicyStoreInputValidator.validate(request);
 
     CallableStatement cstmt = null;
-    Connection conn = null;
 
     SubClusterPolicyConfiguration policyConf = request.getPolicyConfiguration();
 
     try {
-      conn = getConnection();
-      cstmt = conn.prepareCall(CALL_SP_SET_POLICY_CONFIGURATION);
+      cstmt = getCallableStatement(CALL_SP_SET_POLICY_CONFIGURATION);
 
       // Set the parameters for the stored procedure
       cstmt.setString(1, policyConf.getQueue());
@@ -925,8 +911,8 @@ public class SQLFederationStateStore implements FederationStateStore {
               + policyConf.getQueue(),
           e);
     } finally {
-      // Return to the pool the CallableStatement and the Connection
-      FederationStateStoreUtils.returnToPool(LOG, cstmt, conn);
+      // Return to the pool the CallableStatement
+      FederationStateStoreUtils.returnToPool(LOG, cstmt);
     }
     return SetSubClusterPolicyConfigurationResponse.newInstance();
   }
@@ -936,14 +922,12 @@ public class SQLFederationStateStore implements FederationStateStore {
       GetSubClusterPoliciesConfigurationsRequest request) throws YarnException {
 
     CallableStatement cstmt = null;
-    Connection conn = null;
     ResultSet rs = null;
     List<SubClusterPolicyConfiguration> policyConfigurations =
         new ArrayList<SubClusterPolicyConfiguration>();
 
     try {
-      conn = getConnection();
-      cstmt = conn.prepareCall(CALL_SP_GET_POLICIES_CONFIGURATIONS);
+      cstmt = getCallableStatement(CALL_SP_GET_POLICIES_CONFIGURATIONS);
 
       // Execute the query
       long startTime = clock.getTime();
@@ -971,8 +955,8 @@ public class SQLFederationStateStore implements FederationStateStore {
       FederationStateStoreUtils.logAndThrowRetriableException(LOG,
           "Unable to obtain the policy information for all the queues.", e);
     } finally {
-      // Return to the pool the CallableStatement and the Connection
-      FederationStateStoreUtils.returnToPool(LOG, cstmt, conn, rs);
+      // Return to the pool the CallableStatement
+      FederationStateStoreUtils.returnToPool(LOG, cstmt, null, rs);
     }
 
     return GetSubClusterPoliciesConfigurationsResponse
@@ -993,6 +977,8 @@ public class SQLFederationStateStore implements FederationStateStore {
   public void close() throws Exception {
     if (dataSource != null) {
       dataSource.close();
+      LOG.debug("Connection closed");
+      FederationStateStoreClientMetrics.decrConnections();
     }
   }
 
@@ -1003,9 +989,15 @@ public class SQLFederationStateStore implements FederationStateStore {
    * @throws SQLException on failure
    */
   public Connection getConnection() throws SQLException {
+    FederationStateStoreClientMetrics.incrConnections();
     return dataSource.getConnection();
   }
 
+  private CallableStatement getCallableStatement(String procedure)
+      throws SQLException {
+    return conn.prepareCall(procedure);
+  }
+
   private static byte[] getByteArray(ByteBuffer bb) {
     byte[] ba = new byte[bb.limit()];
     bb.get(ba);
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/metrics/FederationStateStoreClientMetrics.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/metrics/FederationStateStoreClientMetrics.java
index 27b46cd..d04f850 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/metrics/FederationStateStoreClientMetrics.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/metrics/FederationStateStoreClientMetrics.java
@@ -31,6 +31,7 @@ import org.apache.hadoop.metrics2.annotation.Metrics;
 import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
 import org.apache.hadoop.metrics2.lib.MetricsRegistry;
 import org.apache.hadoop.metrics2.lib.MutableCounterLong;
+import org.apache.hadoop.metrics2.lib.MutableGaugeInt;
 import org.apache.hadoop.metrics2.lib.MutableQuantiles;
 import org.apache.hadoop.metrics2.lib.MutableRate;
 import org.apache.hadoop.yarn.server.federation.store.FederationStateStore;
@@ -80,6 +81,9 @@ public final class FederationStateStoreClientMetrics implements MetricsSource {
   @Metric("Total number of failed StateStore calls")
   private static MutableCounterLong totalFailedCalls;
 
+  @Metric("Total number of Connections")
+  private static MutableGaugeInt totalConnections;
+
   // This after the static members are initialized, or the constructor will
   // throw a NullPointerException
   private static final FederationStateStoreClientMetrics S_INSTANCE =
@@ -146,6 +150,14 @@ public final class FederationStateStoreClientMetrics implements MetricsSource {
     methodQuantileMetric.add(duration);
   }
 
+  public static void incrConnections() {
+    totalConnections.incr();
+  }
+
+  public static void decrConnections() {
+    totalConnections.decr();
+  }
+
   @Override
   public void getMetrics(MetricsCollector collector, boolean all) {
     REGISTRY.snapshot(collector.addRecord(REGISTRY.info()), all);
@@ -181,4 +193,10 @@ public final class FederationStateStoreClientMetrics implements MetricsSource {
   static double getLatencySucceededCalls() {
     return totalSucceededCalls.lastStat().mean();
   }
+
+  @VisibleForTesting
+  public static int getNumConnections() {
+    return totalConnections.value();
+  }
+
 }
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/utils/FederationStateStoreUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/utils/FederationStateStoreUtils.java
index 3b870de..27a4f7d 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/utils/FederationStateStoreUtils.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/utils/FederationStateStoreUtils.java
@@ -27,6 +27,7 @@ import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.hadoop.yarn.server.federation.store.exception.FederationStateStoreException;
 import org.apache.hadoop.yarn.server.federation.store.exception.FederationStateStoreInvalidInputException;
 import org.apache.hadoop.yarn.server.federation.store.exception.FederationStateStoreRetriableException;
+import org.apache.hadoop.yarn.server.federation.store.metrics.FederationStateStoreClientMetrics;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -69,6 +70,7 @@ public final class FederationStateStoreUtils {
     if (conn != null) {
       try {
         conn.close();
+        FederationStateStoreClientMetrics.decrConnections();
       } catch (SQLException e) {
         logAndThrowException(log, "Exception while trying to close Connection",
             e);
@@ -99,6 +101,18 @@ public final class FederationStateStoreUtils {
   }
 
   /**
+   * Returns the SQL <code>FederationStateStore</code> connections to the pool.
+   *
+   * @param log the logger interface
+   * @param cstmt the interface used to execute SQL stored procedures
+   * @throws YarnException on failure
+   */
+  public static void returnToPool(Logger log, CallableStatement cstmt)
+      throws YarnException {
+    returnToPool(log, cstmt, null);
+  }
+
+  /**
    * Throws an exception due to an error in <code>FederationStateStore</code>.
    *
    * @param log the logger interface
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/store/impl/FederationStateStoreBaseTest.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/store/impl/FederationStateStoreBaseTest.java
index b17f870..d0e6485 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/store/impl/FederationStateStoreBaseTest.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/store/impl/FederationStateStoreBaseTest.java
@@ -68,7 +68,7 @@ import org.junit.Test;
 public abstract class FederationStateStoreBaseTest {
 
   private static final MonotonicClock CLOCK = new MonotonicClock();
-  private FederationStateStore stateStore = createStateStore();
+  private FederationStateStore stateStore;
 
   protected abstract FederationStateStore createStateStore();
 
@@ -76,6 +76,7 @@ public abstract class FederationStateStoreBaseTest {
 
   @Before
   public void before() throws IOException, YarnException {
+    stateStore = createStateStore();
     stateStore.init(conf);
   }
 
@@ -516,7 +517,7 @@ public abstract class FederationStateStoreBaseTest {
 
   // Convenience methods
 
-  private SubClusterInfo createSubClusterInfo(SubClusterId subClusterId) {
+  SubClusterInfo createSubClusterInfo(SubClusterId subClusterId) {
 
     String amRMAddress = "1.2.3.4:1";
     String clientRMAddress = "1.2.3.4:2";
@@ -535,7 +536,7 @@ public abstract class FederationStateStoreBaseTest {
     return SubClusterPolicyConfiguration.newInstance(queueName, policyType, bb);
   }
 
-  private void addApplicationHomeSC(ApplicationId appId,
+  void addApplicationHomeSC(ApplicationId appId,
       SubClusterId subClusterId) throws YarnException {
     ApplicationHomeSubCluster ahsc =
         ApplicationHomeSubCluster.newInstance(appId, subClusterId);
@@ -558,14 +559,14 @@ public abstract class FederationStateStoreBaseTest {
         SubClusterRegisterRequest.newInstance(subClusterInfo));
   }
 
-  private SubClusterInfo querySubClusterInfo(SubClusterId subClusterId)
+  SubClusterInfo querySubClusterInfo(SubClusterId subClusterId)
       throws YarnException {
     GetSubClusterInfoRequest request =
         GetSubClusterInfoRequest.newInstance(subClusterId);
     return stateStore.getSubCluster(request).getSubClusterInfo();
   }
 
-  private SubClusterId queryApplicationHomeSC(ApplicationId appId)
+  SubClusterId queryApplicationHomeSC(ApplicationId appId)
       throws YarnException {
     GetApplicationHomeSubClusterRequest request =
         GetApplicationHomeSubClusterRequest.newInstance(appId);
@@ -594,4 +595,8 @@ public abstract class FederationStateStoreBaseTest {
     return conf;
   }
 
+  protected FederationStateStore getStateStore() {
+    return stateStore;
+  }
+
 }
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/store/impl/HSQLDBFederationStateStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/store/impl/HSQLDBFederationStateStore.java
index 289a3a6..c3d0a9e 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/store/impl/HSQLDBFederationStateStore.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/store/impl/HSQLDBFederationStateStore.java
@@ -209,7 +209,7 @@ public class HSQLDBFederationStateStore extends SQLFederationStateStore {
       LOG.error("ERROR: failed to init HSQLDB " + e1.getMessage());
     }
     try {
-      conn = getConnection();
+      conn = super.conn;
 
       LOG.info("Database Init: Start");
 
@@ -234,7 +234,6 @@ public class HSQLDBFederationStateStore extends SQLFederationStateStore {
       conn.prepareStatement(SP_GETPOLICIESCONFIGURATIONS).execute();
 
       LOG.info("Database Init: Complete");
-      conn.close();
     } catch (SQLException e) {
       LOG.error("ERROR: failed to inizialize HSQLDB " + e.getMessage());
     }
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/store/impl/TestSQLFederationStateStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/store/impl/TestSQLFederationStateStore.java
index d4e6cc5..3c1d327 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/store/impl/TestSQLFederationStateStore.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/store/impl/TestSQLFederationStateStore.java
@@ -17,8 +17,16 @@
 
 package org.apache.hadoop.yarn.server.federation.store.impl;
 
+import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.hadoop.yarn.server.federation.store.FederationStateStore;
+import org.apache.hadoop.yarn.server.federation.store.metrics.FederationStateStoreClientMetrics;
+import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
+import org.apache.hadoop.yarn.server.federation.store.records.SubClusterInfo;
+import org.apache.hadoop.yarn.server.federation.store.records.SubClusterRegisterRequest;
+import org.junit.Assert;
+import org.junit.Test;
 
 /**
  * Unit tests for SQLFederationStateStore.
@@ -46,4 +54,24 @@ public class TestSQLFederationStateStore extends FederationStateStoreBaseTest {
     super.setConf(conf);
     return new HSQLDBFederationStateStore();
   }
+
+  @Test
+  public void testSqlConnectionsCreatedCount() throws YarnException {
+    FederationStateStore stateStore = getStateStore();
+    SubClusterId subClusterId = SubClusterId.newInstance("SC");
+    ApplicationId appId = ApplicationId.newInstance(1, 1);
+
+    SubClusterInfo subClusterInfo = createSubClusterInfo(subClusterId);
+
+    stateStore.registerSubCluster(
+        SubClusterRegisterRequest.newInstance(subClusterInfo));
+    Assert.assertEquals(subClusterInfo, querySubClusterInfo(subClusterId));
+
+    addApplicationHomeSC(appId, subClusterId);
+    Assert.assertEquals(subClusterId, queryApplicationHomeSC(appId));
+
+    // Verify if connection is created only once at statestore init
+    Assert.assertEquals(1,
+        FederationStateStoreClientMetrics.getNumConnections());
+  }
 }
\ No newline at end of file
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/store/impl/TestZookeeperFederationStateStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/store/impl/TestZookeeperFederationStateStore.java
index 390b803..fe28641 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/store/impl/TestZookeeperFederationStateStore.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/store/impl/TestZookeeperFederationStateStore.java
@@ -72,7 +72,6 @@ public class TestZookeeperFederationStateStore
   @After
   public void after() throws Exception {
     super.after();
-
     curatorFramework.close();
     try {
       curatorTestingServer.stop();
@@ -82,8 +81,7 @@ public class TestZookeeperFederationStateStore
 
   @Override
   protected FederationStateStore createStateStore() {
-    Configuration conf = new Configuration();
-    super.setConf(conf);
+    super.setConf(getConf());
     return new ZookeeperFederationStateStore();
   }
 }
\ No newline at end of file


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org