You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by cu...@apache.org on 2017/05/16 16:11:00 UTC
[43/50] [abbrv] hadoop git commit: YARN-3663. Federation State and Policy Store (DBMS implementation). (Giovanni Matteo Fumarola via curino).
YARN-3663. Federation State and Policy Store (DBMS implementation). (Giovanni Matteo Fumarola via curino).
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/88164e20
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/88164e20
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/88164e20
Branch: refs/heads/YARN-2915
Commit: 88164e20560dcbd8e6c0388c69b251dc92beeadb
Parents: 305810d
Author: Carlo Curino <cu...@apache.org>
Authored: Tue Apr 25 15:14:02 2017 -0700
Committer: Carlo Curino <cu...@apache.org>
Committed: Tue May 16 08:52:39 2017 -0700
----------------------------------------------------------------------
LICENSE.txt | 1 +
hadoop-project/pom.xml | 12 +
.../hadoop/yarn/conf/YarnConfiguration.java | 23 +
.../yarn/conf/TestYarnConfigurationFields.java | 14 +
.../hadoop-yarn-server-common/pom.xml | 20 +
.../FederationStateStoreErrorCode.java | 105 ---
.../FederationStateStoreException.java | 17 +-
.../store/impl/MemoryFederationStateStore.java | 81 +-
.../store/impl/SQLFederationStateStore.java | 937 +++++++++++++++++++
.../store/records/SubClusterState.java | 21 +
...cationHomeSubClusterStoreInputValidator.java | 12 +-
...ationMembershipStateStoreInputValidator.java | 14 +-
.../FederationPolicyStoreInputValidator.java | 6 +-
.../store/utils/FederationStateStoreUtils.java | 109 ++-
.../impl/FederationStateStoreBaseTest.java | 74 +-
.../store/impl/HSQLDBFederationStateStore.java | 252 +++++
.../impl/TestMemoryFederationStateStore.java | 3 +-
.../store/impl/TestSQLFederationStateStore.java | 49 +
.../TestFederationStateStoreInputValidator.java | 146 +--
.../TestFederationStateStoreFacadeRetry.java | 7 +-
.../FederationStateStoreStoreProcs.sql | 511 ++++++++++
.../SQLServer/FederationStateStoreTables.sql | 122 +++
22 files changed, 2228 insertions(+), 308 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/88164e20/LICENSE.txt
----------------------------------------------------------------------
diff --git a/LICENSE.txt b/LICENSE.txt
index 969708f..1582f6c 100644
--- a/LICENSE.txt
+++ b/LICENSE.txt
@@ -661,6 +661,7 @@ hadoop-hdfs-project/hadoop-hdfs/src/main/webapps/static/jquery-1.10.2.min.js
hadoop-tools/hadoop-sls/src/main/html/js/thirdparty/jquery.js
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/webapps/static/jquery
Apache HBase - Server which contains JQuery minified javascript library version 1.8.3
+Microsoft SQLServer - JDBC version 6.1.0.jre7
--------------------------------------------------------------------------------
Copyright 2005, 2012, 2013 jQuery Foundation and other contributors, https://jquery.org/
http://git-wip-us.apache.org/repos/asf/hadoop/blob/88164e20/hadoop-project/pom.xml
----------------------------------------------------------------------
diff --git a/hadoop-project/pom.xml b/hadoop-project/pom.xml
index cd92fa5..ac121c3 100755
--- a/hadoop-project/pom.xml
+++ b/hadoop-project/pom.xml
@@ -100,6 +100,8 @@
<jcache.version>1.0.0</jcache.version>
<ehcache.version>3.0.3</ehcache.version>
+ <hikari.version>2.4.11</hikari.version>
+ <mssql.version>6.1.0.jre7</mssql.version>
<!-- define the Java language version used by the compiler -->
<javac.version>1.8</javac.version>
@@ -1280,6 +1282,16 @@
<artifactId>ehcache</artifactId>
<version>${ehcache.version}</version>
</dependency>
+ <dependency>
+ <groupId>com.zaxxer</groupId>
+ <artifactId>HikariCP-java7</artifactId>
+ <version>${hikari.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>com.microsoft.sqlserver</groupId>
+ <artifactId>mssql-jdbc</artifactId>
+ <version>${mssql.version}</version>
+ </dependency>
</dependencies>
</dependencyManagement>
http://git-wip-us.apache.org/repos/asf/hadoop/blob/88164e20/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
index 4ba8c85..7139394 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
@@ -2592,6 +2592,29 @@ public class YarnConfiguration extends Configuration {
public static final String DEFAULT_FEDERATION_POLICY_MANAGER_PARAMS = "";
+ private static final String FEDERATION_STATESTORE_SQL_PREFIX =
+ FEDERATION_PREFIX + "state-store.sql.";
+
+ public static final String FEDERATION_STATESTORE_SQL_USERNAME =
+ FEDERATION_STATESTORE_SQL_PREFIX + "username";
+
+ public static final String FEDERATION_STATESTORE_SQL_PASSWORD =
+ FEDERATION_STATESTORE_SQL_PREFIX + "password";
+
+ public static final String FEDERATION_STATESTORE_SQL_URL =
+ FEDERATION_STATESTORE_SQL_PREFIX + "url";
+
+ public static final String FEDERATION_STATESTORE_SQL_JDBC_CLASS =
+ FEDERATION_STATESTORE_SQL_PREFIX + "jdbc-class";
+
+ public static final String DEFAULT_FEDERATION_STATESTORE_SQL_JDBC_CLASS =
+ "org.hsqldb.jdbc.JDBCDataSource";
+
+ public static final String FEDERATION_STATESTORE_SQL_MAXCONNECTIONS =
+ FEDERATION_STATESTORE_SQL_PREFIX + "max-connections";
+
+ public static final int DEFAULT_FEDERATION_STATESTORE_SQL_MAXCONNECTIONS = 1;
+
////////////////////////////////
// Other Configs
////////////////////////////////
http://git-wip-us.apache.org/repos/asf/hadoop/blob/88164e20/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/test/java/org/apache/hadoop/yarn/conf/TestYarnConfigurationFields.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/test/java/org/apache/hadoop/yarn/conf/TestYarnConfigurationFields.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/test/java/org/apache/hadoop/yarn/conf/TestYarnConfigurationFields.java
index 6e33c0a..c3cb78d 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/test/java/org/apache/hadoop/yarn/conf/TestYarnConfigurationFields.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/test/java/org/apache/hadoop/yarn/conf/TestYarnConfigurationFields.java
@@ -90,6 +90,20 @@ public class TestYarnConfigurationFields extends TestConfigurationFieldsBase {
configurationPropsToSkipCompare
.add(YarnConfiguration.DEFAULT_FEDERATION_POLICY_MANAGER_PARAMS);
+ // Federation StateStore SQL implementation configs to be ignored
+ configurationPropsToSkipCompare
+ .add(YarnConfiguration.FEDERATION_STATESTORE_SQL_JDBC_CLASS);
+ configurationPropsToSkipCompare
+ .add(YarnConfiguration.DEFAULT_FEDERATION_STATESTORE_SQL_JDBC_CLASS);
+ configurationPropsToSkipCompare
+ .add(YarnConfiguration.FEDERATION_STATESTORE_SQL_USERNAME);
+ configurationPropsToSkipCompare
+ .add(YarnConfiguration.FEDERATION_STATESTORE_SQL_PASSWORD);
+ configurationPropsToSkipCompare
+ .add(YarnConfiguration.FEDERATION_STATESTORE_SQL_URL);
+ configurationPropsToSkipCompare
+ .add(YarnConfiguration.FEDERATION_STATESTORE_SQL_MAXCONNECTIONS);
+
// Ignore blacklisting nodes for AM failures feature since it is still a
// "work in progress"
configurationPropsToSkipCompare.add(YarnConfiguration.
http://git-wip-us.apache.org/repos/asf/hadoop/blob/88164e20/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/pom.xml
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/pom.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/pom.xml
index 5ae8889..3bf1b88 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/pom.xml
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/pom.xml
@@ -116,6 +116,26 @@
<groupId>org.ehcache</groupId>
<artifactId>ehcache</artifactId>
</dependency>
+ <dependency>
+ <groupId>com.zaxxer</groupId>
+ <artifactId>HikariCP-java7</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.hsqldb</groupId>
+ <artifactId>hsqldb</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>com.microsoft.sqlserver</groupId>
+ <artifactId>mssql-jdbc</artifactId>
+ <scope>runtime</scope>
+ <exclusions>
+ <exclusion>
+ <groupId>com.microsoft.azure</groupId>
+ <artifactId>azure-keyvault</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
</dependencies>
<build>
http://git-wip-us.apache.org/repos/asf/hadoop/blob/88164e20/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/exception/FederationStateStoreErrorCode.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/exception/FederationStateStoreErrorCode.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/exception/FederationStateStoreErrorCode.java
deleted file mode 100644
index 88e2d3a..0000000
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/exception/FederationStateStoreErrorCode.java
+++ /dev/null
@@ -1,105 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.yarn.server.federation.store.exception;
-
-import org.apache.hadoop.classification.InterfaceAudience.Public;
-import org.apache.hadoop.classification.InterfaceStability.Unstable;
-
-/**
- * <p>
- * Logical error codes from <code>FederationStateStore</code>.
- * </p>
- */
-@Public
-@Unstable
-public enum FederationStateStoreErrorCode {
-
- MEMBERSHIP_INSERT_FAIL(1101, "Fail to insert a tuple into Membership table."),
-
- MEMBERSHIP_DELETE_FAIL(1102, "Fail to delete a tuple from Membership table."),
-
- MEMBERSHIP_SINGLE_SELECT_FAIL(1103,
- "Fail to select a tuple from Membership table."),
-
- MEMBERSHIP_MULTIPLE_SELECT_FAIL(1104,
- "Fail to select multiple tuples from Membership table."),
-
- MEMBERSHIP_UPDATE_DEREGISTER_FAIL(1105,
- "Fail to update/deregister a tuple in Membership table."),
-
- MEMBERSHIP_UPDATE_HEARTBEAT_FAIL(1106,
- "Fail to update/heartbeat a tuple in Membership table."),
-
- APPLICATIONS_INSERT_FAIL(1201,
- "Fail to insert a tuple into ApplicationsHomeSubCluster table."),
-
- APPLICATIONS_DELETE_FAIL(1202,
- "Fail to delete a tuple from ApplicationsHomeSubCluster table"),
-
- APPLICATIONS_SINGLE_SELECT_FAIL(1203,
- "Fail to select a tuple from ApplicationsHomeSubCluster table."),
-
- APPLICATIONS_MULTIPLE_SELECT_FAIL(1204,
- "Fail to select multiple tuple from ApplicationsHomeSubCluster table."),
-
- APPLICATIONS_UPDATE_FAIL(1205,
- "Fail to update a tuple in ApplicationsHomeSubCluster table."),
-
- POLICY_INSERT_FAIL(1301, "Fail to insert a tuple into Policy table."),
-
- POLICY_DELETE_FAIL(1302, "Fail to delete a tuple from Membership table."),
-
- POLICY_SINGLE_SELECT_FAIL(1303, "Fail to select a tuple from Policy table."),
-
- POLICY_MULTIPLE_SELECT_FAIL(1304,
- "Fail to select multiple tuples from Policy table."),
-
- POLICY_UPDATE_FAIL(1305, "Fail to update a tuple in Policy table.");
-
- private final int id;
- private final String msg;
-
- FederationStateStoreErrorCode(int id, String msg) {
- this.id = id;
- this.msg = msg;
- }
-
- /**
- * Get the error code related to the FederationStateStore failure.
- *
- * @return the error code related to the FederationStateStore failure.
- */
- public int getId() {
- return this.id;
- }
-
- /**
- * Get the error message related to the FederationStateStore failure.
- *
- * @return the error message related to the FederationStateStore failure.
- */
- public String getMsg() {
- return this.msg;
- }
-
- @Override
- public String toString() {
- return "\nError Code: " + this.id + "\nError Message: " + this.msg;
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hadoop/blob/88164e20/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/exception/FederationStateStoreException.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/exception/FederationStateStoreException.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/exception/FederationStateStoreException.java
index 81a9e99..1013ec6 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/exception/FederationStateStoreException.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/exception/FederationStateStoreException.java
@@ -31,15 +31,20 @@ public class FederationStateStoreException extends YarnException {
*/
private static final long serialVersionUID = -6453353714832159296L;
- private FederationStateStoreErrorCode code;
-
- public FederationStateStoreException(FederationStateStoreErrorCode code) {
+ public FederationStateStoreException() {
super();
- this.code = code;
}
- public FederationStateStoreErrorCode getCode() {
- return code;
+ public FederationStateStoreException(String message) {
+ super(message);
+ }
+
+ public FederationStateStoreException(Throwable cause) {
+ super(cause);
+ }
+
+ public FederationStateStoreException(String message, Throwable cause) {
+ super(message, cause);
}
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hadoop/blob/88164e20/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/impl/MemoryFederationStateStore.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/impl/MemoryFederationStateStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/impl/MemoryFederationStateStore.java
index 127bf82..fbdb7bf 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/impl/MemoryFederationStateStore.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/impl/MemoryFederationStateStore.java
@@ -18,21 +18,17 @@
package org.apache.hadoop.yarn.server.federation.store.impl;
import java.util.ArrayList;
+import java.util.Calendar;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
+import java.util.TimeZone;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.exceptions.YarnException;
-import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
-import org.apache.hadoop.yarn.server.federation.store.records.SubClusterInfo;
-import org.apache.hadoop.yarn.server.federation.store.records.SubClusterPolicyConfiguration;
import org.apache.hadoop.yarn.server.federation.store.FederationStateStore;
-import org.apache.hadoop.yarn.server.federation.store.exception.FederationStateStoreErrorCode;
-import org.apache.hadoop.yarn.server.federation.store.records.SubClusterDeregisterRequest;
-import org.apache.hadoop.yarn.server.federation.store.records.SubClusterDeregisterResponse;
import org.apache.hadoop.yarn.server.federation.store.records.AddApplicationHomeSubClusterRequest;
import org.apache.hadoop.yarn.server.federation.store.records.AddApplicationHomeSubClusterResponse;
import org.apache.hadoop.yarn.server.federation.store.records.ApplicationHomeSubCluster;
@@ -52,8 +48,13 @@ import org.apache.hadoop.yarn.server.federation.store.records.GetSubClustersInfo
import org.apache.hadoop.yarn.server.federation.store.records.GetSubClustersInfoResponse;
import org.apache.hadoop.yarn.server.federation.store.records.SetSubClusterPolicyConfigurationRequest;
import org.apache.hadoop.yarn.server.federation.store.records.SetSubClusterPolicyConfigurationResponse;
+import org.apache.hadoop.yarn.server.federation.store.records.SubClusterDeregisterRequest;
+import org.apache.hadoop.yarn.server.federation.store.records.SubClusterDeregisterResponse;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterHeartbeatRequest;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterHeartbeatResponse;
+import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
+import org.apache.hadoop.yarn.server.federation.store.records.SubClusterInfo;
+import org.apache.hadoop.yarn.server.federation.store.records.SubClusterPolicyConfiguration;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterRegisterRequest;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterRegisterResponse;
import org.apache.hadoop.yarn.server.federation.store.records.UpdateApplicationHomeSubClusterRequest;
@@ -98,16 +99,18 @@ public class MemoryFederationStateStore implements FederationStateStore {
@Override
public SubClusterRegisterResponse registerSubCluster(
SubClusterRegisterRequest request) throws YarnException {
- FederationMembershipStateStoreInputValidator
- .validateSubClusterRegisterRequest(request);
+ FederationMembershipStateStoreInputValidator.validate(request);
SubClusterInfo subClusterInfo = request.getSubClusterInfo();
+ long currentTime =
+ Calendar.getInstance(TimeZone.getTimeZone("UTC")).getTimeInMillis();
+
SubClusterInfo subClusterInfoToSave =
SubClusterInfo.newInstance(subClusterInfo.getSubClusterId(),
subClusterInfo.getAMRMServiceAddress(),
subClusterInfo.getClientRMServiceAddress(),
subClusterInfo.getRMAdminServiceAddress(),
- subClusterInfo.getRMWebServiceAddress(), clock.getTime(),
+ subClusterInfo.getRMWebServiceAddress(), currentTime,
subClusterInfo.getState(), subClusterInfo.getLastStartTime(),
subClusterInfo.getCapability());
@@ -118,15 +121,12 @@ public class MemoryFederationStateStore implements FederationStateStore {
@Override
public SubClusterDeregisterResponse deregisterSubCluster(
SubClusterDeregisterRequest request) throws YarnException {
- FederationMembershipStateStoreInputValidator
- .validateSubClusterDeregisterRequest(request);
+ FederationMembershipStateStoreInputValidator.validate(request);
SubClusterInfo subClusterInfo = membership.get(request.getSubClusterId());
if (subClusterInfo == null) {
String errMsg =
"SubCluster " + request.getSubClusterId().toString() + " not found";
- FederationStateStoreUtils.logAndThrowStoreException(LOG,
- FederationStateStoreErrorCode.MEMBERSHIP_UPDATE_DEREGISTER_FAIL,
- errMsg);
+ FederationStateStoreUtils.logAndThrowStoreException(LOG, errMsg);
} else {
subClusterInfo.setState(request.getState());
}
@@ -138,20 +138,20 @@ public class MemoryFederationStateStore implements FederationStateStore {
public SubClusterHeartbeatResponse subClusterHeartbeat(
SubClusterHeartbeatRequest request) throws YarnException {
- FederationMembershipStateStoreInputValidator
- .validateSubClusterHeartbeatRequest(request);
+ FederationMembershipStateStoreInputValidator.validate(request);
SubClusterId subClusterId = request.getSubClusterId();
SubClusterInfo subClusterInfo = membership.get(subClusterId);
if (subClusterInfo == null) {
- String errMsg = "Subcluster " + subClusterId.toString()
+ String errMsg = "SubCluster " + subClusterId.toString()
+ " does not exist; cannot heartbeat";
- FederationStateStoreUtils.logAndThrowStoreException(LOG,
- FederationStateStoreErrorCode.MEMBERSHIP_UPDATE_HEARTBEAT_FAIL,
- errMsg);
+ FederationStateStoreUtils.logAndThrowStoreException(LOG, errMsg);
}
- subClusterInfo.setLastHeartBeat(clock.getTime());
+ long currentTime =
+ Calendar.getInstance(TimeZone.getTimeZone("UTC")).getTimeInMillis();
+
+ subClusterInfo.setLastHeartBeat(currentTime);
subClusterInfo.setState(request.getState());
subClusterInfo.setCapability(request.getCapability());
@@ -162,14 +162,12 @@ public class MemoryFederationStateStore implements FederationStateStore {
public GetSubClusterInfoResponse getSubCluster(
GetSubClusterInfoRequest request) throws YarnException {
- FederationMembershipStateStoreInputValidator
- .validateGetSubClusterInfoRequest(request);
+ FederationMembershipStateStoreInputValidator.validate(request);
SubClusterId subClusterId = request.getSubClusterId();
if (!membership.containsKey(subClusterId)) {
String errMsg =
- "Subcluster " + subClusterId.toString() + " does not exist";
- FederationStateStoreUtils.logAndThrowStoreException(LOG,
- FederationStateStoreErrorCode.MEMBERSHIP_SINGLE_SELECT_FAIL, errMsg);
+ "SubCluster " + subClusterId.toString() + " does not exist";
+ FederationStateStoreUtils.logAndThrowStoreException(LOG, errMsg);
}
return GetSubClusterInfoResponse.newInstance(membership.get(subClusterId));
@@ -195,8 +193,7 @@ public class MemoryFederationStateStore implements FederationStateStore {
public AddApplicationHomeSubClusterResponse addApplicationHomeSubCluster(
AddApplicationHomeSubClusterRequest request) throws YarnException {
- FederationApplicationHomeSubClusterStoreInputValidator
- .validateAddApplicationHomeSubClusterRequest(request);
+ FederationApplicationHomeSubClusterStoreInputValidator.validate(request);
ApplicationId appId =
request.getApplicationHomeSubCluster().getApplicationId();
@@ -213,14 +210,12 @@ public class MemoryFederationStateStore implements FederationStateStore {
public UpdateApplicationHomeSubClusterResponse updateApplicationHomeSubCluster(
UpdateApplicationHomeSubClusterRequest request) throws YarnException {
- FederationApplicationHomeSubClusterStoreInputValidator
- .validateUpdateApplicationHomeSubClusterRequest(request);
+ FederationApplicationHomeSubClusterStoreInputValidator.validate(request);
ApplicationId appId =
request.getApplicationHomeSubCluster().getApplicationId();
if (!applications.containsKey(appId)) {
String errMsg = "Application " + appId + " does not exist";
- FederationStateStoreUtils.logAndThrowStoreException(LOG,
- FederationStateStoreErrorCode.APPLICATIONS_UPDATE_FAIL, errMsg);
+ FederationStateStoreUtils.logAndThrowStoreException(LOG, errMsg);
}
applications.put(appId,
@@ -232,14 +227,11 @@ public class MemoryFederationStateStore implements FederationStateStore {
public GetApplicationHomeSubClusterResponse getApplicationHomeSubCluster(
GetApplicationHomeSubClusterRequest request) throws YarnException {
- FederationApplicationHomeSubClusterStoreInputValidator
- .validateGetApplicationHomeSubClusterRequest(request);
+ FederationApplicationHomeSubClusterStoreInputValidator.validate(request);
ApplicationId appId = request.getApplicationId();
if (!applications.containsKey(appId)) {
String errMsg = "Application " + appId + " does not exist";
- FederationStateStoreUtils.logAndThrowStoreException(LOG,
- FederationStateStoreErrorCode.APPLICATIONS_SINGLE_SELECT_FAIL,
- errMsg);
+ FederationStateStoreUtils.logAndThrowStoreException(LOG, errMsg);
}
return GetApplicationHomeSubClusterResponse.newInstance(
@@ -264,13 +256,11 @@ public class MemoryFederationStateStore implements FederationStateStore {
public DeleteApplicationHomeSubClusterResponse deleteApplicationHomeSubCluster(
DeleteApplicationHomeSubClusterRequest request) throws YarnException {
- FederationApplicationHomeSubClusterStoreInputValidator
- .validateDeleteApplicationHomeSubClusterRequest(request);
+ FederationApplicationHomeSubClusterStoreInputValidator.validate(request);
ApplicationId appId = request.getApplicationId();
if (!applications.containsKey(appId)) {
String errMsg = "Application " + appId + " does not exist";
- FederationStateStoreUtils.logAndThrowStoreException(LOG,
- FederationStateStoreErrorCode.APPLICATIONS_DELETE_FAIL, errMsg);
+ FederationStateStoreUtils.logAndThrowStoreException(LOG, errMsg);
}
applications.remove(appId);
@@ -281,13 +271,11 @@ public class MemoryFederationStateStore implements FederationStateStore {
public GetSubClusterPolicyConfigurationResponse getPolicyConfiguration(
GetSubClusterPolicyConfigurationRequest request) throws YarnException {
- FederationPolicyStoreInputValidator
- .validateGetSubClusterPolicyConfigurationRequest(request);
+ FederationPolicyStoreInputValidator.validate(request);
String queue = request.getQueue();
if (!policies.containsKey(queue)) {
String errMsg = "Policy for queue " + queue + " does not exist";
- FederationStateStoreUtils.logAndThrowStoreException(LOG,
- FederationStateStoreErrorCode.POLICY_SINGLE_SELECT_FAIL, errMsg);
+ FederationStateStoreUtils.logAndThrowStoreException(LOG, errMsg);
}
return GetSubClusterPolicyConfigurationResponse
@@ -298,8 +286,7 @@ public class MemoryFederationStateStore implements FederationStateStore {
public SetSubClusterPolicyConfigurationResponse setPolicyConfiguration(
SetSubClusterPolicyConfigurationRequest request) throws YarnException {
- FederationPolicyStoreInputValidator
- .validateSetSubClusterPolicyConfigurationRequest(request);
+ FederationPolicyStoreInputValidator.validate(request);
policies.put(request.getPolicyConfiguration().getQueue(),
request.getPolicyConfiguration());
return SetSubClusterPolicyConfigurationResponse.newInstance();
http://git-wip-us.apache.org/repos/asf/hadoop/blob/88164e20/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/impl/SQLFederationStateStore.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/impl/SQLFederationStateStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/impl/SQLFederationStateStore.java
new file mode 100644
index 0000000..a849c6a
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/impl/SQLFederationStateStore.java
@@ -0,0 +1,937 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.federation.store.impl;
+
+import java.nio.ByteBuffer;
+import java.sql.CallableStatement;
+import java.sql.Connection;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Timestamp;
+import java.util.ArrayList;
+import java.util.Calendar;
+import java.util.List;
+import java.util.TimeZone;
+
+import org.apache.commons.lang.NotImplementedException;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.server.federation.store.FederationStateStore;
+import org.apache.hadoop.yarn.server.federation.store.exception.FederationStateStoreInvalidInputException;
+import org.apache.hadoop.yarn.server.federation.store.records.AddApplicationHomeSubClusterRequest;
+import org.apache.hadoop.yarn.server.federation.store.records.AddApplicationHomeSubClusterResponse;
+import org.apache.hadoop.yarn.server.federation.store.records.ApplicationHomeSubCluster;
+import org.apache.hadoop.yarn.server.federation.store.records.DeleteApplicationHomeSubClusterRequest;
+import org.apache.hadoop.yarn.server.federation.store.records.DeleteApplicationHomeSubClusterResponse;
+import org.apache.hadoop.yarn.server.federation.store.records.GetApplicationHomeSubClusterRequest;
+import org.apache.hadoop.yarn.server.federation.store.records.GetApplicationHomeSubClusterResponse;
+import org.apache.hadoop.yarn.server.federation.store.records.GetApplicationsHomeSubClusterRequest;
+import org.apache.hadoop.yarn.server.federation.store.records.GetApplicationsHomeSubClusterResponse;
+import org.apache.hadoop.yarn.server.federation.store.records.GetSubClusterInfoRequest;
+import org.apache.hadoop.yarn.server.federation.store.records.GetSubClusterInfoResponse;
+import org.apache.hadoop.yarn.server.federation.store.records.GetSubClusterPoliciesConfigurationsRequest;
+import org.apache.hadoop.yarn.server.federation.store.records.GetSubClusterPoliciesConfigurationsResponse;
+import org.apache.hadoop.yarn.server.federation.store.records.GetSubClusterPolicyConfigurationRequest;
+import org.apache.hadoop.yarn.server.federation.store.records.GetSubClusterPolicyConfigurationResponse;
+import org.apache.hadoop.yarn.server.federation.store.records.GetSubClustersInfoRequest;
+import org.apache.hadoop.yarn.server.federation.store.records.GetSubClustersInfoResponse;
+import org.apache.hadoop.yarn.server.federation.store.records.SetSubClusterPolicyConfigurationRequest;
+import org.apache.hadoop.yarn.server.federation.store.records.SetSubClusterPolicyConfigurationResponse;
+import org.apache.hadoop.yarn.server.federation.store.records.SubClusterDeregisterRequest;
+import org.apache.hadoop.yarn.server.federation.store.records.SubClusterDeregisterResponse;
+import org.apache.hadoop.yarn.server.federation.store.records.SubClusterHeartbeatRequest;
+import org.apache.hadoop.yarn.server.federation.store.records.SubClusterHeartbeatResponse;
+import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
+import org.apache.hadoop.yarn.server.federation.store.records.SubClusterInfo;
+import org.apache.hadoop.yarn.server.federation.store.records.SubClusterPolicyConfiguration;
+import org.apache.hadoop.yarn.server.federation.store.records.SubClusterRegisterRequest;
+import org.apache.hadoop.yarn.server.federation.store.records.SubClusterRegisterResponse;
+import org.apache.hadoop.yarn.server.federation.store.records.SubClusterState;
+import org.apache.hadoop.yarn.server.federation.store.records.UpdateApplicationHomeSubClusterRequest;
+import org.apache.hadoop.yarn.server.federation.store.records.UpdateApplicationHomeSubClusterResponse;
+import org.apache.hadoop.yarn.server.federation.store.utils.FederationApplicationHomeSubClusterStoreInputValidator;
+import org.apache.hadoop.yarn.server.federation.store.utils.FederationMembershipStateStoreInputValidator;
+import org.apache.hadoop.yarn.server.federation.store.utils.FederationPolicyStoreInputValidator;
+import org.apache.hadoop.yarn.server.federation.store.utils.FederationStateStoreUtils;
+import org.apache.hadoop.yarn.server.records.Version;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.zaxxer.hikari.HikariDataSource;
+
+/**
+ * SQL implementation of {@link FederationStateStore}.
+ */
+public class SQLFederationStateStore implements FederationStateStore {
+
+ public static final Logger LOG =
+ LoggerFactory.getLogger(SQLFederationStateStore.class);
+
+ // Stored procedure call patterns (JDBC escape syntax). Each '?' is a
+ // positional parameter that the corresponding method of this class binds
+ // as an input value or registers as an OUT parameter.
+
+ // Membership: register, deregister, lookup and heartbeat of subclusters
+ private static final String CALL_SP_REGISTER_SUBCLUSTER =
+ "{call sp_registerSubCluster(?, ?, ?, ?, ?, ?, ?, ?, ?)}";
+
+ private static final String CALL_SP_DEREGISTER_SUBCLUSTER =
+ "{call sp_deregisterSubCluster(?, ?, ?)}";
+
+ private static final String CALL_SP_GET_SUBCLUSTER =
+ "{call sp_getSubCluster(?, ?, ?, ?, ?, ?, ?, ?, ?)}";
+
+ private static final String CALL_SP_GET_SUBCLUSTERS =
+ "{call sp_getSubClusters()}";
+
+ private static final String CALL_SP_SUBCLUSTER_HEARTBEAT =
+ "{call sp_subClusterHeartbeat(?, ?, ?, ?)}";
+
+ // Application to home subcluster mapping
+ private static final String CALL_SP_ADD_APPLICATION_HOME_SUBCLUSTER =
+ "{call sp_addApplicationHomeSubCluster(?, ?, ?, ?)}";
+
+ private static final String CALL_SP_UPDATE_APPLICATION_HOME_SUBCLUSTER =
+ "{call sp_updateApplicationHomeSubCluster(?, ?, ?)}";
+
+ private static final String CALL_SP_DELETE_APPLICATION_HOME_SUBCLUSTER =
+ "{call sp_deleteApplicationHomeSubCluster(?, ?)}";
+
+ private static final String CALL_SP_GET_APPLICATION_HOME_SUBCLUSTER =
+ "{call sp_getApplicationHomeSubCluster(?, ?)}";
+
+ private static final String CALL_SP_GET_APPLICATIONS_HOME_SUBCLUSTER =
+ "{call sp_getApplicationsHomeSubCluster()}";
+
+ // Per-queue policy configurations
+ private static final String CALL_SP_SET_POLICY_CONFIGURATION =
+ "{call sp_setPolicyConfiguration(?, ?, ?, ?)}";
+
+ private static final String CALL_SP_GET_POLICY_CONFIGURATION =
+ "{call sp_getPolicyConfiguration(?, ?, ?)}";
+
+ private static final String CALL_SP_GET_POLICIES_CONFIGURATIONS =
+ "{call sp_getPoliciesConfigurations()}";
+
+ // UTC calendar handed to getTimestamp(...) when the last heartbeat of a
+ // subcluster is read back from the database.
+ // NOTE(review): this one instance is used by every call on this store and
+ // java.util.Calendar is not thread-safe - confirm that the JDBC driver in
+ // use does not mutate the Calendar it is given.
+ private Calendar utcCalendar =
+ Calendar.getInstance(TimeZone.getTimeZone("UTC"));
+
+ // SQL database configurations, populated by init(Configuration)
+
+ private String userName;
+ private String password;
+ private String driverClass;
+ private String url;
+ private int maximumPoolSize;
+ // Connection pool, created by init(Configuration); null until then
+ private HikariDataSource dataSource = null;
+
+ /**
+  * Reads the SQL StateStore settings from the configuration, checks that the
+  * configured JDBC class can be loaded and creates the connection pool.
+  *
+  * @param conf the configuration holding the SQL StateStore settings
+  * @throws YarnException if the configured JDBC class cannot be found
+  */
+ @Override
+ public void init(Configuration conf) throws YarnException {
+   // Credentials and URL are read without a default. The helper setters
+   // used below avoid assigning a null value to these properties.
+   url = conf.get(YarnConfiguration.FEDERATION_STATESTORE_SQL_URL);
+   userName = conf.get(YarnConfiguration.FEDERATION_STATESTORE_SQL_USERNAME);
+   password = conf.get(YarnConfiguration.FEDERATION_STATESTORE_SQL_PASSWORD);
+
+   // Pool size and JDBC class fall back to the YarnConfiguration defaults
+   maximumPoolSize =
+       conf.getInt(YarnConfiguration.FEDERATION_STATESTORE_SQL_MAXCONNECTIONS,
+           YarnConfiguration.DEFAULT_FEDERATION_STATESTORE_SQL_MAXCONNECTIONS);
+   driverClass =
+       conf.get(YarnConfiguration.FEDERATION_STATESTORE_SQL_JDBC_CLASS,
+           YarnConfiguration.DEFAULT_FEDERATION_STATESTORE_SQL_JDBC_CLASS);
+
+   // Make sure the JDBC class is loadable before building the pool
+   try {
+     Class.forName(driverClass);
+   } catch (ClassNotFoundException e) {
+     FederationStateStoreUtils.logAndThrowException(LOG,
+         "Driver class not found.", e);
+   }
+
+   // The data source pools the connections in a thread-safe manner
+   dataSource = new HikariDataSource();
+   dataSource.setDataSourceClassName(driverClass);
+   dataSource.setMaximumPoolSize(maximumPoolSize);
+   FederationStateStoreUtils.setUsername(dataSource, userName);
+   FederationStateStoreUtils.setPassword(dataSource, password);
+   FederationStateStoreUtils.setProperty(dataSource,
+       FederationStateStoreUtils.FEDERATION_STORE_URL, url);
+
+   LOG.info("Initialized connection pool to the Federation StateStore "
+       + "database at address: " + url);
+ }
+
+ @Override
+ public SubClusterRegisterResponse registerSubCluster(
+ SubClusterRegisterRequest registerSubClusterRequest)
+ throws YarnException {
+
+ // Input validator
+ FederationMembershipStateStoreInputValidator
+ .validate(registerSubClusterRequest);
+
+ CallableStatement cstmt = null;
+ Connection conn = null;
+
+ SubClusterInfo subClusterInfo =
+ registerSubClusterRequest.getSubClusterInfo();
+ SubClusterId subClusterId = subClusterInfo.getSubClusterId();
+
+ try {
+ conn = getConnection();
+ cstmt = conn.prepareCall(CALL_SP_REGISTER_SUBCLUSTER);
+
+ // Set the parameters for the stored procedure
+ cstmt.setString(1, subClusterId.getId());
+ cstmt.setString(2, subClusterInfo.getAMRMServiceAddress());
+ cstmt.setString(3, subClusterInfo.getClientRMServiceAddress());
+ cstmt.setString(4, subClusterInfo.getRMAdminServiceAddress());
+ cstmt.setString(5, subClusterInfo.getRMWebServiceAddress());
+ cstmt.setString(6, subClusterInfo.getState().toString());
+ cstmt.setLong(7, subClusterInfo.getLastStartTime());
+ cstmt.setString(8, subClusterInfo.getCapability());
+ cstmt.registerOutParameter(9, java.sql.Types.INTEGER);
+
+ // Execute the query
+ cstmt.executeUpdate();
+
+ // Check the ROWCOUNT value, if it is equal to 0 it means the call
+ // did not add a new subcluster into FederationStateStore
+ if (cstmt.getInt(9) == 0) {
+ String errMsg = "SubCluster " + subClusterId
+ + " was not registered into the StateStore";
+ FederationStateStoreUtils.logAndThrowStoreException(LOG, errMsg);
+ }
+ // Check the ROWCOUNT value, if it is different from 1 it means the call
+ // had a wrong behavior. Maybe the database is not set correctly.
+ if (cstmt.getInt(9) != 1) {
+ String errMsg = "Wrong behavior during registration of SubCluster "
+ + subClusterId + " into the StateStore";
+ FederationStateStoreUtils.logAndThrowStoreException(LOG, errMsg);
+ }
+
+ LOG.info(
+ "Registered the SubCluster " + subClusterId + " into the StateStore");
+
+ } catch (SQLException e) {
+ FederationStateStoreUtils.logAndThrowRetriableException(LOG,
+ "Unable to register the SubCluster " + subClusterId
+ + " into the StateStore",
+ e);
+ } finally {
+ // Return to the pool the CallableStatement and the Connection
+ FederationStateStoreUtils.returnToPool(LOG, cstmt, conn);
+ }
+ return SubClusterRegisterResponse.newInstance();
+ }
+
+ @Override
+ public SubClusterDeregisterResponse deregisterSubCluster(
+ SubClusterDeregisterRequest subClusterDeregisterRequest)
+ throws YarnException {
+
+ // Input validator
+ FederationMembershipStateStoreInputValidator
+ .validate(subClusterDeregisterRequest);
+
+ CallableStatement cstmt = null;
+ Connection conn = null;
+
+ SubClusterId subClusterId = subClusterDeregisterRequest.getSubClusterId();
+ SubClusterState state = subClusterDeregisterRequest.getState();
+
+ try {
+ conn = getConnection();
+ cstmt = conn.prepareCall(CALL_SP_DEREGISTER_SUBCLUSTER);
+
+ // Set the parameters for the stored procedure
+ cstmt.setString(1, subClusterId.getId());
+ cstmt.setString(2, state.toString());
+ cstmt.registerOutParameter(3, java.sql.Types.INTEGER);
+
+ // Execute the query
+ cstmt.executeUpdate();
+
+ // Check the ROWCOUNT value, if it is equal to 0 it means the call
+ // did not deregister the subcluster into FederationStateStore
+ if (cstmt.getInt(3) == 0) {
+ String errMsg = "SubCluster " + subClusterId + " not found";
+ FederationStateStoreUtils.logAndThrowStoreException(LOG, errMsg);
+ }
+ // Check the ROWCOUNT value, if it is different from 1 it means the call
+ // had a wrong behavior. Maybe the database is not set correctly.
+ if (cstmt.getInt(3) != 1) {
+ String errMsg = "Wrong behavior during deregistration of SubCluster "
+ + subClusterId + " from the StateStore";
+ FederationStateStoreUtils.logAndThrowStoreException(LOG, errMsg);
+ }
+
+ LOG.info("Deregistered the SubCluster " + subClusterId + " state to "
+ + state.toString());
+
+ } catch (SQLException e) {
+ FederationStateStoreUtils.logAndThrowRetriableException(LOG,
+ "Unable to deregister the sub-cluster " + subClusterId + " state to "
+ + state.toString(),
+ e);
+ } finally {
+ // Return to the pool the CallableStatement and the Connection
+ FederationStateStoreUtils.returnToPool(LOG, cstmt, conn);
+ }
+ return SubClusterDeregisterResponse.newInstance();
+ }
+
+ @Override
+ public SubClusterHeartbeatResponse subClusterHeartbeat(
+ SubClusterHeartbeatRequest subClusterHeartbeatRequest)
+ throws YarnException {
+
+ // Input validator
+ FederationMembershipStateStoreInputValidator
+ .validate(subClusterHeartbeatRequest);
+
+ CallableStatement cstmt = null;
+ Connection conn = null;
+
+ SubClusterId subClusterId = subClusterHeartbeatRequest.getSubClusterId();
+ SubClusterState state = subClusterHeartbeatRequest.getState();
+
+ try {
+ conn = getConnection();
+ cstmt = conn.prepareCall(CALL_SP_SUBCLUSTER_HEARTBEAT);
+
+ // Set the parameters for the stored procedure
+ cstmt.setString(1, subClusterId.getId());
+ cstmt.setString(2, state.toString());
+ cstmt.setString(3, subClusterHeartbeatRequest.getCapability());
+ cstmt.registerOutParameter(4, java.sql.Types.INTEGER);
+
+ // Execute the query
+ cstmt.executeUpdate();
+
+ // Check the ROWCOUNT value, if it is equal to 0 it means the call
+ // did not update the subcluster into FederationStateStore
+ if (cstmt.getInt(4) == 0) {
+ String errMsg = "SubCluster " + subClusterId.toString()
+ + " does not exist; cannot heartbeat";
+ FederationStateStoreUtils.logAndThrowStoreException(LOG, errMsg);
+ }
+ // Check the ROWCOUNT value, if it is different from 1 it means the call
+ // had a wrong behavior. Maybe the database is not set correctly.
+ if (cstmt.getInt(4) != 1) {
+ String errMsg =
+ "Wrong behavior during the heartbeat of SubCluster " + subClusterId;
+ FederationStateStoreUtils.logAndThrowStoreException(LOG, errMsg);
+ }
+
+ LOG.info("Heartbeated the StateStore for the specified SubCluster "
+ + subClusterId);
+
+ } catch (SQLException e) {
+ FederationStateStoreUtils.logAndThrowRetriableException(LOG,
+ "Unable to heartbeat the StateStore for the specified SubCluster "
+ + subClusterId,
+ e);
+ } finally {
+ // Return to the pool the CallableStatement and the Connection
+ FederationStateStoreUtils.returnToPool(LOG, cstmt, conn);
+ }
+ return SubClusterHeartbeatResponse.newInstance();
+ }
+
+ /**
+  * Looks up a single subcluster through the sp_getSubCluster stored
+  * procedure and rebuilds its SubClusterInfo from the OUT parameters.
+  *
+  * @param subClusterRequest carries the id of the subcluster to look up
+  * @return a response wrapping the SubClusterInfo read from the database
+  * @throws YarnException if the input is invalid, the returned values do
+  *           not form a valid SubClusterInfo, or the database call fails
+  */
+ @Override
+ public GetSubClusterInfoResponse getSubCluster(
+     GetSubClusterInfoRequest subClusterRequest) throws YarnException {
+
+   // Validate the request before touching the database
+   FederationMembershipStateStoreInputValidator.validate(subClusterRequest);
+
+   SubClusterId subClusterId = subClusterRequest.getSubClusterId();
+   SubClusterInfo subClusterInfo = null;
+
+   Connection conn = null;
+   CallableStatement cstmt = null;
+   try {
+     conn = getConnection();
+     cstmt = conn.prepareCall(CALL_SP_GET_SUBCLUSTER);
+
+     // Parameter 1 is the lookup key, parameters 2-9 are OUT values
+     cstmt.setString(1, subClusterId.getId());
+     cstmt.registerOutParameter(2, java.sql.Types.VARCHAR);
+     cstmt.registerOutParameter(3, java.sql.Types.VARCHAR);
+     cstmt.registerOutParameter(4, java.sql.Types.VARCHAR);
+     cstmt.registerOutParameter(5, java.sql.Types.VARCHAR);
+     cstmt.registerOutParameter(6, java.sql.Types.TIMESTAMP);
+     cstmt.registerOutParameter(7, java.sql.Types.VARCHAR);
+     cstmt.registerOutParameter(8, java.sql.Types.BIGINT);
+     cstmt.registerOutParameter(9, java.sql.Types.VARCHAR);
+
+     cstmt.execute();
+
+     String amRMAddress = cstmt.getString(2);
+     String clientRMAddress = cstmt.getString(3);
+     String rmAdminAddress = cstmt.getString(4);
+     String webAppAddress = cstmt.getString(5);
+
+     // A missing heartbeat timestamp is reported as 0
+     long lastHeartBeat = 0;
+     Timestamp heartBeat = cstmt.getTimestamp(6, utcCalendar);
+     if (heartBeat != null) {
+       lastHeartBeat = heartBeat.getTime();
+     }
+
+     SubClusterState state = SubClusterState.fromString(cstmt.getString(7));
+     long lastStartTime = cstmt.getLong(8);
+     String capability = cstmt.getString(9);
+
+     subClusterInfo = SubClusterInfo.newInstance(subClusterId, amRMAddress,
+         clientRMAddress, rmAdminAddress, webAppAddress, lastHeartBeat, state,
+         lastStartTime, capability);
+
+     // An invalid SubClusterInfo is reported as a missing subcluster
+     try {
+       FederationMembershipStateStoreInputValidator
+           .checkSubClusterInfo(subClusterInfo);
+     } catch (FederationStateStoreInvalidInputException e) {
+       String errMsg =
+           "SubCluster " + subClusterId.toString() + " does not exist";
+       FederationStateStoreUtils.logAndThrowStoreException(LOG, errMsg);
+     }
+
+     if (LOG.isDebugEnabled()) {
+       LOG.debug("Got the information about the specified SubCluster "
+           + subClusterInfo.toString());
+     }
+   } catch (SQLException e) {
+     FederationStateStoreUtils.logAndThrowRetriableException(LOG,
+         "Unable to obtain the SubCluster information for " + subClusterId, e);
+   } finally {
+     // Hand the statement and the connection back to the pool
+     FederationStateStoreUtils.returnToPool(LOG, cstmt, conn);
+   }
+   return GetSubClusterInfoResponse.newInstance(subClusterInfo);
+ }
+
+ /**
+  * Lists the subclusters returned by the sp_getSubClusters stored
+  * procedure, optionally leaving out the ones whose state is not active.
+  *
+  * @param subClustersRequest tells whether inactive subclusters must be
+  *          filtered out of the result
+  * @return a response wrapping the selected SubClusterInfo entries
+  * @throws YarnException if a row does not form a valid SubClusterInfo or
+  *           the database call fails
+  */
+ @Override
+ public GetSubClustersInfoResponse getSubClusters(
+     GetSubClustersInfoRequest subClustersRequest) throws YarnException {
+   CallableStatement cstmt = null;
+   Connection conn = null;
+   ResultSet rs = null;
+   List<SubClusterInfo> subClusters = new ArrayList<SubClusterInfo>();
+
+   try {
+     conn = getConnection();
+     cstmt = conn.prepareCall(CALL_SP_GET_SUBCLUSTERS);
+
+     // Execute the query
+     rs = cstmt.executeQuery();
+
+     while (rs.next()) {
+
+       // Extract the output for each tuple
+       String subClusterName = rs.getString(1);
+       String amRMAddress = rs.getString(2);
+       String clientRMAddress = rs.getString(3);
+       String rmAdminAddress = rs.getString(4);
+       String webAppAddress = rs.getString(5);
+
+       // getTimestamp returns null for a SQL NULL value. Fall back to 0,
+       // as getSubCluster does, instead of failing the whole listing with
+       // a NullPointerException.
+       Timestamp heartBeatTimeStamp = rs.getTimestamp(6, utcCalendar);
+       long lastHeartBeat =
+           heartBeatTimeStamp != null ? heartBeatTimeStamp.getTime() : 0;
+
+       SubClusterState state = SubClusterState.fromString(rs.getString(7));
+       long lastStartTime = rs.getLong(8);
+       String capability = rs.getString(9);
+
+       SubClusterId subClusterId = SubClusterId.newInstance(subClusterName);
+       SubClusterInfo subClusterInfo = SubClusterInfo.newInstance(subClusterId,
+           amRMAddress, clientRMAddress, rmAdminAddress, webAppAddress,
+           lastHeartBeat, state, lastStartTime, capability);
+
+       // Check if the output is a valid subcluster
+       try {
+         FederationMembershipStateStoreInputValidator
+             .checkSubClusterInfo(subClusterInfo);
+       } catch (FederationStateStoreInvalidInputException e) {
+         String errMsg =
+             "SubCluster " + subClusterId.toString() + " is not valid";
+         FederationStateStoreUtils.logAndThrowStoreException(LOG, errMsg);
+       }
+
+       // Keep the entry unless filtering is requested and it is inactive
+       if (!subClustersRequest.getFilterInactiveSubClusters()
+           || subClusterInfo.getState().isActive()) {
+         subClusters.add(subClusterInfo);
+       }
+     }
+
+   } catch (SQLException e) {
+     FederationStateStoreUtils.logAndThrowRetriableException(LOG,
+         "Unable to obtain the information for all the SubClusters ", e);
+   } finally {
+     // Return to the pool the CallableStatement, the Connection and the
+     // ResultSet
+     FederationStateStoreUtils.returnToPool(LOG, cstmt, conn, rs);
+   }
+   return GetSubClustersInfoResponse.newInstance(subClusters);
+ }
+
+ @Override
+ public AddApplicationHomeSubClusterResponse addApplicationHomeSubCluster(
+ AddApplicationHomeSubClusterRequest request) throws YarnException {
+
+ // Input validator
+ FederationApplicationHomeSubClusterStoreInputValidator.validate(request);
+
+ CallableStatement cstmt = null;
+ Connection conn = null;
+
+ String subClusterHome = null;
+ ApplicationId appId =
+ request.getApplicationHomeSubCluster().getApplicationId();
+ SubClusterId subClusterId =
+ request.getApplicationHomeSubCluster().getHomeSubCluster();
+
+ try {
+ conn = getConnection();
+ cstmt = conn.prepareCall(CALL_SP_ADD_APPLICATION_HOME_SUBCLUSTER);
+
+ // Set the parameters for the stored procedure
+ cstmt.setString(1, appId.toString());
+ cstmt.setString(2, subClusterId.getId());
+ cstmt.registerOutParameter(3, java.sql.Types.VARCHAR);
+ cstmt.registerOutParameter(4, java.sql.Types.INTEGER);
+
+ // Execute the query
+ cstmt.executeUpdate();
+
+ subClusterHome = cstmt.getString(3);
+ SubClusterId subClusterIdHome = SubClusterId.newInstance(subClusterHome);
+
+ // For failover reason, we check the returned SubClusterId.
+ // If it is equal to the subclusterId we sent, the call added the new
+ // application into FederationStateStore. If the call returns a different
+ // SubClusterId it means we already tried to insert this application but a
+ // component (Router/StateStore/RM) failed during the submission.
+ if (subClusterId.equals(subClusterIdHome)) {
+ // Check the ROWCOUNT value, if it is equal to 0 it means the call
+ // did not add a new application into FederationStateStore
+ if (cstmt.getInt(4) == 0) {
+ String errMsg = "The application " + appId
+ + " was not insert into the StateStore";
+ FederationStateStoreUtils.logAndThrowStoreException(LOG, errMsg);
+ }
+ // Check the ROWCOUNT value, if it is different from 1 it means the call
+ // had a wrong behavior. Maybe the database is not set correctly.
+ if (cstmt.getInt(4) != 1) {
+ String errMsg = "Wrong behavior during the insertion of SubCluster "
+ + subClusterId;
+ FederationStateStoreUtils.logAndThrowStoreException(LOG, errMsg);
+ }
+
+ LOG.info("Insert into the StateStore the application: " + appId
+ + " in SubCluster: " + subClusterHome);
+ } else {
+ // Check the ROWCOUNT value, if it is different from 0 it means the call
+ // did edited the table
+ if (cstmt.getInt(4) != 0) {
+ String errMsg =
+ "The application " + appId + " does exist but was overwritten";
+ FederationStateStoreUtils.logAndThrowStoreException(LOG, errMsg);
+ }
+ LOG.info("Application: " + appId + " already present with SubCluster: "
+ + subClusterHome);
+ }
+
+ } catch (SQLException e) {
+ FederationStateStoreUtils
+ .logAndThrowRetriableException(LOG,
+ "Unable to insert the newly generated application "
+ + request.getApplicationHomeSubCluster().getApplicationId(),
+ e);
+ } finally {
+ // Return to the pool the CallableStatement and the Connection
+ FederationStateStoreUtils.returnToPool(LOG, cstmt, conn);
+ }
+ return AddApplicationHomeSubClusterResponse
+ .newInstance(SubClusterId.newInstance(subClusterHome));
+ }
+
+ @Override
+ public UpdateApplicationHomeSubClusterResponse updateApplicationHomeSubCluster(
+ UpdateApplicationHomeSubClusterRequest request) throws YarnException {
+
+ // Input validator
+ FederationApplicationHomeSubClusterStoreInputValidator.validate(request);
+
+ CallableStatement cstmt = null;
+ Connection conn = null;
+
+ ApplicationId appId =
+ request.getApplicationHomeSubCluster().getApplicationId();
+ SubClusterId subClusterId =
+ request.getApplicationHomeSubCluster().getHomeSubCluster();
+
+ try {
+ conn = getConnection();
+ cstmt = conn.prepareCall(CALL_SP_UPDATE_APPLICATION_HOME_SUBCLUSTER);
+
+ // Set the parameters for the stored procedure
+ cstmt.setString(1, appId.toString());
+ cstmt.setString(2, subClusterId.getId());
+ cstmt.registerOutParameter(3, java.sql.Types.INTEGER);
+
+ // Execute the query
+ cstmt.executeUpdate();
+
+ // Check the ROWCOUNT value, if it is equal to 0 it means the call
+ // did not update the application into FederationStateStore
+ if (cstmt.getInt(3) == 0) {
+ String errMsg = "Application " + appId + " does not exist";
+ FederationStateStoreUtils.logAndThrowStoreException(LOG, errMsg);
+ }
+ // Check the ROWCOUNT value, if it is different from 1 it means the call
+ // had a wrong behavior. Maybe the database is not set correctly.
+ if (cstmt.getInt(3) != 1) {
+ String errMsg =
+ "Wrong behavior during the update of SubCluster " + subClusterId;
+ FederationStateStoreUtils.logAndThrowStoreException(LOG, errMsg);
+ }
+
+ LOG.info(
+ "Update the SubCluster to {} for application {} in the StateStore",
+ subClusterId, appId);
+
+ } catch (SQLException e) {
+ FederationStateStoreUtils
+ .logAndThrowRetriableException(LOG,
+ "Unable to update the application "
+ + request.getApplicationHomeSubCluster().getApplicationId(),
+ e);
+ } finally {
+ // Return to the pool the CallableStatement and the Connection
+ FederationStateStoreUtils.returnToPool(LOG, cstmt, conn);
+ }
+ return UpdateApplicationHomeSubClusterResponse.newInstance();
+ }
+
+ /**
+  * Looks up the home subcluster of an application through the
+  * sp_getApplicationHomeSubCluster stored procedure.
+  *
+  * @param request carries the id of the application to look up
+  * @return a response wrapping the application id and its home subcluster
+  * @throws YarnException if the input is invalid, the procedure returns no
+  *           home subcluster, or the database call fails
+  */
+ @Override
+ public GetApplicationHomeSubClusterResponse getApplicationHomeSubCluster(
+     GetApplicationHomeSubClusterRequest request) throws YarnException {
+
+   // Validate the request before touching the database
+   FederationApplicationHomeSubClusterStoreInputValidator.validate(request);
+
+   SubClusterId homeRM = null;
+
+   Connection conn = null;
+   CallableStatement cstmt = null;
+   try {
+     conn = getConnection();
+     cstmt = conn.prepareCall(CALL_SP_GET_APPLICATION_HOME_SUBCLUSTER);
+
+     // Parameter 1 is the application id, parameter 2 returns its home
+     cstmt.setString(1, request.getApplicationId().toString());
+     cstmt.registerOutParameter(2, java.sql.Types.VARCHAR);
+
+     cstmt.execute();
+
+     // A null OUT value means no mapping exists for this application
+     String homeSubCluster = cstmt.getString(2);
+     if (homeSubCluster == null) {
+       String errMsg =
+           "Application " + request.getApplicationId() + " does not exist";
+       FederationStateStoreUtils.logAndThrowStoreException(LOG, errMsg);
+     } else {
+       homeRM = SubClusterId.newInstance(homeSubCluster);
+     }
+
+     if (LOG.isDebugEnabled()) {
+       LOG.debug("Got the information about the specified application "
+           + request.getApplicationId() + ". The AM is running in " + homeRM);
+     }
+   } catch (SQLException e) {
+     FederationStateStoreUtils.logAndThrowRetriableException(LOG,
+         "Unable to obtain the application information "
+             + "for the specified application " + request.getApplicationId(),
+         e);
+   } finally {
+     // Hand the statement and the connection back to the pool
+     FederationStateStoreUtils.returnToPool(LOG, cstmt, conn);
+   }
+
+   ApplicationHomeSubCluster appHome =
+       ApplicationHomeSubCluster.newInstance(request.getApplicationId(), homeRM);
+   return GetApplicationHomeSubClusterResponse.newInstance(appHome);
+ }
+
+ /**
+  * Lists every application to home subcluster mapping returned by the
+  * sp_getApplicationsHomeSubCluster stored procedure.
+  *
+  * @param request the request; it is not read by this implementation
+  * @return a response wrapping one ApplicationHomeSubCluster per row
+  * @throws YarnException if the database call fails
+  */
+ @Override
+ public GetApplicationsHomeSubClusterResponse getApplicationsHomeSubCluster(
+     GetApplicationsHomeSubClusterRequest request) throws YarnException {
+   List<ApplicationHomeSubCluster> appsHomeSubClusters =
+       new ArrayList<ApplicationHomeSubCluster>();
+
+   Connection conn = null;
+   CallableStatement cstmt = null;
+   ResultSet rs = null;
+   try {
+     conn = getConnection();
+     cstmt = conn.prepareCall(CALL_SP_GET_APPLICATIONS_HOME_SUBCLUSTER);
+     rs = cstmt.executeQuery();
+
+     // Column 1 is the application id, column 2 its home subcluster
+     while (rs.next()) {
+       ApplicationId appId = ApplicationId.fromString(rs.getString(1));
+       SubClusterId home = SubClusterId.newInstance(rs.getString(2));
+       appsHomeSubClusters
+           .add(ApplicationHomeSubCluster.newInstance(appId, home));
+     }
+   } catch (SQLException e) {
+     FederationStateStoreUtils.logAndThrowRetriableException(LOG,
+         "Unable to obtain the information for all the applications ", e);
+   } finally {
+     // Hand the statement, the connection and the result set back
+     FederationStateStoreUtils.returnToPool(LOG, cstmt, conn, rs);
+   }
+   return GetApplicationsHomeSubClusterResponse
+       .newInstance(appsHomeSubClusters);
+ }
+
+ @Override
+ public DeleteApplicationHomeSubClusterResponse deleteApplicationHomeSubCluster(
+ DeleteApplicationHomeSubClusterRequest request) throws YarnException {
+
+ // Input validator
+ FederationApplicationHomeSubClusterStoreInputValidator.validate(request);
+
+ CallableStatement cstmt = null;
+ Connection conn = null;
+
+ try {
+ conn = getConnection();
+ cstmt = conn.prepareCall(CALL_SP_DELETE_APPLICATION_HOME_SUBCLUSTER);
+
+ // Set the parameters for the stored procedure
+ cstmt.setString(1, request.getApplicationId().toString());
+ cstmt.registerOutParameter(2, java.sql.Types.INTEGER);
+
+ // Execute the query
+ cstmt.executeUpdate();
+
+ // Check the ROWCOUNT value, if it is equal to 0 it means the call
+ // did not delete the application from FederationStateStore
+ if (cstmt.getInt(2) == 0) {
+ String errMsg =
+ "Application " + request.getApplicationId() + " does not exist";
+ FederationStateStoreUtils.logAndThrowStoreException(LOG, errMsg);
+ }
+ // Check the ROWCOUNT value, if it is different from 1 it means the call
+ // had a wrong behavior. Maybe the database is not set correctly.
+ if (cstmt.getInt(2) != 1) {
+ String errMsg = "Wrong behavior during deleting the application "
+ + request.getApplicationId();
+ FederationStateStoreUtils.logAndThrowStoreException(LOG, errMsg);
+ }
+
+ LOG.info("Delete from the StateStore the application: {}",
+ request.getApplicationId());
+
+ } catch (SQLException e) {
+ FederationStateStoreUtils.logAndThrowRetriableException(LOG,
+ "Unable to delete the application " + request.getApplicationId(), e);
+ } finally {
+ // Return to the pool the CallableStatement and the Connection
+ FederationStateStoreUtils.returnToPool(LOG, cstmt, conn);
+ }
+ return DeleteApplicationHomeSubClusterResponse.newInstance();
+ }
+
+ @Override
+ public GetSubClusterPolicyConfigurationResponse getPolicyConfiguration(
+ GetSubClusterPolicyConfigurationRequest request) throws YarnException {
+
+ // Input validator
+ FederationPolicyStoreInputValidator.validate(request);
+
+ CallableStatement cstmt = null;
+ Connection conn = null;
+ SubClusterPolicyConfiguration subClusterPolicyConfiguration = null;
+
+ try {
+ conn = getConnection();
+ cstmt = conn.prepareCall(CALL_SP_GET_POLICY_CONFIGURATION);
+
+ // Set the parameters for the stored procedure
+ cstmt.setString(1, request.getQueue());
+ cstmt.registerOutParameter(2, java.sql.Types.VARCHAR);
+ cstmt.registerOutParameter(3, java.sql.Types.VARBINARY);
+
+ // Execute the query
+ cstmt.executeUpdate();
+
+ // Check if the output it is a valid policy
+ if (cstmt.getString(2) != null && cstmt.getBytes(3) != null) {
+ subClusterPolicyConfiguration =
+ SubClusterPolicyConfiguration.newInstance(request.getQueue(),
+ cstmt.getString(2), ByteBuffer.wrap(cstmt.getBytes(3)));
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Selected from StateStore the policy for the queue: "
+ + subClusterPolicyConfiguration.toString());
+ }
+ } else {
+ String errMsg =
+ "Policy for queue " + request.getQueue() + " does not exist";
+ FederationStateStoreUtils.logAndThrowStoreException(LOG, errMsg);
+ }
+
+ } catch (SQLException e) {
+ FederationStateStoreUtils.logAndThrowRetriableException(LOG,
+ "Unable to select the policy for the queue :" + request.getQueue(),
+ e);
+ } finally {
+ // Return to the pool the CallableStatement and the Connection
+ FederationStateStoreUtils.returnToPool(LOG, cstmt, conn);
+ }
+ return GetSubClusterPolicyConfigurationResponse
+ .newInstance(subClusterPolicyConfiguration);
+ }
+
+ /**
+  * Stores the policy configuration of a queue by calling the
+  * CALL_SP_SET_POLICY_CONFIGURATION stored procedure.
+  *
+  * @param request the request carrying the policy configuration to store
+  * @return an empty response when the stored procedure reports a row count
+  *         of exactly 1
+  * @throws YarnException if the request does not pass validation, if the
+  *           reported row count is not 1, or if the SQL call fails
+  */
+ @Override
+ public SetSubClusterPolicyConfigurationResponse setPolicyConfiguration(
+     SetSubClusterPolicyConfigurationRequest request) throws YarnException {
+
+   // Reject malformed requests before touching the database
+   FederationPolicyStoreInputValidator.validate(request);
+
+   SubClusterPolicyConfiguration policy = request.getPolicyConfiguration();
+   Connection connection = null;
+   CallableStatement statement = null;
+
+   try {
+     connection = getConnection();
+     statement = connection.prepareCall(CALL_SP_SET_POLICY_CONFIGURATION);
+
+     // IN parameters: queue, policy type, serialized policy parameters
+     statement.setString(1, policy.getQueue());
+     statement.setString(2, policy.getType());
+     statement.setBytes(3, getByteArray(policy.getParams()));
+     // OUT parameter: the ROWCOUNT reported by the stored procedure
+     statement.registerOutParameter(4, java.sql.Types.INTEGER);
+
+     statement.executeUpdate();
+
+     int rowCount = statement.getInt(4);
+
+     // ROWCOUNT equal to 0: the call did not add a new policy into the
+     // FederationStateStore
+     if (rowCount == 0) {
+       FederationStateStoreUtils.logAndThrowStoreException(LOG,
+           "The policy " + policy.getQueue()
+               + " was not insert into the StateStore");
+     }
+     // ROWCOUNT different from 1: the call had a wrong behavior. Maybe the
+     // database is not set correctly.
+     if (rowCount != 1) {
+       FederationStateStoreUtils.logAndThrowStoreException(LOG,
+           "Wrong behavior during insert the policy " + policy.getQueue());
+     }
+
+     LOG.info("Insert into the state store the policy for the queue: "
+         + policy.getQueue());
+
+   } catch (SQLException sqlFailure) {
+     FederationStateStoreUtils.logAndThrowRetriableException(LOG,
+         "Unable to insert the newly generated policy for the queue :"
+             + policy.getQueue(),
+         sqlFailure);
+   } finally {
+     // Hand the CallableStatement and the Connection back to the pool
+     FederationStateStoreUtils.returnToPool(LOG, statement, connection);
+   }
+   return SetSubClusterPolicyConfigurationResponse.newInstance();
+ }
+
+ /**
+  * Retrieves the policy configurations of all the queues by calling the
+  * CALL_SP_GET_POLICIES_CONFIGURATIONS stored procedure.
+  *
+  * @param request the request; this implementation does not read it
+  * @return a response wrapping one configuration per valid row returned by
+  *         the stored procedure
+  * @throws YarnException if the SQL call fails
+  */
+ @Override
+ public GetSubClusterPoliciesConfigurationsResponse getPoliciesConfigurations(
+     GetSubClusterPoliciesConfigurationsRequest request) throws YarnException {
+
+   CallableStatement cstmt = null;
+   Connection conn = null;
+   ResultSet rs = null;
+   List<SubClusterPolicyConfiguration> policyConfigurations =
+       new ArrayList<SubClusterPolicyConfiguration>();
+
+   try {
+     conn = getConnection();
+     cstmt = conn.prepareCall(CALL_SP_GET_POLICIES_CONFIGURATIONS);
+
+     // Execute the query
+     rs = cstmt.executeQuery();
+
+     while (rs.next()) {
+
+       // Extract the output for each tuple
+       String queue = rs.getString(1);
+       String type = rs.getString(2);
+       byte[] policyInfo = rs.getBytes(3);
+
+       // Apply the same validity check as getPolicyConfiguration: a row
+       // without type or parameters is not a valid policy. Without this
+       // guard ByteBuffer.wrap(null) would escape as an unchecked
+       // NullPointerException.
+       if (type == null || policyInfo == null) {
+         LOG.warn("Skipping the policy for the queue " + queue
+             + ": missing policy type or parameters in the StateStore");
+         continue;
+       }
+
+       policyConfigurations.add(SubClusterPolicyConfiguration
+           .newInstance(queue, type, ByteBuffer.wrap(policyInfo)));
+     }
+   } catch (SQLException e) {
+     FederationStateStoreUtils.logAndThrowRetriableException(LOG,
+         "Unable to obtain the policy information for all the queues.", e);
+   } finally {
+     // Return to the pool the CallableStatement, the Connection and the
+     // ResultSet
+     FederationStateStoreUtils.returnToPool(LOG, cstmt, conn, rs);
+   }
+
+   return GetSubClusterPoliciesConfigurationsResponse
+       .newInstance(policyConfigurations);
+ }
+
+ /**
+  * Not implemented by this store.
+  *
+  * @return never returns normally
+  * @throws NotImplementedException on every call
+  */
+ @Override
+ public Version getCurrentVersion() {
+   throw new NotImplementedException();
+ }
+
+ /**
+  * Not implemented by this store.
+  *
+  * @return never returns normally
+  * @throws NotImplementedException on every call
+  */
+ @Override
+ public Version loadVersion() {
+   throw new NotImplementedException();
+ }
+
+ /**
+  * Closes the underlying data source, if one has been set.
+  *
+  * @throws Exception as declared by the overridden method
+  */
+ @Override
+ public void close() throws Exception {
+   if (dataSource == null) {
+     // Nothing was opened, so there is nothing to release
+     return;
+   }
+   dataSource.close();
+ }
+
+ /**
+  * Obtains a connection from the DataSource pool.
+  *
+  * @return the connection handed out by the DataSource pool
+  * @throws SQLException if the DataSource cannot provide a connection
+  */
+ public Connection getConnection() throws SQLException {
+   Connection pooledConnection = dataSource.getConnection();
+   return pooledConnection;
+ }
+
+ /**
+  * Copies the readable bytes of a buffer (from its position up to its
+  * limit) into a new array, leaving the buffer itself untouched.
+  *
+  * @param bb the buffer to copy from
+  * @return a new array holding the remaining bytes of {@code bb}
+  */
+ private static byte[] getByteArray(ByteBuffer bb) {
+   // Read through a duplicate so the caller's position is not advanced
+   ByteBuffer view = bb.duplicate();
+   // Size by remaining(), not limit(): get(byte[]) can only deliver
+   // remaining() bytes and underflows when position() > 0
+   byte[] ba = new byte[view.remaining()];
+   view.get(ba);
+   return ba;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/88164e20/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/records/SubClusterState.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/records/SubClusterState.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/records/SubClusterState.java
index ff49aaa..b30bd32 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/records/SubClusterState.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/records/SubClusterState.java
@@ -19,6 +19,8 @@ package org.apache.hadoop.yarn.server.federation.store.records;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
* <p>
@@ -61,4 +63,23 @@ public enum SubClusterState {
return (this == SC_UNREGISTERED || this == SC_DECOMMISSIONED
|| this == SC_LOST);
}
+
+ public static final Logger LOG =
+ LoggerFactory.getLogger(SubClusterState.class);
+
+ /**
+  * Convert a string into {@code SubClusterState}.
+  *
+  * @param x the string to convert in SubClusterState
+  * @return the respective {@code SubClusterState}, or {@code null} when
+  *         {@code x} cannot be converted (the failure is logged)
+  */
+ public static SubClusterState fromString(String x) {
+   try {
+     return SubClusterState.valueOf(x);
+   } catch (Exception e) {
+     // Report the offending value so a corrupted StateStore entry can be
+     // identified from the log alone
+     LOG.error("Invalid SubCluster State value ({}) in the StateStore does not"
+         + " match with the YARN Federation standard.", x);
+     return null;
+   }
+ }
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/88164e20/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/utils/FederationApplicationHomeSubClusterStoreInputValidator.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/utils/FederationApplicationHomeSubClusterStoreInputValidator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/utils/FederationApplicationHomeSubClusterStoreInputValidator.java
index d920144..0184c9f 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/utils/FederationApplicationHomeSubClusterStoreInputValidator.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/utils/FederationApplicationHomeSubClusterStoreInputValidator.java
@@ -51,8 +51,7 @@ public final class FederationApplicationHomeSubClusterStoreInputValidator {
* against
* @throws FederationStateStoreInvalidInputException if the request is invalid
*/
- public static void validateAddApplicationHomeSubClusterRequest(
- AddApplicationHomeSubClusterRequest request)
+ public static void validate(AddApplicationHomeSubClusterRequest request)
throws FederationStateStoreInvalidInputException {
if (request == null) {
String message = "Missing AddApplicationHomeSubCluster Request."
@@ -75,8 +74,7 @@ public final class FederationApplicationHomeSubClusterStoreInputValidator {
* validate against
* @throws FederationStateStoreInvalidInputException if the request is invalid
*/
- public static void validateUpdateApplicationHomeSubClusterRequest(
- UpdateApplicationHomeSubClusterRequest request)
+ public static void validate(UpdateApplicationHomeSubClusterRequest request)
throws FederationStateStoreInvalidInputException {
if (request == null) {
String message = "Missing UpdateApplicationHomeSubCluster Request."
@@ -99,8 +97,7 @@ public final class FederationApplicationHomeSubClusterStoreInputValidator {
* against
* @throws FederationStateStoreInvalidInputException if the request is invalid
*/
- public static void validateGetApplicationHomeSubClusterRequest(
- GetApplicationHomeSubClusterRequest request)
+ public static void validate(GetApplicationHomeSubClusterRequest request)
throws FederationStateStoreInvalidInputException {
if (request == null) {
String message = "Missing GetApplicationHomeSubCluster Request."
@@ -122,8 +119,7 @@ public final class FederationApplicationHomeSubClusterStoreInputValidator {
* validate against
* @throws FederationStateStoreInvalidInputException if the request is invalid
*/
- public static void validateDeleteApplicationHomeSubClusterRequest(
- DeleteApplicationHomeSubClusterRequest request)
+ public static void validate(DeleteApplicationHomeSubClusterRequest request)
throws FederationStateStoreInvalidInputException {
if (request == null) {
String message = "Missing DeleteApplicationHomeSubCluster Request."
http://git-wip-us.apache.org/repos/asf/hadoop/blob/88164e20/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/utils/FederationMembershipStateStoreInputValidator.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/utils/FederationMembershipStateStoreInputValidator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/utils/FederationMembershipStateStoreInputValidator.java
index ebe622b..0ec8e5d 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/utils/FederationMembershipStateStoreInputValidator.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/utils/FederationMembershipStateStoreInputValidator.java
@@ -53,8 +53,7 @@ public final class FederationMembershipStateStoreInputValidator {
* @param request the {@link SubClusterRegisterRequest} to validate against
* @throws FederationStateStoreInvalidInputException if the request is invalid
*/
- public static void validateSubClusterRegisterRequest(
- SubClusterRegisterRequest request)
+ public static void validate(SubClusterRegisterRequest request)
throws FederationStateStoreInvalidInputException {
// check if the request is present
@@ -79,8 +78,7 @@ public final class FederationMembershipStateStoreInputValidator {
* @param request the {@link SubClusterDeregisterRequest} to validate against
* @throws FederationStateStoreInvalidInputException if the request is invalid
*/
- public static void validateSubClusterDeregisterRequest(
- SubClusterDeregisterRequest request)
+ public static void validate(SubClusterDeregisterRequest request)
throws FederationStateStoreInvalidInputException {
// check if the request is present
@@ -111,8 +109,7 @@ public final class FederationMembershipStateStoreInputValidator {
* @param request the {@link SubClusterHeartbeatRequest} to validate against
* @throws FederationStateStoreInvalidInputException if the request is invalid
*/
- public static void validateSubClusterHeartbeatRequest(
- SubClusterHeartbeatRequest request)
+ public static void validate(SubClusterHeartbeatRequest request)
throws FederationStateStoreInvalidInputException {
// check if the request is present
@@ -143,8 +140,7 @@ public final class FederationMembershipStateStoreInputValidator {
* @param request the {@link GetSubClusterInfoRequest} to validate against
* @throws FederationStateStoreInvalidInputException if the request is invalid
*/
- public static void validateGetSubClusterInfoRequest(
- GetSubClusterInfoRequest request)
+ public static void validate(GetSubClusterInfoRequest request)
throws FederationStateStoreInvalidInputException {
// check if the request is present
@@ -169,7 +165,7 @@ public final class FederationMembershipStateStoreInputValidator {
* @throws FederationStateStoreInvalidInputException if the SubCluster Info
* are invalid
*/
- private static void checkSubClusterInfo(SubClusterInfo subClusterInfo)
+ public static void checkSubClusterInfo(SubClusterInfo subClusterInfo)
throws FederationStateStoreInvalidInputException {
if (subClusterInfo == null) {
String message = "Missing SubCluster Information."
http://git-wip-us.apache.org/repos/asf/hadoop/blob/88164e20/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/utils/FederationPolicyStoreInputValidator.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/utils/FederationPolicyStoreInputValidator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/utils/FederationPolicyStoreInputValidator.java
index 0df2d85..3c68bfd 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/utils/FederationPolicyStoreInputValidator.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/utils/FederationPolicyStoreInputValidator.java
@@ -48,8 +48,7 @@ public final class FederationPolicyStoreInputValidator {
* validate against
* @throws FederationStateStoreInvalidInputException if the request is invalid
*/
- public static void validateGetSubClusterPolicyConfigurationRequest(
- GetSubClusterPolicyConfigurationRequest request)
+ public static void validate(GetSubClusterPolicyConfigurationRequest request)
throws FederationStateStoreInvalidInputException {
if (request == null) {
String message = "Missing GetSubClusterPolicyConfiguration Request."
@@ -72,8 +71,7 @@ public final class FederationPolicyStoreInputValidator {
* validate against
* @throws FederationStateStoreInvalidInputException if the request is invalid
*/
- public static void validateSetSubClusterPolicyConfigurationRequest(
- SetSubClusterPolicyConfigurationRequest request)
+ public static void validate(SetSubClusterPolicyConfigurationRequest request)
throws FederationStateStoreInvalidInputException {
if (request == null) {
String message = "Missing SetSubClusterPolicyConfiguration Request."
http://git-wip-us.apache.org/repos/asf/hadoop/blob/88164e20/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/utils/FederationStateStoreUtils.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/utils/FederationStateStoreUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/utils/FederationStateStoreUtils.java
index 7dbb20a..3b870de 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/utils/FederationStateStoreUtils.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/utils/FederationStateStoreUtils.java
@@ -20,16 +20,18 @@ package org.apache.hadoop.yarn.server.federation.store.utils;
import java.sql.CallableStatement;
import java.sql.Connection;
+import java.sql.ResultSet;
import java.sql.SQLException;
import org.apache.hadoop.yarn.exceptions.YarnException;
-import org.apache.hadoop.yarn.server.federation.store.exception.FederationStateStoreErrorCode;
import org.apache.hadoop.yarn.server.federation.store.exception.FederationStateStoreException;
import org.apache.hadoop.yarn.server.federation.store.exception.FederationStateStoreInvalidInputException;
import org.apache.hadoop.yarn.server.federation.store.exception.FederationStateStoreRetriableException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import com.zaxxer.hikari.HikariDataSource;
+
/**
* Common utility methods used by the store implementations.
*
@@ -39,19 +41,22 @@ public final class FederationStateStoreUtils {
public static final Logger LOG =
LoggerFactory.getLogger(FederationStateStoreUtils.class);
+ public final static String FEDERATION_STORE_URL = "url";
+
private FederationStateStoreUtils() {
}
/**
- * Returns the SQL <code>FederationStateStore</code> connection to the pool.
+ * Returns the SQL <code>FederationStateStore</code> connections to the pool.
*
* @param log the logger interface
* @param cstmt the interface used to execute SQL stored procedures
* @param conn the SQL connection
+ * @param rs the ResultSet to close, may be null
* @throws YarnException on failure
*/
public static void returnToPool(Logger log, CallableStatement cstmt,
- Connection conn) throws YarnException {
+ Connection conn, ResultSet rs) throws YarnException {
if (cstmt != null) {
try {
cstmt.close();
@@ -69,6 +74,28 @@ public final class FederationStateStoreUtils {
e);
}
}
+
+ if (rs != null) {
+ try {
+ rs.close();
+ } catch (SQLException e) {
+ logAndThrowException(log, "Exception while trying to close ResultSet",
+ e);
+ }
+ }
+ }
+
+ /**
+  * Returns the SQL <code>FederationStateStore</code> connections to the pool
+  * for calls that did not produce a ResultSet. Delegates to the
+  * four-argument overload with a null ResultSet.
+  *
+  * @param log the logger interface
+  * @param cstmt the interface used to execute SQL stored procedures
+  * @param conn the SQL connection
+  * @throws YarnException on failure
+  */
+ public static void returnToPool(Logger log, CallableStatement cstmt,
+     Connection conn) throws YarnException {
+   // This overload has no ResultSet to release
+   ResultSet noResultSet = null;
+   returnToPool(log, cstmt, conn, noResultSet);
 }
/**
@@ -95,28 +122,13 @@ public final class FederationStateStoreUtils {
* <code>FederationStateStore</code>.
*
* @param log the logger interface
- * @param code FederationStateStoreErrorCode of the error
* @param errMsg the error message
* @throws YarnException on failure
*/
- public static void logAndThrowStoreException(Logger log,
- FederationStateStoreErrorCode code, String errMsg) throws YarnException {
- log.error(errMsg + " " + code.toString());
- throw new FederationStateStoreException(code);
- }
-
- /**
- * Throws an <code>FederationStateStoreException</code> due to an error in
- * <code>FederationStateStore</code>.
- *
- * @param log the logger interface
- * @param code FederationStateStoreErrorCode of the error
- * @throws YarnException on failure
- */
- public static void logAndThrowStoreException(Logger log,
- FederationStateStoreErrorCode code) throws YarnException {
- log.error(code.toString());
- throw new FederationStateStoreException(code);
+ public static void logAndThrowStoreException(Logger log, String errMsg)
+ throws YarnException {
+ log.error(errMsg);
+ throw new FederationStateStoreException(errMsg);
}
/**
@@ -129,7 +141,7 @@ public final class FederationStateStoreUtils {
*/
public static void logAndThrowInvalidInputException(Logger log, String errMsg)
throws YarnException {
- LOG.error(errMsg);
+ log.error(errMsg);
throw new FederationStateStoreInvalidInputException(errMsg);
}
@@ -145,11 +157,58 @@ public final class FederationStateStoreUtils {
public static void logAndThrowRetriableException(Logger log, String errMsg,
Throwable t) throws YarnException {
if (t != null) {
- LOG.error(errMsg, t);
+ log.error(errMsg, t);
throw new FederationStateStoreRetriableException(errMsg, t);
} else {
- LOG.error(errMsg);
+ log.error(errMsg);
throw new FederationStateStoreRetriableException(errMsg);
}
}
+
+ /**
+  * Sets a specific value for a specific property of
+  * <code>HikariDataSource</code> SQL connections. The call is ignored when
+  * the property name is null or empty, or when the value is null.
+  *
+  * @param dataSource the <code>HikariDataSource</code> connections
+  * @param property the property to set
+  * @param value the value to set
+  */
+ public static void setProperty(HikariDataSource dataSource, String property,
+     String value) {
+   if (property != null && !property.isEmpty() && value != null) {
+     // Log only when the property is really applied, as setUsername and
+     // setPassword do
+     LOG.debug("Setting property {} with value {}", property, value);
+     dataSource.addDataSourceProperty(property, value);
+   } else {
+     LOG.debug("Empty property name or NULL value specified for property {},"
+         + " so ignoring", property);
+   }
+ }
+
+ /**
+  * Applies a username to the <code>HikariDataSource</code> SQL connections.
+  * A null username leaves the data source unchanged.
+  *
+  * @param dataSource the <code>HikariDataSource</code> connections
+  * @param userNameDB the username to apply, may be null
+  */
+ public static void setUsername(HikariDataSource dataSource,
+     String userNameDB) {
+   if (userNameDB == null) {
+     LOG.debug("NULL Username specified for Store connection, so ignoring");
+     return;
+   }
+   dataSource.setUsername(userNameDB);
+   LOG.debug("Setting non NULL Username for Store connection");
+ }
+
+ /**
+  * Applies a password to the <code>HikariDataSource</code> SQL connections.
+  * A null password leaves the data source unchanged.
+  *
+  * @param dataSource the <code>HikariDataSource</code> connections
+  * @param password the password to apply, may be null
+  */
+ public static void setPassword(HikariDataSource dataSource, String password) {
+   if (password == null) {
+     LOG.debug("NULL Credentials specified for Store connection, so ignoring");
+     return;
+   }
+   dataSource.setPassword(password);
+   LOG.debug("Setting non NULL Credentials for Store connection");
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org