You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by br...@apache.org on 2020/06/26 15:14:41 UTC
[hadoop] branch trunk updated: YARN-6526. Refactoring
SQLFederationStateStore by avoiding to recreate a connection at every call.
Contributed by Bilwa S T.
This is an automated email from the ASF dual-hosted git repository.
brahma pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/hadoop.git
The following commit(s) were added to refs/heads/trunk by this push:
new 2c03524 YARN-6526. Refactoring SQLFederationStateStore by avoiding to recreate a connection at every call. Contributed by Bilwa S T.
2c03524 is described below
commit 2c03524fa4be754aa95889d4ac0f5d57dca8cda8
Author: Brahma Reddy Battula <br...@apache.org>
AuthorDate: Fri Jun 26 20:43:27 2020 +0530
YARN-6526. Refactoring SQLFederationStateStore by avoiding to recreate a connection at every call. Contributed by Bilwa S T.
---
.../store/impl/SQLFederationStateStore.java | 124 ++++++++++-----------
.../metrics/FederationStateStoreClientMetrics.java | 18 +++
.../store/utils/FederationStateStoreUtils.java | 14 +++
.../store/impl/FederationStateStoreBaseTest.java | 15 ++-
.../store/impl/HSQLDBFederationStateStore.java | 3 +-
.../store/impl/TestSQLFederationStateStore.java | 28 +++++
.../impl/TestZookeeperFederationStateStore.java | 4 +-
7 files changed, 130 insertions(+), 76 deletions(-)
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/impl/SQLFederationStateStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/impl/SQLFederationStateStore.java
index 07dc7e4..8ceef43 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/impl/SQLFederationStateStore.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/impl/SQLFederationStateStore.java
@@ -78,6 +78,7 @@ import org.apache.hadoop.yarn.util.MonotonicClock;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import com.google.common.annotations.VisibleForTesting;
import com.zaxxer.hikari.HikariDataSource;
/**
@@ -141,6 +142,8 @@ public class SQLFederationStateStore implements FederationStateStore {
private int maximumPoolSize;
private HikariDataSource dataSource = null;
private final Clock clock = new MonotonicClock();
+ @VisibleForTesting
+ Connection conn = null;
@Override
public void init(Configuration conf) throws YarnException {
@@ -173,6 +176,13 @@ public class SQLFederationStateStore implements FederationStateStore {
dataSource.setMaximumPoolSize(maximumPoolSize);
LOG.info("Initialized connection pool to the Federation StateStore "
+ "database at address: " + url);
+ try {
+ conn = getConnection();
+ LOG.debug("Connection created");
+ } catch (SQLException e) {
+ FederationStateStoreUtils.logAndThrowRetriableException(LOG,
+ "Not able to get Connection", e);
+ }
}
@Override
@@ -185,15 +195,13 @@ public class SQLFederationStateStore implements FederationStateStore {
.validate(registerSubClusterRequest);
CallableStatement cstmt = null;
- Connection conn = null;
SubClusterInfo subClusterInfo =
registerSubClusterRequest.getSubClusterInfo();
SubClusterId subClusterId = subClusterInfo.getSubClusterId();
try {
- conn = getConnection();
- cstmt = conn.prepareCall(CALL_SP_REGISTER_SUBCLUSTER);
+ cstmt = getCallableStatement(CALL_SP_REGISTER_SUBCLUSTER);
// Set the parameters for the stored procedure
cstmt.setString(1, subClusterId.getId());
@@ -238,9 +246,10 @@ public class SQLFederationStateStore implements FederationStateStore {
+ " into the StateStore",
e);
} finally {
- // Return to the pool the CallableStatement and the Connection
- FederationStateStoreUtils.returnToPool(LOG, cstmt, conn);
+ // Return to the pool the CallableStatement
+ FederationStateStoreUtils.returnToPool(LOG, cstmt);
}
+
return SubClusterRegisterResponse.newInstance();
}
@@ -254,14 +263,12 @@ public class SQLFederationStateStore implements FederationStateStore {
.validate(subClusterDeregisterRequest);
CallableStatement cstmt = null;
- Connection conn = null;
SubClusterId subClusterId = subClusterDeregisterRequest.getSubClusterId();
SubClusterState state = subClusterDeregisterRequest.getState();
try {
- conn = getConnection();
- cstmt = conn.prepareCall(CALL_SP_DEREGISTER_SUBCLUSTER);
+ cstmt = getCallableStatement(CALL_SP_DEREGISTER_SUBCLUSTER);
// Set the parameters for the stored procedure
cstmt.setString(1, subClusterId.getId());
@@ -299,8 +306,8 @@ public class SQLFederationStateStore implements FederationStateStore {
+ state.toString(),
e);
} finally {
- // Return to the pool the CallableStatement and the Connection
- FederationStateStoreUtils.returnToPool(LOG, cstmt, conn);
+ // Return to the pool the CallableStatement
+ FederationStateStoreUtils.returnToPool(LOG, cstmt);
}
return SubClusterDeregisterResponse.newInstance();
}
@@ -315,14 +322,12 @@ public class SQLFederationStateStore implements FederationStateStore {
.validate(subClusterHeartbeatRequest);
CallableStatement cstmt = null;
- Connection conn = null;
SubClusterId subClusterId = subClusterHeartbeatRequest.getSubClusterId();
SubClusterState state = subClusterHeartbeatRequest.getState();
try {
- conn = getConnection();
- cstmt = conn.prepareCall(CALL_SP_SUBCLUSTER_HEARTBEAT);
+ cstmt = getCallableStatement(CALL_SP_SUBCLUSTER_HEARTBEAT);
// Set the parameters for the stored procedure
cstmt.setString(1, subClusterId.getId());
@@ -362,8 +367,8 @@ public class SQLFederationStateStore implements FederationStateStore {
+ subClusterId,
e);
} finally {
- // Return to the pool the CallableStatement and the Connection
- FederationStateStoreUtils.returnToPool(LOG, cstmt, conn);
+ // Return to the pool the CallableStatement
+ FederationStateStoreUtils.returnToPool(LOG, cstmt);
}
return SubClusterHeartbeatResponse.newInstance();
}
@@ -376,14 +381,12 @@ public class SQLFederationStateStore implements FederationStateStore {
FederationMembershipStateStoreInputValidator.validate(subClusterRequest);
CallableStatement cstmt = null;
- Connection conn = null;
SubClusterInfo subClusterInfo = null;
SubClusterId subClusterId = subClusterRequest.getSubClusterId();
try {
- conn = getConnection();
- cstmt = conn.prepareCall(CALL_SP_GET_SUBCLUSTER);
+ cstmt = getCallableStatement(CALL_SP_GET_SUBCLUSTER);
cstmt.setString(1, subClusterId.getId());
// Set the parameters for the stored procedure
@@ -443,8 +446,8 @@ public class SQLFederationStateStore implements FederationStateStore {
FederationStateStoreUtils.logAndThrowRetriableException(LOG,
"Unable to obtain the SubCluster information for " + subClusterId, e);
} finally {
- // Return to the pool the CallableStatement and the Connection
- FederationStateStoreUtils.returnToPool(LOG, cstmt, conn);
+ // Return to the pool the CallableStatement
+ FederationStateStoreUtils.returnToPool(LOG, cstmt);
}
return GetSubClusterInfoResponse.newInstance(subClusterInfo);
}
@@ -453,13 +456,11 @@ public class SQLFederationStateStore implements FederationStateStore {
public GetSubClustersInfoResponse getSubClusters(
GetSubClustersInfoRequest subClustersRequest) throws YarnException {
CallableStatement cstmt = null;
- Connection conn = null;
ResultSet rs = null;
List<SubClusterInfo> subClusters = new ArrayList<SubClusterInfo>();
try {
- conn = getConnection();
- cstmt = conn.prepareCall(CALL_SP_GET_SUBCLUSTERS);
+ cstmt = getCallableStatement(CALL_SP_GET_SUBCLUSTERS);
// Execute the query
long startTime = clock.getTime();
@@ -510,8 +511,8 @@ public class SQLFederationStateStore implements FederationStateStore {
FederationStateStoreUtils.logAndThrowRetriableException(LOG,
"Unable to obtain the information for all the SubClusters ", e);
} finally {
- // Return to the pool the CallableStatement and the Connection
- FederationStateStoreUtils.returnToPool(LOG, cstmt, conn, rs);
+ // Return to the pool the CallableStatement
+ FederationStateStoreUtils.returnToPool(LOG, cstmt, null, rs);
}
return GetSubClustersInfoResponse.newInstance(subClusters);
}
@@ -524,7 +525,6 @@ public class SQLFederationStateStore implements FederationStateStore {
FederationApplicationHomeSubClusterStoreInputValidator.validate(request);
CallableStatement cstmt = null;
- Connection conn = null;
String subClusterHome = null;
ApplicationId appId =
@@ -533,8 +533,7 @@ public class SQLFederationStateStore implements FederationStateStore {
request.getApplicationHomeSubCluster().getHomeSubCluster();
try {
- conn = getConnection();
- cstmt = conn.prepareCall(CALL_SP_ADD_APPLICATION_HOME_SUBCLUSTER);
+ cstmt = getCallableStatement(CALL_SP_ADD_APPLICATION_HOME_SUBCLUSTER);
// Set the parameters for the stored procedure
cstmt.setString(1, appId.toString());
@@ -596,8 +595,8 @@ public class SQLFederationStateStore implements FederationStateStore {
+ request.getApplicationHomeSubCluster().getApplicationId(),
e);
} finally {
- // Return to the pool the CallableStatement and the Connection
- FederationStateStoreUtils.returnToPool(LOG, cstmt, conn);
+ // Return to the pool the CallableStatement
+ FederationStateStoreUtils.returnToPool(LOG, cstmt);
}
return AddApplicationHomeSubClusterResponse
.newInstance(SubClusterId.newInstance(subClusterHome));
@@ -611,7 +610,6 @@ public class SQLFederationStateStore implements FederationStateStore {
FederationApplicationHomeSubClusterStoreInputValidator.validate(request);
CallableStatement cstmt = null;
- Connection conn = null;
ApplicationId appId =
request.getApplicationHomeSubCluster().getApplicationId();
@@ -619,8 +617,7 @@ public class SQLFederationStateStore implements FederationStateStore {
request.getApplicationHomeSubCluster().getHomeSubCluster();
try {
- conn = getConnection();
- cstmt = conn.prepareCall(CALL_SP_UPDATE_APPLICATION_HOME_SUBCLUSTER);
+ cstmt = getCallableStatement(CALL_SP_UPDATE_APPLICATION_HOME_SUBCLUSTER);
// Set the parameters for the stored procedure
cstmt.setString(1, appId.toString());
@@ -660,8 +657,8 @@ public class SQLFederationStateStore implements FederationStateStore {
+ request.getApplicationHomeSubCluster().getApplicationId(),
e);
} finally {
- // Return to the pool the CallableStatement and the Connection
- FederationStateStoreUtils.returnToPool(LOG, cstmt, conn);
+ // Return to the pool the CallableStatement
+ FederationStateStoreUtils.returnToPool(LOG, cstmt);
}
return UpdateApplicationHomeSubClusterResponse.newInstance();
}
@@ -673,13 +670,11 @@ public class SQLFederationStateStore implements FederationStateStore {
FederationApplicationHomeSubClusterStoreInputValidator.validate(request);
CallableStatement cstmt = null;
- Connection conn = null;
SubClusterId homeRM = null;
try {
- conn = getConnection();
- cstmt = conn.prepareCall(CALL_SP_GET_APPLICATION_HOME_SUBCLUSTER);
+ cstmt = getCallableStatement(CALL_SP_GET_APPLICATION_HOME_SUBCLUSTER);
// Set the parameters for the stored procedure
cstmt.setString(1, request.getApplicationId().toString());
@@ -711,9 +706,8 @@ public class SQLFederationStateStore implements FederationStateStore {
+ "for the specified application " + request.getApplicationId(),
e);
} finally {
-
- // Return to the pool the CallableStatement and the Connection
- FederationStateStoreUtils.returnToPool(LOG, cstmt, conn);
+ // Return to the pool the CallableStatement
+ FederationStateStoreUtils.returnToPool(LOG, cstmt);
}
return GetApplicationHomeSubClusterResponse
.newInstance(ApplicationHomeSubCluster
@@ -724,14 +718,12 @@ public class SQLFederationStateStore implements FederationStateStore {
public GetApplicationsHomeSubClusterResponse getApplicationsHomeSubCluster(
GetApplicationsHomeSubClusterRequest request) throws YarnException {
CallableStatement cstmt = null;
- Connection conn = null;
ResultSet rs = null;
List<ApplicationHomeSubCluster> appsHomeSubClusters =
new ArrayList<ApplicationHomeSubCluster>();
try {
- conn = getConnection();
- cstmt = conn.prepareCall(CALL_SP_GET_APPLICATIONS_HOME_SUBCLUSTER);
+ cstmt = getCallableStatement(CALL_SP_GET_APPLICATIONS_HOME_SUBCLUSTER);
// Execute the query
long startTime = clock.getTime();
@@ -757,8 +749,8 @@ public class SQLFederationStateStore implements FederationStateStore {
FederationStateStoreUtils.logAndThrowRetriableException(LOG,
"Unable to obtain the information for all the applications ", e);
} finally {
- // Return to the pool the CallableStatement and the Connection
- FederationStateStoreUtils.returnToPool(LOG, cstmt, conn, rs);
+ // Return to the pool the CallableStatement
+ FederationStateStoreUtils.returnToPool(LOG, cstmt, null, rs);
}
return GetApplicationsHomeSubClusterResponse
.newInstance(appsHomeSubClusters);
@@ -772,11 +764,9 @@ public class SQLFederationStateStore implements FederationStateStore {
FederationApplicationHomeSubClusterStoreInputValidator.validate(request);
CallableStatement cstmt = null;
- Connection conn = null;
try {
- conn = getConnection();
- cstmt = conn.prepareCall(CALL_SP_DELETE_APPLICATION_HOME_SUBCLUSTER);
+ cstmt = getCallableStatement(CALL_SP_DELETE_APPLICATION_HOME_SUBCLUSTER);
// Set the parameters for the stored procedure
cstmt.setString(1, request.getApplicationId().toString());
@@ -812,8 +802,8 @@ public class SQLFederationStateStore implements FederationStateStore {
FederationStateStoreUtils.logAndThrowRetriableException(LOG,
"Unable to delete the application " + request.getApplicationId(), e);
} finally {
- // Return to the pool the CallableStatement and the Connection
- FederationStateStoreUtils.returnToPool(LOG, cstmt, conn);
+ // Return to the pool the CallableStatement
+ FederationStateStoreUtils.returnToPool(LOG, cstmt);
}
return DeleteApplicationHomeSubClusterResponse.newInstance();
}
@@ -826,12 +816,10 @@ public class SQLFederationStateStore implements FederationStateStore {
FederationPolicyStoreInputValidator.validate(request);
CallableStatement cstmt = null;
- Connection conn = null;
SubClusterPolicyConfiguration subClusterPolicyConfiguration = null;
try {
- conn = getConnection();
- cstmt = conn.prepareCall(CALL_SP_GET_POLICY_CONFIGURATION);
+ cstmt = getCallableStatement(CALL_SP_GET_POLICY_CONFIGURATION);
// Set the parameters for the stored procedure
cstmt.setString(1, request.getQueue());
@@ -864,8 +852,8 @@ public class SQLFederationStateStore implements FederationStateStore {
"Unable to select the policy for the queue :" + request.getQueue(),
e);
} finally {
- // Return to the pool the CallableStatement and the Connection
- FederationStateStoreUtils.returnToPool(LOG, cstmt, conn);
+ // Return to the pool the CallableStatement
+ FederationStateStoreUtils.returnToPool(LOG, cstmt);
}
return GetSubClusterPolicyConfigurationResponse
.newInstance(subClusterPolicyConfiguration);
@@ -879,13 +867,11 @@ public class SQLFederationStateStore implements FederationStateStore {
FederationPolicyStoreInputValidator.validate(request);
CallableStatement cstmt = null;
- Connection conn = null;
SubClusterPolicyConfiguration policyConf = request.getPolicyConfiguration();
try {
- conn = getConnection();
- cstmt = conn.prepareCall(CALL_SP_SET_POLICY_CONFIGURATION);
+ cstmt = getCallableStatement(CALL_SP_SET_POLICY_CONFIGURATION);
// Set the parameters for the stored procedure
cstmt.setString(1, policyConf.getQueue());
@@ -925,8 +911,8 @@ public class SQLFederationStateStore implements FederationStateStore {
+ policyConf.getQueue(),
e);
} finally {
- // Return to the pool the CallableStatement and the Connection
- FederationStateStoreUtils.returnToPool(LOG, cstmt, conn);
+ // Return to the pool the CallableStatement
+ FederationStateStoreUtils.returnToPool(LOG, cstmt);
}
return SetSubClusterPolicyConfigurationResponse.newInstance();
}
@@ -936,14 +922,12 @@ public class SQLFederationStateStore implements FederationStateStore {
GetSubClusterPoliciesConfigurationsRequest request) throws YarnException {
CallableStatement cstmt = null;
- Connection conn = null;
ResultSet rs = null;
List<SubClusterPolicyConfiguration> policyConfigurations =
new ArrayList<SubClusterPolicyConfiguration>();
try {
- conn = getConnection();
- cstmt = conn.prepareCall(CALL_SP_GET_POLICIES_CONFIGURATIONS);
+ cstmt = getCallableStatement(CALL_SP_GET_POLICIES_CONFIGURATIONS);
// Execute the query
long startTime = clock.getTime();
@@ -971,8 +955,8 @@ public class SQLFederationStateStore implements FederationStateStore {
FederationStateStoreUtils.logAndThrowRetriableException(LOG,
"Unable to obtain the policy information for all the queues.", e);
} finally {
- // Return to the pool the CallableStatement and the Connection
- FederationStateStoreUtils.returnToPool(LOG, cstmt, conn, rs);
+ // Return to the pool the CallableStatement
+ FederationStateStoreUtils.returnToPool(LOG, cstmt, null, rs);
}
return GetSubClusterPoliciesConfigurationsResponse
@@ -993,6 +977,8 @@ public class SQLFederationStateStore implements FederationStateStore {
public void close() throws Exception {
if (dataSource != null) {
dataSource.close();
+ LOG.debug("Connection closed");
+ FederationStateStoreClientMetrics.decrConnections();
}
}
@@ -1003,9 +989,15 @@ public class SQLFederationStateStore implements FederationStateStore {
* @throws SQLException on failure
*/
public Connection getConnection() throws SQLException {
+ FederationStateStoreClientMetrics.incrConnections();
return dataSource.getConnection();
}
+ private CallableStatement getCallableStatement(String procedure)
+ throws SQLException {
+ return conn.prepareCall(procedure);
+ }
+
private static byte[] getByteArray(ByteBuffer bb) {
byte[] ba = new byte[bb.limit()];
bb.get(ba);
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/metrics/FederationStateStoreClientMetrics.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/metrics/FederationStateStoreClientMetrics.java
index 27b46cd..d04f850 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/metrics/FederationStateStoreClientMetrics.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/metrics/FederationStateStoreClientMetrics.java
@@ -31,6 +31,7 @@ import org.apache.hadoop.metrics2.annotation.Metrics;
import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
import org.apache.hadoop.metrics2.lib.MetricsRegistry;
import org.apache.hadoop.metrics2.lib.MutableCounterLong;
+import org.apache.hadoop.metrics2.lib.MutableGaugeInt;
import org.apache.hadoop.metrics2.lib.MutableQuantiles;
import org.apache.hadoop.metrics2.lib.MutableRate;
import org.apache.hadoop.yarn.server.federation.store.FederationStateStore;
@@ -80,6 +81,9 @@ public final class FederationStateStoreClientMetrics implements MetricsSource {
@Metric("Total number of failed StateStore calls")
private static MutableCounterLong totalFailedCalls;
+ @Metric("Total number of Connections")
+ private static MutableGaugeInt totalConnections;
+
// This after the static members are initialized, or the constructor will
// throw a NullPointerException
private static final FederationStateStoreClientMetrics S_INSTANCE =
@@ -146,6 +150,14 @@ public final class FederationStateStoreClientMetrics implements MetricsSource {
methodQuantileMetric.add(duration);
}
+ public static void incrConnections() {
+ totalConnections.incr();
+ }
+
+ public static void decrConnections() {
+ totalConnections.decr();
+ }
+
@Override
public void getMetrics(MetricsCollector collector, boolean all) {
REGISTRY.snapshot(collector.addRecord(REGISTRY.info()), all);
@@ -181,4 +193,10 @@ public final class FederationStateStoreClientMetrics implements MetricsSource {
static double getLatencySucceededCalls() {
return totalSucceededCalls.lastStat().mean();
}
+
+ @VisibleForTesting
+ public static int getNumConnections() {
+ return totalConnections.value();
+ }
+
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/utils/FederationStateStoreUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/utils/FederationStateStoreUtils.java
index 3b870de..27a4f7d 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/utils/FederationStateStoreUtils.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/utils/FederationStateStoreUtils.java
@@ -27,6 +27,7 @@ import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.server.federation.store.exception.FederationStateStoreException;
import org.apache.hadoop.yarn.server.federation.store.exception.FederationStateStoreInvalidInputException;
import org.apache.hadoop.yarn.server.federation.store.exception.FederationStateStoreRetriableException;
+import org.apache.hadoop.yarn.server.federation.store.metrics.FederationStateStoreClientMetrics;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -69,6 +70,7 @@ public final class FederationStateStoreUtils {
if (conn != null) {
try {
conn.close();
+ FederationStateStoreClientMetrics.decrConnections();
} catch (SQLException e) {
logAndThrowException(log, "Exception while trying to close Connection",
e);
@@ -99,6 +101,18 @@ public final class FederationStateStoreUtils {
}
/**
+ * Returns only the <code>CallableStatement</code> to the pool; no connection is closed.
+ *
+ * @param log the logger interface
+ * @param cstmt the interface used to execute SQL stored procedures
+ * @throws YarnException on failure
+ */
+ public static void returnToPool(Logger log, CallableStatement cstmt)
+ throws YarnException {
+ returnToPool(log, cstmt, null);
+ }
+
+ /**
* Throws an exception due to an error in <code>FederationStateStore</code>.
*
* @param log the logger interface
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/store/impl/FederationStateStoreBaseTest.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/store/impl/FederationStateStoreBaseTest.java
index b17f870..d0e6485 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/store/impl/FederationStateStoreBaseTest.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/store/impl/FederationStateStoreBaseTest.java
@@ -68,7 +68,7 @@ import org.junit.Test;
public abstract class FederationStateStoreBaseTest {
private static final MonotonicClock CLOCK = new MonotonicClock();
- private FederationStateStore stateStore = createStateStore();
+ private FederationStateStore stateStore;
protected abstract FederationStateStore createStateStore();
@@ -76,6 +76,7 @@ public abstract class FederationStateStoreBaseTest {
@Before
public void before() throws IOException, YarnException {
+ stateStore = createStateStore();
stateStore.init(conf);
}
@@ -516,7 +517,7 @@ public abstract class FederationStateStoreBaseTest {
// Convenience methods
- private SubClusterInfo createSubClusterInfo(SubClusterId subClusterId) {
+ SubClusterInfo createSubClusterInfo(SubClusterId subClusterId) {
String amRMAddress = "1.2.3.4:1";
String clientRMAddress = "1.2.3.4:2";
@@ -535,7 +536,7 @@ public abstract class FederationStateStoreBaseTest {
return SubClusterPolicyConfiguration.newInstance(queueName, policyType, bb);
}
- private void addApplicationHomeSC(ApplicationId appId,
+ void addApplicationHomeSC(ApplicationId appId,
SubClusterId subClusterId) throws YarnException {
ApplicationHomeSubCluster ahsc =
ApplicationHomeSubCluster.newInstance(appId, subClusterId);
@@ -558,14 +559,14 @@ public abstract class FederationStateStoreBaseTest {
SubClusterRegisterRequest.newInstance(subClusterInfo));
}
- private SubClusterInfo querySubClusterInfo(SubClusterId subClusterId)
+ SubClusterInfo querySubClusterInfo(SubClusterId subClusterId)
throws YarnException {
GetSubClusterInfoRequest request =
GetSubClusterInfoRequest.newInstance(subClusterId);
return stateStore.getSubCluster(request).getSubClusterInfo();
}
- private SubClusterId queryApplicationHomeSC(ApplicationId appId)
+ SubClusterId queryApplicationHomeSC(ApplicationId appId)
throws YarnException {
GetApplicationHomeSubClusterRequest request =
GetApplicationHomeSubClusterRequest.newInstance(appId);
@@ -594,4 +595,8 @@ public abstract class FederationStateStoreBaseTest {
return conf;
}
+ protected FederationStateStore getStateStore() {
+ return stateStore;
+ }
+
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/store/impl/HSQLDBFederationStateStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/store/impl/HSQLDBFederationStateStore.java
index 289a3a6..c3d0a9e 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/store/impl/HSQLDBFederationStateStore.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/store/impl/HSQLDBFederationStateStore.java
@@ -209,7 +209,7 @@ public class HSQLDBFederationStateStore extends SQLFederationStateStore {
LOG.error("ERROR: failed to init HSQLDB " + e1.getMessage());
}
try {
- conn = getConnection();
+ conn = super.conn;
LOG.info("Database Init: Start");
@@ -234,7 +234,6 @@ public class HSQLDBFederationStateStore extends SQLFederationStateStore {
conn.prepareStatement(SP_GETPOLICIESCONFIGURATIONS).execute();
LOG.info("Database Init: Complete");
- conn.close();
} catch (SQLException e) {
LOG.error("ERROR: failed to inizialize HSQLDB " + e.getMessage());
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/store/impl/TestSQLFederationStateStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/store/impl/TestSQLFederationStateStore.java
index d4e6cc5..3c1d327 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/store/impl/TestSQLFederationStateStore.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/store/impl/TestSQLFederationStateStore.java
@@ -17,8 +17,16 @@
package org.apache.hadoop.yarn.server.federation.store.impl;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.server.federation.store.FederationStateStore;
+import org.apache.hadoop.yarn.server.federation.store.metrics.FederationStateStoreClientMetrics;
+import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
+import org.apache.hadoop.yarn.server.federation.store.records.SubClusterInfo;
+import org.apache.hadoop.yarn.server.federation.store.records.SubClusterRegisterRequest;
+import org.junit.Assert;
+import org.junit.Test;
/**
* Unit tests for SQLFederationStateStore.
@@ -46,4 +54,24 @@ public class TestSQLFederationStateStore extends FederationStateStoreBaseTest {
super.setConf(conf);
return new HSQLDBFederationStateStore();
}
+
+ @Test
+ public void testSqlConnectionsCreatedCount() throws YarnException {
+ FederationStateStore stateStore = getStateStore();
+ SubClusterId subClusterId = SubClusterId.newInstance("SC");
+ ApplicationId appId = ApplicationId.newInstance(1, 1);
+
+ SubClusterInfo subClusterInfo = createSubClusterInfo(subClusterId);
+
+ stateStore.registerSubCluster(
+ SubClusterRegisterRequest.newInstance(subClusterInfo));
+ Assert.assertEquals(subClusterInfo, querySubClusterInfo(subClusterId));
+
+ addApplicationHomeSC(appId, subClusterId);
+ Assert.assertEquals(subClusterId, queryApplicationHomeSC(appId));
+
+ // Verify if connection is created only once at statestore init
+ Assert.assertEquals(1,
+ FederationStateStoreClientMetrics.getNumConnections());
+ }
}
\ No newline at end of file
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/store/impl/TestZookeeperFederationStateStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/store/impl/TestZookeeperFederationStateStore.java
index 390b803..fe28641 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/store/impl/TestZookeeperFederationStateStore.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/store/impl/TestZookeeperFederationStateStore.java
@@ -72,7 +72,6 @@ public class TestZookeeperFederationStateStore
@After
public void after() throws Exception {
super.after();
-
curatorFramework.close();
try {
curatorTestingServer.stop();
@@ -82,8 +81,7 @@ public class TestZookeeperFederationStateStore
@Override
protected FederationStateStore createStateStore() {
- Configuration conf = new Configuration();
- super.setConf(conf);
+ super.setConf(getConf());
return new ZookeeperFederationStateStore();
}
}
\ No newline at end of file
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org