You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by su...@apache.org on 2017/05/08 23:54:17 UTC
[49/50] [abbrv] hadoop git commit: YARN-3663. Federation State and
Policy Store (DBMS implementation). (Giovanni Matteo Fumarola via curino).
http://git-wip-us.apache.org/repos/asf/hadoop/blob/027444cb/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/store/impl/FederationStateStoreBaseTest.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/store/impl/FederationStateStoreBaseTest.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/store/impl/FederationStateStoreBaseTest.java
index 80b00ef..db04592 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/store/impl/FederationStateStoreBaseTest.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/store/impl/FederationStateStoreBaseTest.java
@@ -19,13 +19,14 @@ package org.apache.hadoop.yarn.server.federation.store.impl;
import java.io.IOException;
import java.nio.ByteBuffer;
+import java.util.Calendar;
import java.util.List;
+import java.util.TimeZone;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.server.federation.store.FederationStateStore;
-import org.apache.hadoop.yarn.server.federation.store.exception.FederationStateStoreErrorCode;
import org.apache.hadoop.yarn.server.federation.store.exception.FederationStateStoreException;
import org.apache.hadoop.yarn.server.federation.store.records.AddApplicationHomeSubClusterRequest;
import org.apache.hadoop.yarn.server.federation.store.records.AddApplicationHomeSubClusterResponse;
@@ -87,13 +88,26 @@ public abstract class FederationStateStoreBaseTest {
@Test
public void testRegisterSubCluster() throws Exception {
SubClusterId subClusterId = SubClusterId.newInstance("SC");
+
SubClusterInfo subClusterInfo = createSubClusterInfo(subClusterId);
+ long previousTimeStamp =
+ Calendar.getInstance(TimeZone.getTimeZone("UTC")).getTimeInMillis();
+
SubClusterRegisterResponse result = stateStore.registerSubCluster(
SubClusterRegisterRequest.newInstance(subClusterInfo));
+ long currentTimeStamp =
+ Calendar.getInstance(TimeZone.getTimeZone("UTC")).getTimeInMillis();
+
Assert.assertNotNull(result);
Assert.assertEquals(subClusterInfo, querySubClusterInfo(subClusterId));
+
+ // The saved heartbeat is between the old one and the current timestamp
+ Assert.assertTrue(querySubClusterInfo(subClusterId)
+ .getLastHeartBeat() <= currentTimeStamp);
+ Assert.assertTrue(querySubClusterInfo(subClusterId)
+ .getLastHeartBeat() >= previousTimeStamp);
}
@Test
@@ -120,9 +134,7 @@ public abstract class FederationStateStoreBaseTest {
stateStore.deregisterSubCluster(deregisterRequest);
Assert.fail();
} catch (FederationStateStoreException e) {
- Assert.assertEquals(
- FederationStateStoreErrorCode.MEMBERSHIP_UPDATE_DEREGISTER_FAIL,
- e.getCode());
+ Assert.assertTrue(e.getMessage().startsWith("SubCluster SC not found"));
}
}
@@ -149,9 +161,8 @@ public abstract class FederationStateStoreBaseTest {
stateStore.getSubCluster(request).getSubClusterInfo();
Assert.fail();
} catch (FederationStateStoreException e) {
- Assert.assertEquals(
- FederationStateStoreErrorCode.MEMBERSHIP_SINGLE_SELECT_FAIL,
- e.getCode());
+ Assert.assertTrue(
+ e.getMessage().startsWith("SubCluster SC does not exist"));
}
}
@@ -200,13 +211,24 @@ public abstract class FederationStateStoreBaseTest {
SubClusterId subClusterId = SubClusterId.newInstance("SC");
registerSubCluster(createSubClusterInfo(subClusterId));
+ long previousHeartBeat =
+ querySubClusterInfo(subClusterId).getLastHeartBeat();
+
SubClusterHeartbeatRequest heartbeatRequest = SubClusterHeartbeatRequest
.newInstance(subClusterId, SubClusterState.SC_RUNNING, "capability");
stateStore.subClusterHeartbeat(heartbeatRequest);
+ long currentTimeStamp =
+ Calendar.getInstance(TimeZone.getTimeZone("UTC")).getTimeInMillis();
+
Assert.assertEquals(SubClusterState.SC_RUNNING,
querySubClusterInfo(subClusterId).getState());
- Assert.assertNotNull(querySubClusterInfo(subClusterId).getLastHeartBeat());
+
+ // The saved heartbeat is between the old one and the current timestamp
+ Assert.assertTrue(querySubClusterInfo(subClusterId)
+ .getLastHeartBeat() <= currentTimeStamp);
+ Assert.assertTrue(querySubClusterInfo(subClusterId)
+ .getLastHeartBeat() >= previousHeartBeat);
}
@Test
@@ -219,9 +241,8 @@ public abstract class FederationStateStoreBaseTest {
stateStore.subClusterHeartbeat(heartbeatRequest);
Assert.fail();
} catch (FederationStateStoreException e) {
- Assert.assertEquals(
- FederationStateStoreErrorCode.MEMBERSHIP_UPDATE_HEARTBEAT_FAIL,
- e.getCode());
+ Assert.assertTrue(e.getMessage()
+ .startsWith("SubCluster SC does not exist; cannot heartbeat"));
}
}
@@ -281,9 +302,8 @@ public abstract class FederationStateStoreBaseTest {
queryApplicationHomeSC(appId);
Assert.fail();
} catch (FederationStateStoreException e) {
- Assert.assertEquals(
- FederationStateStoreErrorCode.APPLICATIONS_SINGLE_SELECT_FAIL,
- e.getCode());
+ Assert.assertTrue(e.getMessage()
+ .startsWith("Application " + appId + " does not exist"));
}
}
@@ -298,8 +318,8 @@ public abstract class FederationStateStoreBaseTest {
stateStore.deleteApplicationHomeSubCluster(delRequest);
Assert.fail();
} catch (FederationStateStoreException e) {
- Assert.assertEquals(
- FederationStateStoreErrorCode.APPLICATIONS_DELETE_FAIL, e.getCode());
+ Assert.assertTrue(e.getMessage()
+ .startsWith("Application " + appId.toString() + " does not exist"));
}
}
@@ -331,9 +351,8 @@ public abstract class FederationStateStoreBaseTest {
stateStore.getApplicationHomeSubCluster(request);
Assert.fail();
} catch (FederationStateStoreException e) {
- Assert.assertEquals(
- FederationStateStoreErrorCode.APPLICATIONS_SINGLE_SELECT_FAIL,
- e.getCode());
+ Assert.assertTrue(e.getMessage()
+ .startsWith("Application " + appId.toString() + " does not exist"));
}
}
@@ -397,8 +416,8 @@ public abstract class FederationStateStoreBaseTest {
stateStore.updateApplicationHomeSubCluster((updateRequest));
Assert.fail();
} catch (FederationStateStoreException e) {
- Assert.assertEquals(
- FederationStateStoreErrorCode.APPLICATIONS_UPDATE_FAIL, e.getCode());
+ Assert.assertTrue(e.getMessage()
+ .startsWith("Application " + appId.toString() + " does not exist"));
}
}
@@ -458,8 +477,8 @@ public abstract class FederationStateStoreBaseTest {
stateStore.getPolicyConfiguration(request);
Assert.fail();
} catch (FederationStateStoreException e) {
- Assert.assertEquals(
- FederationStateStoreErrorCode.POLICY_SINGLE_SELECT_FAIL, e.getCode());
+ Assert.assertTrue(
+ e.getMessage().startsWith("Policy for queue Queue does not exist"));
}
}
@@ -499,8 +518,9 @@ public abstract class FederationStateStoreBaseTest {
private SubClusterPolicyConfiguration createSCPolicyConf(String queueName,
String policyType) {
- return SubClusterPolicyConfiguration.newInstance(queueName, policyType,
- ByteBuffer.allocate(1));
+ ByteBuffer bb = ByteBuffer.allocate(100);
+ bb.put((byte) 0x02);
+ return SubClusterPolicyConfiguration.newInstance(queueName, policyType, bb);
}
private void addApplicationHomeSC(ApplicationId appId,
@@ -558,4 +578,8 @@ public abstract class FederationStateStoreBaseTest {
this.conf = conf;
}
+ protected Configuration getConf() {
+ return conf;
+ }
+
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/027444cb/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/store/impl/HSQLDBFederationStateStore.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/store/impl/HSQLDBFederationStateStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/store/impl/HSQLDBFederationStateStore.java
new file mode 100644
index 0000000..289a3a6
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/store/impl/HSQLDBFederationStateStore.java
@@ -0,0 +1,252 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.federation.store.impl;
+
+import java.sql.Connection;
+import java.sql.SQLException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.server.federation.store.FederationStateStore;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * HSQLDB implementation of {@link FederationStateStore}.
+ */
+public class HSQLDBFederationStateStore extends SQLFederationStateStore {
+
+ private static final Logger LOG =
+ LoggerFactory.getLogger(HSQLDBFederationStateStore.class);
+
+ private Connection conn;
+
+ private static final String TABLE_APPLICATIONSHOMESUBCLUSTER =
+ " CREATE TABLE applicationsHomeSubCluster ("
+ + " applicationId varchar(64) NOT NULL,"
+ + " homeSubCluster varchar(256) NOT NULL,"
+ + " CONSTRAINT pk_applicationId PRIMARY KEY (applicationId))";
+
+ private static final String TABLE_MEMBERSHIP =
+ "CREATE TABLE membership ( subClusterId varchar(256) NOT NULL,"
+ + " amRMServiceAddress varchar(256) NOT NULL,"
+ + " clientRMServiceAddress varchar(256) NOT NULL,"
+ + " rmAdminServiceAddress varchar(256) NOT NULL,"
+ + " rmWebServiceAddress varchar(256) NOT NULL,"
+ + " lastHeartBeat datetime NOT NULL, state varchar(32) NOT NULL,"
+ + " lastStartTime bigint NULL, capability varchar(6000) NOT NULL,"
+ + " CONSTRAINT pk_subClusterId PRIMARY KEY (subClusterId))";
+
+ private static final String TABLE_POLICIES =
+ "CREATE TABLE policies ( queue varchar(256) NOT NULL,"
+ + " policyType varchar(256) NOT NULL, params varbinary(512),"
+ + " CONSTRAINT pk_queue PRIMARY KEY (queue))";
+
+ private static final String SP_REGISTERSUBCLUSTER =
+ "CREATE PROCEDURE sp_registerSubCluster("
+ + " IN subClusterId_IN varchar(256),"
+ + " IN amRMServiceAddress_IN varchar(256),"
+ + " IN clientRMServiceAddress_IN varchar(256),"
+ + " IN rmAdminServiceAddress_IN varchar(256),"
+ + " IN rmWebServiceAddress_IN varchar(256),"
+ + " IN state_IN varchar(256),"
+ + " IN lastStartTime_IN bigint, IN capability_IN varchar(6000),"
+ + " OUT rowCount_OUT int)MODIFIES SQL DATA BEGIN ATOMIC"
+ + " DELETE FROM membership WHERE (subClusterId = subClusterId_IN);"
+ + " INSERT INTO membership ( subClusterId,"
+ + " amRMServiceAddress, clientRMServiceAddress,"
+ + " rmAdminServiceAddress, rmWebServiceAddress,"
+ + " lastHeartBeat, state, lastStartTime,"
+ + " capability) VALUES ( subClusterId_IN,"
+ + " amRMServiceAddress_IN, clientRMServiceAddress_IN,"
+ + " rmAdminServiceAddress_IN, rmWebServiceAddress_IN,"
+ + " NOW() AT TIME ZONE INTERVAL '0:00' HOUR TO MINUTE,"
+ + " state_IN, lastStartTime_IN, capability_IN);"
+ + " GET DIAGNOSTICS rowCount_OUT = ROW_COUNT; END";
+
+ private static final String SP_DEREGISTERSUBCLUSTER =
+ "CREATE PROCEDURE sp_deregisterSubCluster("
+ + " IN subClusterId_IN varchar(256),"
+ + " IN state_IN varchar(64), OUT rowCount_OUT int)"
+ + " MODIFIES SQL DATA BEGIN ATOMIC"
+ + " UPDATE membership SET state = state_IN WHERE ("
+ + " subClusterId = subClusterId_IN AND state != state_IN);"
+ + " GET DIAGNOSTICS rowCount_OUT = ROW_COUNT; END";
+
+ private static final String SP_SUBCLUSTERHEARTBEAT =
+ "CREATE PROCEDURE sp_subClusterHeartbeat("
+ + " IN subClusterId_IN varchar(256), IN state_IN varchar(64),"
+ + " IN capability_IN varchar(6000), OUT rowCount_OUT int)"
+ + " MODIFIES SQL DATA BEGIN ATOMIC UPDATE membership"
+ + " SET capability = capability_IN, state = state_IN,"
+ + " lastHeartBeat = NOW() AT TIME ZONE INTERVAL '0:00'"
+ + " HOUR TO MINUTE WHERE subClusterId = subClusterId_IN;"
+ + " GET DIAGNOSTICS rowCount_OUT = ROW_COUNT; END";
+
+ private static final String SP_GETSUBCLUSTER =
+ "CREATE PROCEDURE sp_getSubCluster( IN subClusterId_IN varchar(256),"
+ + " OUT amRMServiceAddress_OUT varchar(256),"
+ + " OUT clientRMServiceAddress_OUT varchar(256),"
+ + " OUT rmAdminServiceAddress_OUT varchar(256),"
+ + " OUT rmWebServiceAddress_OUT varchar(256),"
+ + " OUT lastHeartBeat_OUT datetime, OUT state_OUT varchar(64),"
+ + " OUT lastStartTime_OUT bigint,"
+ + " OUT capability_OUT varchar(6000))"
+ + " MODIFIES SQL DATA BEGIN ATOMIC SELECT amRMServiceAddress,"
+ + " clientRMServiceAddress,"
+ + " rmAdminServiceAddress, rmWebServiceAddress,"
+ + " lastHeartBeat, state, lastStartTime, capability"
+ + " INTO amRMServiceAddress_OUT, clientRMServiceAddress_OUT,"
+ + " rmAdminServiceAddress_OUT,"
+ + " rmWebServiceAddress_OUT, lastHeartBeat_OUT,"
+ + " state_OUT, lastStartTime_OUT, capability_OUT"
+ + " FROM membership WHERE subClusterId = subClusterId_IN; END";
+
+ private static final String SP_GETSUBCLUSTERS =
+ "CREATE PROCEDURE sp_getSubClusters()"
+ + " MODIFIES SQL DATA DYNAMIC RESULT SETS 1 BEGIN ATOMIC"
+ + " DECLARE result CURSOR FOR"
+ + " SELECT subClusterId, amRMServiceAddress, clientRMServiceAddress,"
+ + " rmAdminServiceAddress, rmWebServiceAddress, lastHeartBeat,"
+ + " state, lastStartTime, capability"
+ + " FROM membership; OPEN result; END";
+
+ private static final String SP_ADDAPPLICATIONHOMESUBCLUSTER =
+ "CREATE PROCEDURE sp_addApplicationHomeSubCluster("
+ + " IN applicationId_IN varchar(64),"
+ + " IN homeSubCluster_IN varchar(256),"
+ + " OUT storedHomeSubCluster_OUT varchar(256), OUT rowCount_OUT int)"
+ + " MODIFIES SQL DATA BEGIN ATOMIC"
+ + " INSERT INTO applicationsHomeSubCluster "
+ + " (applicationId,homeSubCluster) "
+ + " (SELECT applicationId_IN, homeSubCluster_IN"
+ + " FROM applicationsHomeSubCluster"
+ + " WHERE applicationId = applicationId_IN"
+ + " HAVING COUNT(*) = 0 );"
+ + " GET DIAGNOSTICS rowCount_OUT = ROW_COUNT;"
+ + " SELECT homeSubCluster INTO storedHomeSubCluster_OUT"
+ + " FROM applicationsHomeSubCluster"
+ + " WHERE applicationId = applicationID_IN; END";
+
+ private static final String SP_UPDATEAPPLICATIONHOMESUBCLUSTER =
+ "CREATE PROCEDURE sp_updateApplicationHomeSubCluster("
+ + " IN applicationId_IN varchar(64),"
+ + " IN homeSubCluster_IN varchar(256), OUT rowCount_OUT int)"
+ + " MODIFIES SQL DATA BEGIN ATOMIC"
+ + " UPDATE applicationsHomeSubCluster"
+ + " SET homeSubCluster = homeSubCluster_IN"
+ + " WHERE applicationId = applicationId_IN;"
+ + " GET DIAGNOSTICS rowCount_OUT = ROW_COUNT; END";
+
+ private static final String SP_GETAPPLICATIONHOMESUBCLUSTER =
+ "CREATE PROCEDURE sp_getApplicationHomeSubCluster("
+ + " IN applicationId_IN varchar(64),"
+ + " OUT homeSubCluster_OUT varchar(256))"
+ + " MODIFIES SQL DATA BEGIN ATOMIC"
+ + " SELECT homeSubCluster INTO homeSubCluster_OUT"
+ + " FROM applicationsHomeSubCluster"
+ + " WHERE applicationId = applicationID_IN; END";
+
+ private static final String SP_GETAPPLICATIONSHOMESUBCLUSTER =
+ "CREATE PROCEDURE sp_getApplicationsHomeSubCluster()"
+ + " MODIFIES SQL DATA DYNAMIC RESULT SETS 1 BEGIN ATOMIC"
+ + " DECLARE result CURSOR FOR"
+ + " SELECT applicationId, homeSubCluster"
+ + " FROM applicationsHomeSubCluster; OPEN result; END";
+
+ private static final String SP_DELETEAPPLICATIONHOMESUBCLUSTER =
+ "CREATE PROCEDURE sp_deleteApplicationHomeSubCluster("
+ + " IN applicationId_IN varchar(64), OUT rowCount_OUT int)"
+ + " MODIFIES SQL DATA BEGIN ATOMIC"
+ + " DELETE FROM applicationsHomeSubCluster"
+ + " WHERE applicationId = applicationId_IN;"
+ + " GET DIAGNOSTICS rowCount_OUT = ROW_COUNT; END";
+
+ private static final String SP_SETPOLICYCONFIGURATION =
+ "CREATE PROCEDURE sp_setPolicyConfiguration("
+ + " IN queue_IN varchar(256), IN policyType_IN varchar(256),"
+ + " IN params_IN varbinary(512), OUT rowCount_OUT int)"
+ + " MODIFIES SQL DATA BEGIN ATOMIC"
+ + " DELETE FROM policies WHERE queue = queue_IN;"
+ + " INSERT INTO policies (queue, policyType, params)"
+ + " VALUES (queue_IN, policyType_IN, params_IN);"
+ + " GET DIAGNOSTICS rowCount_OUT = ROW_COUNT; END";
+
+ private static final String SP_GETPOLICYCONFIGURATION =
+ "CREATE PROCEDURE sp_getPolicyConfiguration("
+ + " IN queue_IN varchar(256), OUT policyType_OUT varchar(256),"
+ + " OUT params_OUT varbinary(512)) MODIFIES SQL DATA BEGIN ATOMIC"
+ + " SELECT policyType, params INTO policyType_OUT, params_OUT"
+ + " FROM policies WHERE queue = queue_IN; END";
+
+ private static final String SP_GETPOLICIESCONFIGURATIONS =
+ "CREATE PROCEDURE sp_getPoliciesConfigurations()"
+ + " MODIFIES SQL DATA DYNAMIC RESULT SETS 1 BEGIN ATOMIC"
+ + " DECLARE result CURSOR FOR"
+ + " SELECT * FROM policies; OPEN result; END";
+
+ @Override
+ public void init(Configuration conf) {
+ try {
+ super.init(conf);
+ } catch (YarnException e1) {
+ LOG.error("ERROR: failed to init HSQLDB " + e1.getMessage());
+ }
+ try {
+ conn = getConnection();
+
+ LOG.info("Database Init: Start");
+
+ conn.prepareStatement(TABLE_APPLICATIONSHOMESUBCLUSTER).execute();
+ conn.prepareStatement(TABLE_MEMBERSHIP).execute();
+ conn.prepareStatement(TABLE_POLICIES).execute();
+
+ conn.prepareStatement(SP_REGISTERSUBCLUSTER).execute();
+ conn.prepareStatement(SP_DEREGISTERSUBCLUSTER).execute();
+ conn.prepareStatement(SP_SUBCLUSTERHEARTBEAT).execute();
+ conn.prepareStatement(SP_GETSUBCLUSTER).execute();
+ conn.prepareStatement(SP_GETSUBCLUSTERS).execute();
+
+ conn.prepareStatement(SP_ADDAPPLICATIONHOMESUBCLUSTER).execute();
+ conn.prepareStatement(SP_UPDATEAPPLICATIONHOMESUBCLUSTER).execute();
+ conn.prepareStatement(SP_GETAPPLICATIONHOMESUBCLUSTER).execute();
+ conn.prepareStatement(SP_GETAPPLICATIONSHOMESUBCLUSTER).execute();
+ conn.prepareStatement(SP_DELETEAPPLICATIONHOMESUBCLUSTER).execute();
+
+ conn.prepareStatement(SP_SETPOLICYCONFIGURATION).execute();
+ conn.prepareStatement(SP_GETPOLICYCONFIGURATION).execute();
+ conn.prepareStatement(SP_GETPOLICIESCONFIGURATIONS).execute();
+
+ LOG.info("Database Init: Complete");
+ conn.close();
+ } catch (SQLException e) {
+ LOG.error("ERROR: failed to inizialize HSQLDB " + e.getMessage());
+ }
+ }
+
+ public void closeConnection() {
+ try {
+ conn.close();
+ } catch (SQLException e) {
+ LOG.error(
+ "ERROR: failed to close connection to HSQLDB DB " + e.getMessage());
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/027444cb/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/store/impl/TestMemoryFederationStateStore.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/store/impl/TestMemoryFederationStateStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/store/impl/TestMemoryFederationStateStore.java
index 64adab8..c29fc03 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/store/impl/TestMemoryFederationStateStore.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/store/impl/TestMemoryFederationStateStore.java
@@ -28,7 +28,8 @@ public class TestMemoryFederationStateStore
@Override
protected FederationStateStore createStateStore() {
- super.setConf(new Configuration());
+ Configuration conf = new Configuration();
+ super.setConf(conf);
return new MemoryFederationStateStore();
}
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hadoop/blob/027444cb/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/store/impl/TestSQLFederationStateStore.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/store/impl/TestSQLFederationStateStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/store/impl/TestSQLFederationStateStore.java
new file mode 100644
index 0000000..d4e6cc5
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/store/impl/TestSQLFederationStateStore.java
@@ -0,0 +1,49 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.hadoop.yarn.server.federation.store.impl;
+
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.server.federation.store.FederationStateStore;
+
+/**
+ * Unit tests for SQLFederationStateStore.
+ */
+public class TestSQLFederationStateStore extends FederationStateStoreBaseTest {
+
+ private static final String HSQLDB_DRIVER = "org.hsqldb.jdbc.JDBCDataSource";
+ private static final String DATABASE_URL = "jdbc:hsqldb:mem:state";
+ private static final String DATABASE_USERNAME = "SA";
+ private static final String DATABASE_PASSWORD = "";
+
+ @Override
+ protected FederationStateStore createStateStore() {
+
+ YarnConfiguration conf = new YarnConfiguration();
+
+ conf.set(YarnConfiguration.FEDERATION_STATESTORE_SQL_JDBC_CLASS,
+ HSQLDB_DRIVER);
+ conf.set(YarnConfiguration.FEDERATION_STATESTORE_SQL_USERNAME,
+ DATABASE_USERNAME);
+ conf.set(YarnConfiguration.FEDERATION_STATESTORE_SQL_PASSWORD,
+ DATABASE_PASSWORD);
+ conf.set(YarnConfiguration.FEDERATION_STATESTORE_SQL_URL,
+ DATABASE_URL + System.currentTimeMillis());
+ super.setConf(conf);
+ return new HSQLDBFederationStateStore();
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hadoop/blob/027444cb/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/store/utils/TestFederationStateStoreInputValidator.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/store/utils/TestFederationStateStoreInputValidator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/store/utils/TestFederationStateStoreInputValidator.java
index 8ac5e81..5a5703e 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/store/utils/TestFederationStateStoreInputValidator.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/store/utils/TestFederationStateStoreInputValidator.java
@@ -145,7 +145,7 @@ public class TestFederationStateStoreInputValidator {
SubClusterRegisterRequest request =
SubClusterRegisterRequest.newInstance(subClusterInfo);
FederationMembershipStateStoreInputValidator
- .validateSubClusterRegisterRequest(request);
+ .validate(request);
} catch (FederationStateStoreInvalidInputException e) {
Assert.fail(e.getMessage());
}
@@ -155,7 +155,7 @@ public class TestFederationStateStoreInputValidator {
try {
SubClusterRegisterRequest request = null;
FederationMembershipStateStoreInputValidator
- .validateSubClusterRegisterRequest(request);
+ .validate(request);
Assert.fail();
} catch (FederationStateStoreInvalidInputException e) {
LOG.info(e.getMessage());
@@ -170,7 +170,7 @@ public class TestFederationStateStoreInputValidator {
SubClusterRegisterRequest request =
SubClusterRegisterRequest.newInstance(subClusterInfo);
FederationMembershipStateStoreInputValidator
- .validateSubClusterRegisterRequest(request);
+ .validate(request);
Assert.fail();
} catch (FederationStateStoreInvalidInputException e) {
LOG.info(e.getMessage());
@@ -188,7 +188,7 @@ public class TestFederationStateStoreInputValidator {
SubClusterRegisterRequest request =
SubClusterRegisterRequest.newInstance(subClusterInfo);
FederationMembershipStateStoreInputValidator
- .validateSubClusterRegisterRequest(request);
+ .validate(request);
Assert.fail();
} catch (FederationStateStoreInvalidInputException e) {
LOG.info(e.getMessage());
@@ -206,7 +206,7 @@ public class TestFederationStateStoreInputValidator {
SubClusterRegisterRequest request =
SubClusterRegisterRequest.newInstance(subClusterInfo);
FederationMembershipStateStoreInputValidator
- .validateSubClusterRegisterRequest(request);
+ .validate(request);
Assert.fail();
} catch (FederationStateStoreInvalidInputException e) {
LOG.info(e.getMessage());
@@ -224,7 +224,7 @@ public class TestFederationStateStoreInputValidator {
SubClusterRegisterRequest request =
SubClusterRegisterRequest.newInstance(subClusterInfo);
FederationMembershipStateStoreInputValidator
- .validateSubClusterRegisterRequest(request);
+ .validate(request);
Assert.fail();
} catch (FederationStateStoreInvalidInputException e) {
LOG.info(e.getMessage());
@@ -242,7 +242,7 @@ public class TestFederationStateStoreInputValidator {
SubClusterRegisterRequest request =
SubClusterRegisterRequest.newInstance(subClusterInfo);
FederationMembershipStateStoreInputValidator
- .validateSubClusterRegisterRequest(request);
+ .validate(request);
} catch (FederationStateStoreInvalidInputException e) {
Assert.fail(e.getMessage());
}
@@ -257,7 +257,7 @@ public class TestFederationStateStoreInputValidator {
SubClusterRegisterRequest request =
SubClusterRegisterRequest.newInstance(subClusterInfo);
FederationMembershipStateStoreInputValidator
- .validateSubClusterRegisterRequest(request);
+ .validate(request);
} catch (FederationStateStoreInvalidInputException e) {
Assert.fail(e.getMessage());
}
@@ -276,7 +276,7 @@ public class TestFederationStateStoreInputValidator {
SubClusterRegisterRequest request =
SubClusterRegisterRequest.newInstance(subClusterInfo);
FederationMembershipStateStoreInputValidator
- .validateSubClusterRegisterRequest(request);
+ .validate(request);
Assert.fail();
} catch (FederationStateStoreInvalidInputException e) {
LOG.info(e.getMessage());
@@ -294,7 +294,7 @@ public class TestFederationStateStoreInputValidator {
SubClusterRegisterRequest request =
SubClusterRegisterRequest.newInstance(subClusterInfo);
FederationMembershipStateStoreInputValidator
- .validateSubClusterRegisterRequest(request);
+ .validate(request);
Assert.fail();
} catch (FederationStateStoreInvalidInputException e) {
LOG.info(e.getMessage());
@@ -315,7 +315,7 @@ public class TestFederationStateStoreInputValidator {
SubClusterRegisterRequest request =
SubClusterRegisterRequest.newInstance(subClusterInfo);
FederationMembershipStateStoreInputValidator
- .validateSubClusterRegisterRequest(request);
+ .validate(request);
Assert.fail();
} catch (FederationStateStoreInvalidInputException e) {
LOG.info(e.getMessage());
@@ -332,7 +332,7 @@ public class TestFederationStateStoreInputValidator {
SubClusterRegisterRequest request =
SubClusterRegisterRequest.newInstance(subClusterInfo);
FederationMembershipStateStoreInputValidator
- .validateSubClusterRegisterRequest(request);
+ .validate(request);
Assert.fail();
} catch (FederationStateStoreInvalidInputException e) {
LOG.info(e.getMessage());
@@ -350,7 +350,7 @@ public class TestFederationStateStoreInputValidator {
SubClusterRegisterRequest request =
SubClusterRegisterRequest.newInstance(subClusterInfo);
FederationMembershipStateStoreInputValidator
- .validateSubClusterRegisterRequest(request);
+ .validate(request);
Assert.fail();
} catch (FederationStateStoreInvalidInputException e) {
LOG.info(e.getMessage());
@@ -368,7 +368,7 @@ public class TestFederationStateStoreInputValidator {
SubClusterRegisterRequest request =
SubClusterRegisterRequest.newInstance(subClusterInfo);
FederationMembershipStateStoreInputValidator
- .validateSubClusterRegisterRequest(request);
+ .validate(request);
Assert.fail();
} catch (FederationStateStoreInvalidInputException e) {
LOG.info(e.getMessage());
@@ -386,7 +386,7 @@ public class TestFederationStateStoreInputValidator {
SubClusterRegisterRequest request =
SubClusterRegisterRequest.newInstance(subClusterInfo);
FederationMembershipStateStoreInputValidator
- .validateSubClusterRegisterRequest(request);
+ .validate(request);
Assert.fail();
} catch (FederationStateStoreInvalidInputException e) {
LOG.info(e.getMessage());
@@ -404,7 +404,7 @@ public class TestFederationStateStoreInputValidator {
SubClusterRegisterRequest request =
SubClusterRegisterRequest.newInstance(subClusterInfo);
FederationMembershipStateStoreInputValidator
- .validateSubClusterRegisterRequest(request);
+ .validate(request);
Assert.fail();
} catch (FederationStateStoreInvalidInputException e) {
LOG.info(e.getMessage());
@@ -421,7 +421,7 @@ public class TestFederationStateStoreInputValidator {
SubClusterRegisterRequest request =
SubClusterRegisterRequest.newInstance(subClusterInfo);
FederationMembershipStateStoreInputValidator
- .validateSubClusterRegisterRequest(request);
+ .validate(request);
Assert.fail();
} catch (FederationStateStoreInvalidInputException e) {
LOG.info(e.getMessage());
@@ -438,7 +438,7 @@ public class TestFederationStateStoreInputValidator {
SubClusterRegisterRequest request =
SubClusterRegisterRequest.newInstance(subClusterInfo);
FederationMembershipStateStoreInputValidator
- .validateSubClusterRegisterRequest(request);
+ .validate(request);
Assert.fail();
} catch (FederationStateStoreInvalidInputException e) {
LOG.info(e.getMessage());
@@ -460,7 +460,7 @@ public class TestFederationStateStoreInputValidator {
SubClusterRegisterRequest request =
SubClusterRegisterRequest.newInstance(subClusterInfo);
FederationMembershipStateStoreInputValidator
- .validateSubClusterRegisterRequest(request);
+ .validate(request);
Assert.fail();
} catch (FederationStateStoreInvalidInputException e) {
LOG.info(e.getMessage());
@@ -477,7 +477,7 @@ public class TestFederationStateStoreInputValidator {
SubClusterRegisterRequest request =
SubClusterRegisterRequest.newInstance(subClusterInfo);
FederationMembershipStateStoreInputValidator
- .validateSubClusterRegisterRequest(request);
+ .validate(request);
Assert.fail();
} catch (FederationStateStoreInvalidInputException e) {
LOG.info(e.getMessage());
@@ -494,7 +494,7 @@ public class TestFederationStateStoreInputValidator {
SubClusterRegisterRequest request =
SubClusterRegisterRequest.newInstance(subClusterInfo);
FederationMembershipStateStoreInputValidator
- .validateSubClusterRegisterRequest(request);
+ .validate(request);
Assert.fail();
} catch (FederationStateStoreInvalidInputException e) {
LOG.info(e.getMessage());
@@ -510,7 +510,7 @@ public class TestFederationStateStoreInputValidator {
SubClusterRegisterRequest request =
SubClusterRegisterRequest.newInstance(subClusterInfo);
FederationMembershipStateStoreInputValidator
- .validateSubClusterRegisterRequest(request);
+ .validate(request);
Assert.fail();
} catch (FederationStateStoreInvalidInputException e) {
LOG.info(e.getMessage());
@@ -526,7 +526,7 @@ public class TestFederationStateStoreInputValidator {
SubClusterRegisterRequest request =
SubClusterRegisterRequest.newInstance(subClusterInfo);
FederationMembershipStateStoreInputValidator
- .validateSubClusterRegisterRequest(request);
+ .validate(request);
Assert.fail();
} catch (FederationStateStoreInvalidInputException e) {
LOG.info(e.getMessage());
@@ -543,7 +543,7 @@ public class TestFederationStateStoreInputValidator {
SubClusterRegisterRequest request =
SubClusterRegisterRequest.newInstance(subClusterInfo);
FederationMembershipStateStoreInputValidator
- .validateSubClusterRegisterRequest(request);
+ .validate(request);
Assert.fail();
} catch (FederationStateStoreInvalidInputException e) {
LOG.info(e.getMessage());
@@ -560,7 +560,7 @@ public class TestFederationStateStoreInputValidator {
SubClusterRegisterRequest request =
SubClusterRegisterRequest.newInstance(subClusterInfo);
FederationMembershipStateStoreInputValidator
- .validateSubClusterRegisterRequest(request);
+ .validate(request);
Assert.fail();
} catch (FederationStateStoreInvalidInputException e) {
LOG.info(e.getMessage());
@@ -576,7 +576,7 @@ public class TestFederationStateStoreInputValidator {
SubClusterRegisterRequest request =
SubClusterRegisterRequest.newInstance(subClusterInfo);
FederationMembershipStateStoreInputValidator
- .validateSubClusterRegisterRequest(request);
+ .validate(request);
Assert.fail();
} catch (FederationStateStoreInvalidInputException e) {
LOG.info(e.getMessage());
@@ -594,7 +594,7 @@ public class TestFederationStateStoreInputValidator {
SubClusterDeregisterRequest request =
SubClusterDeregisterRequest.newInstance(subClusterId, stateLost);
FederationMembershipStateStoreInputValidator
- .validateSubClusterDeregisterRequest(request);
+ .validate(request);
} catch (FederationStateStoreInvalidInputException e) {
Assert.fail(e.getMessage());
}
@@ -604,7 +604,7 @@ public class TestFederationStateStoreInputValidator {
try {
SubClusterDeregisterRequest request = null;
FederationMembershipStateStoreInputValidator
- .validateSubClusterDeregisterRequest(request);
+ .validate(request);
Assert.fail();
} catch (FederationStateStoreInvalidInputException e) {
LOG.info(e.getMessage());
@@ -618,7 +618,7 @@ public class TestFederationStateStoreInputValidator {
SubClusterDeregisterRequest request =
SubClusterDeregisterRequest.newInstance(subClusterIdNull, stateLost);
FederationMembershipStateStoreInputValidator
- .validateSubClusterDeregisterRequest(request);
+ .validate(request);
Assert.fail();
} catch (FederationStateStoreInvalidInputException e) {
LOG.info(e.getMessage());
@@ -632,7 +632,7 @@ public class TestFederationStateStoreInputValidator {
SubClusterDeregisterRequest request = SubClusterDeregisterRequest
.newInstance(subClusterIdInvalid, stateLost);
FederationMembershipStateStoreInputValidator
- .validateSubClusterDeregisterRequest(request);
+ .validate(request);
Assert.fail();
} catch (FederationStateStoreInvalidInputException e) {
LOG.info(e.getMessage());
@@ -646,7 +646,7 @@ public class TestFederationStateStoreInputValidator {
SubClusterDeregisterRequest request =
SubClusterDeregisterRequest.newInstance(subClusterId, stateNull);
FederationMembershipStateStoreInputValidator
- .validateSubClusterDeregisterRequest(request);
+ .validate(request);
Assert.fail();
} catch (FederationStateStoreInvalidInputException e) {
LOG.info(e.getMessage());
@@ -660,7 +660,7 @@ public class TestFederationStateStoreInputValidator {
SubClusterDeregisterRequest request =
SubClusterDeregisterRequest.newInstance(subClusterId, stateNew);
FederationMembershipStateStoreInputValidator
- .validateSubClusterDeregisterRequest(request);
+ .validate(request);
Assert.fail();
} catch (FederationStateStoreInvalidInputException e) {
LOG.info(e.getMessage());
@@ -677,7 +677,7 @@ public class TestFederationStateStoreInputValidator {
SubClusterHeartbeatRequest request = SubClusterHeartbeatRequest
.newInstance(subClusterId, lastHeartBeat, stateLost, capability);
FederationMembershipStateStoreInputValidator
- .validateSubClusterHeartbeatRequest(request);
+ .validate(request);
} catch (FederationStateStoreInvalidInputException e) {
Assert.fail(e.getMessage());
}
@@ -687,7 +687,7 @@ public class TestFederationStateStoreInputValidator {
try {
SubClusterHeartbeatRequest request = null;
FederationMembershipStateStoreInputValidator
- .validateSubClusterHeartbeatRequest(request);
+ .validate(request);
Assert.fail();
} catch (FederationStateStoreInvalidInputException e) {
LOG.info(e.getMessage());
@@ -701,7 +701,7 @@ public class TestFederationStateStoreInputValidator {
SubClusterHeartbeatRequest request = SubClusterHeartbeatRequest
.newInstance(subClusterIdNull, lastHeartBeat, stateLost, capability);
FederationMembershipStateStoreInputValidator
- .validateSubClusterHeartbeatRequest(request);
+ .validate(request);
Assert.fail();
} catch (FederationStateStoreInvalidInputException e) {
LOG.info(e.getMessage());
@@ -716,7 +716,7 @@ public class TestFederationStateStoreInputValidator {
SubClusterHeartbeatRequest.newInstance(subClusterIdInvalid,
lastHeartBeat, stateLost, capability);
FederationMembershipStateStoreInputValidator
- .validateSubClusterHeartbeatRequest(request);
+ .validate(request);
Assert.fail();
} catch (FederationStateStoreInvalidInputException e) {
LOG.info(e.getMessage());
@@ -730,7 +730,7 @@ public class TestFederationStateStoreInputValidator {
SubClusterHeartbeatRequest request = SubClusterHeartbeatRequest
.newInstance(subClusterId, lastHeartBeat, stateNull, capability);
FederationMembershipStateStoreInputValidator
- .validateSubClusterHeartbeatRequest(request);
+ .validate(request);
Assert.fail();
} catch (FederationStateStoreInvalidInputException e) {
LOG.info(e.getMessage());
@@ -745,7 +745,7 @@ public class TestFederationStateStoreInputValidator {
SubClusterHeartbeatRequest.newInstance(subClusterId,
lastHeartBeatNegative, stateLost, capability);
FederationMembershipStateStoreInputValidator
- .validateSubClusterHeartbeatRequest(request);
+ .validate(request);
Assert.fail();
} catch (FederationStateStoreInvalidInputException e) {
LOG.info(e.getMessage());
@@ -759,7 +759,7 @@ public class TestFederationStateStoreInputValidator {
SubClusterHeartbeatRequest request = SubClusterHeartbeatRequest
.newInstance(subClusterId, lastHeartBeat, stateLost, capabilityNull);
FederationMembershipStateStoreInputValidator
- .validateSubClusterHeartbeatRequest(request);
+ .validate(request);
Assert.fail();
} catch (FederationStateStoreInvalidInputException e) {
LOG.info(e.getMessage());
@@ -773,7 +773,7 @@ public class TestFederationStateStoreInputValidator {
SubClusterHeartbeatRequest request = SubClusterHeartbeatRequest
.newInstance(subClusterId, lastHeartBeat, stateLost, capabilityEmpty);
FederationMembershipStateStoreInputValidator
- .validateSubClusterHeartbeatRequest(request);
+ .validate(request);
Assert.fail();
} catch (FederationStateStoreInvalidInputException e) {
LOG.info(e.getMessage());
@@ -791,7 +791,7 @@ public class TestFederationStateStoreInputValidator {
GetSubClusterInfoRequest request =
GetSubClusterInfoRequest.newInstance(subClusterId);
FederationMembershipStateStoreInputValidator
- .validateGetSubClusterInfoRequest(request);
+ .validate(request);
} catch (FederationStateStoreInvalidInputException e) {
Assert.fail(e.getMessage());
}
@@ -801,7 +801,7 @@ public class TestFederationStateStoreInputValidator {
try {
GetSubClusterInfoRequest request = null;
FederationMembershipStateStoreInputValidator
- .validateGetSubClusterInfoRequest(request);
+ .validate(request);
Assert.fail();
} catch (FederationStateStoreInvalidInputException e) {
LOG.info(e.getMessage());
@@ -815,7 +815,7 @@ public class TestFederationStateStoreInputValidator {
GetSubClusterInfoRequest request =
GetSubClusterInfoRequest.newInstance(subClusterIdNull);
FederationMembershipStateStoreInputValidator
- .validateGetSubClusterInfoRequest(request);
+ .validate(request);
Assert.fail();
} catch (FederationStateStoreInvalidInputException e) {
LOG.info(e.getMessage());
@@ -829,7 +829,7 @@ public class TestFederationStateStoreInputValidator {
GetSubClusterInfoRequest request =
GetSubClusterInfoRequest.newInstance(subClusterIdInvalid);
FederationMembershipStateStoreInputValidator
- .validateGetSubClusterInfoRequest(request);
+ .validate(request);
Assert.fail();
} catch (FederationStateStoreInvalidInputException e) {
LOG.info(e.getMessage());
@@ -850,7 +850,7 @@ public class TestFederationStateStoreInputValidator {
AddApplicationHomeSubClusterRequest
.newInstance(applicationHomeSubCluster);
FederationApplicationHomeSubClusterStoreInputValidator
- .validateAddApplicationHomeSubClusterRequest(request);
+ .validate(request);
} catch (FederationStateStoreInvalidInputException e) {
Assert.fail(e.getMessage());
}
@@ -860,7 +860,7 @@ public class TestFederationStateStoreInputValidator {
try {
AddApplicationHomeSubClusterRequest request = null;
FederationApplicationHomeSubClusterStoreInputValidator
- .validateAddApplicationHomeSubClusterRequest(request);
+ .validate(request);
Assert.fail();
} catch (FederationStateStoreInvalidInputException e) {
Assert.assertTrue(e.getMessage()
@@ -875,7 +875,7 @@ public class TestFederationStateStoreInputValidator {
AddApplicationHomeSubClusterRequest
.newInstance(applicationHomeSubCluster);
FederationApplicationHomeSubClusterStoreInputValidator
- .validateAddApplicationHomeSubClusterRequest(request);
+ .validate(request);
Assert.fail();
} catch (FederationStateStoreInvalidInputException e) {
Assert.assertTrue(
@@ -891,7 +891,7 @@ public class TestFederationStateStoreInputValidator {
AddApplicationHomeSubClusterRequest
.newInstance(applicationHomeSubCluster);
FederationApplicationHomeSubClusterStoreInputValidator
- .validateAddApplicationHomeSubClusterRequest(request);
+ .validate(request);
Assert.fail();
} catch (FederationStateStoreInvalidInputException e) {
LOG.info(e.getMessage());
@@ -908,7 +908,7 @@ public class TestFederationStateStoreInputValidator {
AddApplicationHomeSubClusterRequest
.newInstance(applicationHomeSubCluster);
FederationApplicationHomeSubClusterStoreInputValidator
- .validateAddApplicationHomeSubClusterRequest(request);
+ .validate(request);
Assert.fail();
} catch (FederationStateStoreInvalidInputException e) {
LOG.info(e.getMessage());
@@ -925,7 +925,7 @@ public class TestFederationStateStoreInputValidator {
AddApplicationHomeSubClusterRequest
.newInstance(applicationHomeSubCluster);
FederationApplicationHomeSubClusterStoreInputValidator
- .validateAddApplicationHomeSubClusterRequest(request);
+ .validate(request);
Assert.fail();
} catch (FederationStateStoreInvalidInputException e) {
Assert.assertTrue(e.getMessage().startsWith("Missing Application Id."));
@@ -944,7 +944,7 @@ public class TestFederationStateStoreInputValidator {
UpdateApplicationHomeSubClusterRequest
.newInstance(applicationHomeSubCluster);
FederationApplicationHomeSubClusterStoreInputValidator
- .validateUpdateApplicationHomeSubClusterRequest(request);
+ .validate(request);
} catch (FederationStateStoreInvalidInputException e) {
Assert.fail(e.getMessage());
}
@@ -954,7 +954,7 @@ public class TestFederationStateStoreInputValidator {
try {
UpdateApplicationHomeSubClusterRequest request = null;
FederationApplicationHomeSubClusterStoreInputValidator
- .validateUpdateApplicationHomeSubClusterRequest(request);
+ .validate(request);
Assert.fail();
} catch (FederationStateStoreInvalidInputException e) {
Assert.assertTrue(e.getMessage()
@@ -969,7 +969,7 @@ public class TestFederationStateStoreInputValidator {
UpdateApplicationHomeSubClusterRequest
.newInstance(applicationHomeSubCluster);
FederationApplicationHomeSubClusterStoreInputValidator
- .validateUpdateApplicationHomeSubClusterRequest(request);
+ .validate(request);
Assert.fail();
} catch (FederationStateStoreInvalidInputException e) {
Assert.assertTrue(
@@ -985,7 +985,7 @@ public class TestFederationStateStoreInputValidator {
UpdateApplicationHomeSubClusterRequest
.newInstance(applicationHomeSubCluster);
FederationApplicationHomeSubClusterStoreInputValidator
- .validateUpdateApplicationHomeSubClusterRequest(request);
+ .validate(request);
Assert.fail();
} catch (FederationStateStoreInvalidInputException e) {
LOG.info(e.getMessage());
@@ -1002,7 +1002,7 @@ public class TestFederationStateStoreInputValidator {
UpdateApplicationHomeSubClusterRequest
.newInstance(applicationHomeSubCluster);
FederationApplicationHomeSubClusterStoreInputValidator
- .validateUpdateApplicationHomeSubClusterRequest(request);
+ .validate(request);
Assert.fail();
} catch (FederationStateStoreInvalidInputException e) {
LOG.info(e.getMessage());
@@ -1019,7 +1019,7 @@ public class TestFederationStateStoreInputValidator {
UpdateApplicationHomeSubClusterRequest
.newInstance(applicationHomeSubCluster);
FederationApplicationHomeSubClusterStoreInputValidator
- .validateUpdateApplicationHomeSubClusterRequest(request);
+ .validate(request);
Assert.fail();
} catch (FederationStateStoreInvalidInputException e) {
Assert.assertTrue(e.getMessage().startsWith("Missing Application Id."));
@@ -1035,7 +1035,7 @@ public class TestFederationStateStoreInputValidator {
GetApplicationHomeSubClusterRequest request =
GetApplicationHomeSubClusterRequest.newInstance(appId);
FederationApplicationHomeSubClusterStoreInputValidator
- .validateGetApplicationHomeSubClusterRequest(request);
+ .validate(request);
} catch (FederationStateStoreInvalidInputException e) {
Assert.fail(e.getMessage());
}
@@ -1045,7 +1045,7 @@ public class TestFederationStateStoreInputValidator {
try {
GetApplicationHomeSubClusterRequest request = null;
FederationApplicationHomeSubClusterStoreInputValidator
- .validateGetApplicationHomeSubClusterRequest(request);
+ .validate(request);
Assert.fail();
} catch (FederationStateStoreInvalidInputException e) {
Assert.assertTrue(e.getMessage()
@@ -1058,7 +1058,7 @@ public class TestFederationStateStoreInputValidator {
GetApplicationHomeSubClusterRequest request =
GetApplicationHomeSubClusterRequest.newInstance(appIdNull);
FederationApplicationHomeSubClusterStoreInputValidator
- .validateGetApplicationHomeSubClusterRequest(request);
+ .validate(request);
Assert.fail();
} catch (FederationStateStoreInvalidInputException e) {
Assert.assertTrue(e.getMessage().startsWith("Missing Application Id."));
@@ -1075,7 +1075,7 @@ public class TestFederationStateStoreInputValidator {
DeleteApplicationHomeSubClusterRequest request =
DeleteApplicationHomeSubClusterRequest.newInstance(appId);
FederationApplicationHomeSubClusterStoreInputValidator
- .validateDeleteApplicationHomeSubClusterRequest(request);
+ .validate(request);
} catch (FederationStateStoreInvalidInputException e) {
Assert.fail(e.getMessage());
}
@@ -1085,7 +1085,7 @@ public class TestFederationStateStoreInputValidator {
try {
DeleteApplicationHomeSubClusterRequest request = null;
FederationApplicationHomeSubClusterStoreInputValidator
- .validateDeleteApplicationHomeSubClusterRequest(request);
+ .validate(request);
Assert.fail();
} catch (FederationStateStoreInvalidInputException e) {
Assert.assertTrue(e.getMessage()
@@ -1098,7 +1098,7 @@ public class TestFederationStateStoreInputValidator {
DeleteApplicationHomeSubClusterRequest request =
DeleteApplicationHomeSubClusterRequest.newInstance(appIdNull);
FederationApplicationHomeSubClusterStoreInputValidator
- .validateDeleteApplicationHomeSubClusterRequest(request);
+ .validate(request);
Assert.fail();
} catch (FederationStateStoreInvalidInputException e) {
Assert.assertTrue(e.getMessage().startsWith("Missing Application Id."));
@@ -1115,7 +1115,7 @@ public class TestFederationStateStoreInputValidator {
GetSubClusterPolicyConfigurationRequest request =
GetSubClusterPolicyConfigurationRequest.newInstance(queue);
FederationPolicyStoreInputValidator
- .validateGetSubClusterPolicyConfigurationRequest(request);
+ .validate(request);
} catch (FederationStateStoreInvalidInputException e) {
Assert.fail(e.getMessage());
}
@@ -1125,7 +1125,7 @@ public class TestFederationStateStoreInputValidator {
try {
GetSubClusterPolicyConfigurationRequest request = null;
FederationPolicyStoreInputValidator
- .validateGetSubClusterPolicyConfigurationRequest(request);
+ .validate(request);
Assert.fail();
} catch (FederationStateStoreInvalidInputException e) {
Assert.assertTrue(e.getMessage()
@@ -1138,7 +1138,7 @@ public class TestFederationStateStoreInputValidator {
GetSubClusterPolicyConfigurationRequest request =
GetSubClusterPolicyConfigurationRequest.newInstance(queueNull);
FederationPolicyStoreInputValidator
- .validateGetSubClusterPolicyConfigurationRequest(request);
+ .validate(request);
Assert.fail();
} catch (FederationStateStoreInvalidInputException e) {
Assert.assertTrue(e.getMessage().startsWith("Missing Queue."));
@@ -1150,7 +1150,7 @@ public class TestFederationStateStoreInputValidator {
GetSubClusterPolicyConfigurationRequest request =
GetSubClusterPolicyConfigurationRequest.newInstance(queueEmpty);
FederationPolicyStoreInputValidator
- .validateGetSubClusterPolicyConfigurationRequest(request);
+ .validate(request);
Assert.fail();
} catch (FederationStateStoreInvalidInputException e) {
Assert.assertTrue(e.getMessage().startsWith("Missing Queue."));
@@ -1169,7 +1169,7 @@ public class TestFederationStateStoreInputValidator {
SetSubClusterPolicyConfigurationRequest request =
SetSubClusterPolicyConfigurationRequest.newInstance(policy);
FederationPolicyStoreInputValidator
- .validateSetSubClusterPolicyConfigurationRequest(request);
+ .validate(request);
} catch (FederationStateStoreInvalidInputException e) {
Assert.fail(e.getMessage());
}
@@ -1179,7 +1179,7 @@ public class TestFederationStateStoreInputValidator {
try {
SetSubClusterPolicyConfigurationRequest request = null;
FederationPolicyStoreInputValidator
- .validateSetSubClusterPolicyConfigurationRequest(request);
+ .validate(request);
Assert.fail();
} catch (FederationStateStoreInvalidInputException e) {
Assert.assertTrue(e.getMessage()
@@ -1193,7 +1193,7 @@ public class TestFederationStateStoreInputValidator {
SetSubClusterPolicyConfigurationRequest request =
SetSubClusterPolicyConfigurationRequest.newInstance(policy);
FederationPolicyStoreInputValidator
- .validateSetSubClusterPolicyConfigurationRequest(request);
+ .validate(request);
Assert.fail();
} catch (FederationStateStoreInvalidInputException e) {
Assert.assertTrue(
@@ -1208,7 +1208,7 @@ public class TestFederationStateStoreInputValidator {
SetSubClusterPolicyConfigurationRequest request =
SetSubClusterPolicyConfigurationRequest.newInstance(policy);
FederationPolicyStoreInputValidator
- .validateSetSubClusterPolicyConfigurationRequest(request);
+ .validate(request);
Assert.fail();
} catch (FederationStateStoreInvalidInputException e) {
Assert.assertTrue(e.getMessage().startsWith("Missing Queue."));
@@ -1222,7 +1222,7 @@ public class TestFederationStateStoreInputValidator {
SetSubClusterPolicyConfigurationRequest request =
SetSubClusterPolicyConfigurationRequest.newInstance(policy);
FederationPolicyStoreInputValidator
- .validateSetSubClusterPolicyConfigurationRequest(request);
+ .validate(request);
Assert.fail();
} catch (FederationStateStoreInvalidInputException e) {
Assert.assertTrue(e.getMessage().startsWith("Missing Queue."));
@@ -1236,7 +1236,7 @@ public class TestFederationStateStoreInputValidator {
SetSubClusterPolicyConfigurationRequest request =
SetSubClusterPolicyConfigurationRequest.newInstance(policy);
FederationPolicyStoreInputValidator
- .validateSetSubClusterPolicyConfigurationRequest(request);
+ .validate(request);
Assert.fail();
} catch (FederationStateStoreInvalidInputException e) {
Assert.assertTrue(e.getMessage().startsWith("Missing Policy Type."));
@@ -1250,7 +1250,7 @@ public class TestFederationStateStoreInputValidator {
SetSubClusterPolicyConfigurationRequest request =
SetSubClusterPolicyConfigurationRequest.newInstance(policy);
FederationPolicyStoreInputValidator
- .validateSetSubClusterPolicyConfigurationRequest(request);
+ .validate(request);
Assert.fail();
} catch (FederationStateStoreInvalidInputException e) {
Assert.assertTrue(e.getMessage().startsWith("Missing Policy Type."));
http://git-wip-us.apache.org/repos/asf/hadoop/blob/027444cb/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/utils/TestFederationStateStoreFacadeRetry.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/utils/TestFederationStateStoreFacadeRetry.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/utils/TestFederationStateStoreFacadeRetry.java
index 632e865..304910e 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/utils/TestFederationStateStoreFacadeRetry.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/utils/TestFederationStateStoreFacadeRetry.java
@@ -24,7 +24,6 @@ import org.apache.hadoop.io.retry.RetryPolicy;
import org.apache.hadoop.io.retry.RetryPolicy.RetryAction;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException;
-import org.apache.hadoop.yarn.server.federation.store.exception.FederationStateStoreErrorCode;
import org.apache.hadoop.yarn.server.federation.store.exception.FederationStateStoreException;
import org.apache.hadoop.yarn.server.federation.store.exception.FederationStateStoreInvalidInputException;
import org.apache.hadoop.yarn.server.federation.store.exception.FederationStateStoreRetriableException;
@@ -82,10 +81,8 @@ public class TestFederationStateStoreFacadeRetry {
conf = new Configuration();
conf.setInt(YarnConfiguration.CLIENT_FAILOVER_RETRIES, maxRetries);
RetryPolicy policy = FederationStateStoreFacade.createRetryPolicy(conf);
- RetryAction action = policy.shouldRetry(
- new FederationStateStoreException(
- FederationStateStoreErrorCode.APPLICATIONS_INSERT_FAIL),
- 0, 0, false);
+ RetryAction action = policy
+ .shouldRetry(new FederationStateStoreException("Error"), 0, 0, false);
Assert.assertEquals(RetryAction.FAIL.action, action.action);
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/027444cb/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/resources/FederationStateStore/SQLServer/FederationStateStoreStoreProcs.sql
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/resources/FederationStateStore/SQLServer/FederationStateStoreStoreProcs.sql b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/resources/FederationStateStore/SQLServer/FederationStateStoreStoreProcs.sql
new file mode 100644
index 0000000..66d6f0e
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/resources/FederationStateStore/SQLServer/FederationStateStoreStoreProcs.sql
@@ -0,0 +1,511 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+USE [FederationStateStore]
+GO
+
+IF OBJECT_ID ( '[sp_addApplicationHomeSubCluster]', 'P' ) IS NOT NULL
+ DROP PROCEDURE [sp_addApplicationHomeSubCluster];
+GO
+
+CREATE PROCEDURE [dbo].[sp_addApplicationHomeSubCluster]
+ @applicationId VARCHAR(64),
+ @homeSubCluster VARCHAR(256),
+ @storedHomeSubCluster VARCHAR(256) OUTPUT,
+ @rowCount int OUTPUT
+AS BEGIN
+ DECLARE @errorMessage nvarchar(4000)
+
+ BEGIN TRY
+ BEGIN TRAN
+ -- If application to sub-cluster map doesn't exist, insert it.
+ -- Otherwise don't change the current mapping.
+ IF NOT EXISTS (SELECT TOP 1 *
+ FROM [dbo].[applicationsHomeSubCluster]
+ WHERE [applicationId] = @applicationId)
+
+ INSERT INTO [dbo].[applicationsHomeSubCluster] (
+ [applicationId],
+ [homeSubCluster])
+ VALUES (
+ @applicationId,
+ @homeSubCluster);
+ -- End of the IF block
+
+ SELECT @rowCount = @@ROWCOUNT;
+
+ SELECT @storedHomeSubCluster = [homeSubCluster]
+ FROM [dbo].[applicationsHomeSubCluster]
+ WHERE [applicationId] = @applicationId;
+
+ COMMIT TRAN
+ END TRY
+
+ BEGIN CATCH
+ ROLLBACK TRAN
+
+ SET @errorMessage = dbo.func_FormatErrorMessage(ERROR_MESSAGE(), ERROR_LINE())
+
+ /* raise error and terminate the execution */
+ RAISERROR(@errorMessage, --- Error Message
+ 1, -- Severity
+ -1 -- State
+ ) WITH log
+ END CATCH
+END;
+GO
+
+IF OBJECT_ID ( '[sp_updateApplicationHomeSubCluster]', 'P' ) IS NOT NULL
+ DROP PROCEDURE [sp_updateApplicationHomeSubCluster];
+GO
+
+CREATE PROCEDURE [dbo].[sp_updateApplicationHomeSubCluster]
+ @applicationId VARCHAR(64),
+ @homeSubCluster VARCHAR(256),
+ @rowCount int OUTPUT
+AS BEGIN
+ DECLARE @errorMessage nvarchar(4000)
+
+ BEGIN TRY
+ BEGIN TRAN
+
+ UPDATE [dbo].[applicationsHomeSubCluster]
+ SET [homeSubCluster] = @homeSubCluster
+ WHERE [applicationId] = @applicationid;
+ SELECT @rowCount = @@ROWCOUNT;
+
+ COMMIT TRAN
+ END TRY
+
+ BEGIN CATCH
+ ROLLBACK TRAN
+
+ SET @errorMessage = dbo.func_FormatErrorMessage(ERROR_MESSAGE(), ERROR_LINE())
+
+ /* raise error and terminate the execution */
+ RAISERROR(@errorMessage, --- Error Message
+ 1, -- Severity
+ -1 -- State
+ ) WITH log
+ END CATCH
+END;
+GO
+
+IF OBJECT_ID ( '[sp_getApplicationsHomeSubCluster]', 'P' ) IS NOT NULL
+ DROP PROCEDURE [sp_getApplicationsHomeSubCluster];
+GO
+
+CREATE PROCEDURE [dbo].[sp_getApplicationsHomeSubCluster]
+AS BEGIN
+ DECLARE @errorMessage nvarchar(4000)
+
+ BEGIN TRY
+ SELECT [applicationId], [homeSubCluster], [createTime]
+ FROM [dbo].[applicationsHomeSubCluster]
+ END TRY
+
+ BEGIN CATCH
+ SET @errorMessage = dbo.func_FormatErrorMessage(ERROR_MESSAGE(), ERROR_LINE())
+
+ /* raise error and terminate the execution */
+ RAISERROR(@errorMessage, --- Error Message
+ 1, -- Severity
+ -1 -- State
+ ) WITH log
+ END CATCH
+END;
+GO
+
+IF OBJECT_ID ( '[sp_getApplicationHomeSubCluster]', 'P' ) IS NOT NULL
+ DROP PROCEDURE [sp_getApplicationHomeSubCluster];
+GO
+
+CREATE PROCEDURE [dbo].[sp_getApplicationHomeSubCluster]
+ @applicationId VARCHAR(64),
+ @homeSubCluster VARCHAR(256) OUTPUT
+AS BEGIN
+ DECLARE @errorMessage nvarchar(4000)
+
+ BEGIN TRY
+
+ SELECT @homeSubCluster = [homeSubCluster]
+ FROM [dbo].[applicationsHomeSubCluster]
+ WHERE [applicationId] = @applicationid;
+
+ END TRY
+
+ BEGIN CATCH
+
+ SET @errorMessage = dbo.func_FormatErrorMessage(ERROR_MESSAGE(), ERROR_LINE())
+
+ /* raise error and terminate the execution */
+ RAISERROR(@errorMessage, --- Error Message
+ 1, -- Severity
+ -1 -- State
+ ) WITH log
+ END CATCH
+END;
+GO
+
+IF OBJECT_ID ( '[sp_deleteApplicationHomeSubCluster]', 'P' ) IS NOT NULL
+ DROP PROCEDURE [sp_deleteApplicationHomeSubCluster];
+GO
+
+CREATE PROCEDURE [dbo].[sp_deleteApplicationHomeSubCluster]
+ @applicationId VARCHAR(64),
+ @rowCount int OUTPUT
+AS BEGIN
+ DECLARE @errorMessage nvarchar(4000)
+
+ BEGIN TRY
+ BEGIN TRAN
+
+ DELETE FROM [dbo].[applicationsHomeSubCluster]
+ WHERE [applicationId] = @applicationId;
+ SELECT @rowCount = @@ROWCOUNT;
+
+ COMMIT TRAN
+ END TRY
+
+ BEGIN CATCH
+ ROLLBACK TRAN
+
+ SET @errorMessage = dbo.func_FormatErrorMessage(ERROR_MESSAGE(), ERROR_LINE())
+
+ /* raise error and terminate the execution */
+ RAISERROR(@errorMessage, --- Error Message
+ 1, -- Severity
+ -1 -- State
+ ) WITH log
+ END CATCH
+END;
+GO
+
+IF OBJECT_ID ( '[sp_registerSubCluster]', 'P' ) IS NOT NULL
+ DROP PROCEDURE [sp_registerSubCluster];
+GO
+
+CREATE PROCEDURE [dbo].[sp_registerSubCluster]
+ @subClusterId VARCHAR(256),
+ @amRMServiceAddress VARCHAR(256),
+ @clientRMServiceAddress VARCHAR(256),
+ @rmAdminServiceAddress VARCHAR(256),
+ @rmWebServiceAddress VARCHAR(256),
+ @state VARCHAR(32),
+ @lastStartTime BIGINT,
+ @capability VARCHAR(6000),
+ @rowCount int OUTPUT
+AS BEGIN
+ DECLARE @errorMessage nvarchar(4000)
+
+ BEGIN TRY
+ BEGIN TRAN
+
+ DELETE FROM [dbo].[membership]
+ WHERE [subClusterId] = @subClusterId;
+ INSERT INTO [dbo].[membership] (
+ [subClusterId],
+ [amRMServiceAddress],
+ [clientRMServiceAddress],
+ [rmAdminServiceAddress],
+ [rmWebServiceAddress],
+ [lastHeartBeat],
+ [state],
+ [lastStartTime],
+ [capability] )
+ VALUES (
+ @subClusterId,
+ @amRMServiceAddress,
+ @clientRMServiceAddress,
+ @rmAdminServiceAddress,
+ @rmWebServiceAddress,
+ GETUTCDATE(),
+ @state,
+ @lastStartTime,
+ @capability);
+ SELECT @rowCount = @@ROWCOUNT;
+
+ COMMIT TRAN
+ END TRY
+
+ BEGIN CATCH
+ ROLLBACK TRAN
+
+ SET @errorMessage = dbo.func_FormatErrorMessage(ERROR_MESSAGE(), ERROR_LINE())
+
+ /* raise error and terminate the execution */
+ RAISERROR(@errorMessage, --- Error Message
+ 1, -- Severity
+ -1 -- State
+ ) WITH log
+ END CATCH
+END;
+GO
+
+IF OBJECT_ID ( '[sp_getSubClusters]', 'P' ) IS NOT NULL
+ DROP PROCEDURE [sp_getSubClusters];
+GO
+
+CREATE PROCEDURE [dbo].[sp_getSubClusters]
+AS BEGIN
+ DECLARE @errorMessage nvarchar(4000)
+
+ BEGIN TRY
+ SELECT [subClusterId], [amRMServiceAddress], [clientRMServiceAddress],
+ [rmAdminServiceAddress], [rmWebServiceAddress], [lastHeartBeat],
+ [state], [lastStartTime], [capability]
+ FROM [dbo].[membership]
+ END TRY
+
+ BEGIN CATCH
+ SET @errorMessage = dbo.func_FormatErrorMessage(ERROR_MESSAGE(), ERROR_LINE())
+
+ /* raise error and terminate the execution */
+ RAISERROR(@errorMessage, --- Error Message
+ 1, -- Severity
+ -1 -- State
+ ) WITH log
+ END CATCH
+END;
+GO
+
+IF OBJECT_ID ( '[sp_getSubCluster]', 'P' ) IS NOT NULL
+ DROP PROCEDURE [sp_getSubCluster];
+GO
+
+CREATE PROCEDURE [dbo].[sp_getSubCluster]
+ @subClusterId VARCHAR(256),
+ @amRMServiceAddress VARCHAR(256) OUTPUT,
+ @clientRMServiceAddress VARCHAR(256) OUTPUT,
+ @rmAdminServiceAddress VARCHAR(256) OUTPUT,
+ @rmWebServiceAddress VARCHAR(256) OUTPUT,
+ @lastHeartbeat DATETIME2 OUTPUT,
+ @state VARCHAR(256) OUTPUT,
+ @lastStartTime BIGINT OUTPUT,
+ @capability VARCHAR(6000) OUTPUT
+AS BEGIN
+ DECLARE @errorMessage nvarchar(4000)
+
+ BEGIN TRY
+ BEGIN TRAN
+
+ SELECT @subClusterId = [subClusterId],
+ @amRMServiceAddress = [amRMServiceAddress],
+ @clientRMServiceAddress = [clientRMServiceAddress],
+ @rmAdminServiceAddress = [rmAdminServiceAddress],
+ @rmWebServiceAddress = [rmWebServiceAddress],
+ @lastHeartBeat = [lastHeartBeat],
+ @state = [state],
+ @lastStartTime = [lastStartTime],
+ @capability = [capability]
+ FROM [dbo].[membership]
+ WHERE [subClusterId] = @subClusterId
+
+ COMMIT TRAN
+ END TRY
+
+ BEGIN CATCH
+ ROLLBACK TRAN
+
+ SET @errorMessage = dbo.func_FormatErrorMessage(ERROR_MESSAGE(), ERROR_LINE())
+
+ /* raise error and terminate the execution */
+ RAISERROR(@errorMessage, --- Error Message
+ 1, -- Severity
+ -1 -- State
+ ) WITH log
+ END CATCH
+END;
+GO
+
+
+IF OBJECT_ID ( '[sp_subClusterHeartbeat]', 'P' ) IS NOT NULL
+ DROP PROCEDURE [sp_subClusterHeartbeat];
+GO
+
+CREATE PROCEDURE [dbo].[sp_subClusterHeartbeat]
+ @subClusterId VARCHAR(256),
+ @state VARCHAR(256),
+ @capability VARCHAR(6000),
+ @rowCount int OUTPUT
+AS BEGIN
+ DECLARE @errorMessage nvarchar(4000)
+
+ BEGIN TRY
+ BEGIN TRAN
+
+ UPDATE [dbo].[membership]
+ SET [state] = @state,
+ [lastHeartbeat] = GETUTCDATE(),
+ [capability] = @capability
+ WHERE [subClusterId] = @subClusterId;
+ SELECT @rowCount = @@ROWCOUNT;
+
+ COMMIT TRAN
+ END TRY
+
+ BEGIN CATCH
+ ROLLBACK TRAN
+
+ SET @errorMessage = dbo.func_FormatErrorMessage(ERROR_MESSAGE(), ERROR_LINE())
+
+ /* raise error and terminate the execution */
+ RAISERROR(@errorMessage, --- Error Message
+ 1, -- Severity
+ -1 -- State
+ ) WITH log
+ END CATCH
+END;
+GO
+
+IF OBJECT_ID ( '[sp_deregisterSubCluster]', 'P' ) IS NOT NULL
+ DROP PROCEDURE [sp_deregisterSubCluster];
+GO
+
+CREATE PROCEDURE [dbo].[sp_deregisterSubCluster]
+ @subClusterId VARCHAR(256),
+ @state VARCHAR(256),
+ @rowCount int OUTPUT
+AS BEGIN
+ DECLARE @errorMessage nvarchar(4000)
+
+ BEGIN TRY
+ BEGIN TRAN
+
+ UPDATE [dbo].[membership]
+ SET [state] = @state
+ WHERE [subClusterId] = @subClusterId;
+ SELECT @rowCount = @@ROWCOUNT;
+
+ COMMIT TRAN
+ END TRY
+
+ BEGIN CATCH
+ ROLLBACK TRAN
+
+ SET @errorMessage = dbo.func_FormatErrorMessage(ERROR_MESSAGE(), ERROR_LINE())
+
+ /* raise error and terminate the execution */
+ RAISERROR(@errorMessage, --- Error Message
+ 1, -- Severity
+ -1 -- State
+ ) WITH log
+ END CATCH
+END;
+GO
+
+IF OBJECT_ID ( '[sp_setPolicyConfiguration]', 'P' ) IS NOT NULL
+ DROP PROCEDURE [sp_setPolicyConfiguration];
+GO
+
+CREATE PROCEDURE [dbo].[sp_setPolicyConfiguration]
+ @queue VARCHAR(256),
+ @policyType VARCHAR(256),
+ @params VARBINARY(512),
+ @rowCount int OUTPUT
+AS BEGIN
+ DECLARE @errorMessage nvarchar(4000)
+
+ BEGIN TRY
+ BEGIN TRAN
+
+ DELETE FROM [dbo].[policies]
+ WHERE [queue] = @queue;
+ INSERT INTO [dbo].[policies] (
+ [queue],
+ [policyType],
+ [params])
+ VALUES (
+ @queue,
+ @policyType,
+ @params);
+ SELECT @rowCount = @@ROWCOUNT;
+
+ COMMIT TRAN
+ END TRY
+
+ BEGIN CATCH
+ ROLLBACK TRAN
+
+ SET @errorMessage = dbo.func_FormatErrorMessage(ERROR_MESSAGE(), ERROR_LINE())
+
+ /* raise error and terminate the execution */
+ RAISERROR(@errorMessage, --- Error Message
+ 1, -- Severity
+ -1 -- State
+ ) WITH log
+ END CATCH
+END;
+GO
+
+IF OBJECT_ID ( '[sp_getPolicyConfiguration]', 'P' ) IS NOT NULL
+ DROP PROCEDURE [sp_getPolicyConfiguration];
+GO
+
+CREATE PROCEDURE [dbo].[sp_getPolicyConfiguration]
+ @queue VARCHAR(256),
+ @policyType VARCHAR(256) OUTPUT,
+ @params VARBINARY(6000) OUTPUT
+AS BEGIN
+ DECLARE @errorMessage nvarchar(4000)
+
+ BEGIN TRY
+
+ SELECT @policyType = [policyType],
+ @params = [params]
+ FROM [dbo].[policies]
+ WHERE [queue] = @queue
+
+ END TRY
+
+ BEGIN CATCH
+
+ SET @errorMessage = dbo.func_FormatErrorMessage(ERROR_MESSAGE(), ERROR_LINE())
+
+ /* raise error and terminate the execution */
+ RAISERROR(@errorMessage, --- Error Message
+ 1, -- Severity
+ -1 -- State
+ ) WITH log
+ END CATCH
+END;
+GO
+
+IF OBJECT_ID ( '[sp_getPoliciesConfigurations]', 'P' ) IS NOT NULL
+ DROP PROCEDURE [sp_getPoliciesConfigurations];
+GO
+
+CREATE PROCEDURE [dbo].[sp_getPoliciesConfigurations]
+AS BEGIN
+ DECLARE @errorMessage nvarchar(4000)
+
+ BEGIN TRY
+ SELECT [queue], [policyType], [params] FROM [dbo].[policies]
+ END TRY
+
+ BEGIN CATCH
+ SET @errorMessage = dbo.func_FormatErrorMessage(ERROR_MESSAGE(), ERROR_LINE())
+
+ /* raise error and terminate the execution */
+ RAISERROR(@errorMessage, --- Error Message
+ 1, -- Severity
+ -1 -- State
+ ) WITH log
+ END CATCH
+END;
+GO
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hadoop/blob/027444cb/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/resources/FederationStateStore/SQLServer/FederationStateStoreTables.sql
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/resources/FederationStateStore/SQLServer/FederationStateStoreTables.sql b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/resources/FederationStateStore/SQLServer/FederationStateStoreTables.sql
new file mode 100644
index 0000000..a97385b
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/resources/FederationStateStore/SQLServer/FederationStateStoreTables.sql
@@ -0,0 +1,122 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+USE [FederationStateStore]
+GO
+
+IF NOT EXISTS ( SELECT * FROM [FederationStateStore].sys.tables
+ WHERE name = 'applicationsHomeSubCluster'
+ AND schema_id = SCHEMA_ID('dbo'))
+ BEGIN
+ PRINT 'Table applicationsHomeSubCluster does not exist, create it...'
+
+ SET ANSI_NULLS ON
+
+ SET QUOTED_IDENTIFIER ON
+
+ SET ANSI_PADDING ON
+
+ CREATE TABLE [dbo].[applicationsHomeSubCluster](
+ applicationId VARCHAR(64) COLLATE Latin1_General_100_BIN2 NOT NULL,
+ homeSubCluster VARCHAR(256) NOT NULL,
+ createTime DATETIME2 NOT NULL CONSTRAINT ts_createAppTime DEFAULT GETUTCDATE(),
+
+ CONSTRAINT [pk_applicationId] PRIMARY KEY
+ (
+ [applicationId]
+ )
+ )
+
+ SET ANSI_PADDING OFF
+
+ PRINT 'Table applicationsHomeSubCluster created.'
+ END
+ELSE
+ PRINT 'Table applicationsHomeSubCluster exists, no operation required...'
+ GO
+GO
+
+IF NOT EXISTS ( SELECT * FROM [FederationStateStore].sys.tables
+ WHERE name = 'membership'
+ AND schema_id = SCHEMA_ID('dbo'))
+ BEGIN
+ PRINT 'Table membership does not exist, create it...'
+
+ SET ANSI_NULLS ON
+
+ SET QUOTED_IDENTIFIER ON
+
+ SET ANSI_PADDING ON
+
+ CREATE TABLE [dbo].[membership](
+ [subClusterId] VARCHAR(256) COLLATE Latin1_General_100_BIN2 NOT NULL,
+ [amRMServiceAddress] VARCHAR(256) NOT NULL,
+ [clientRMServiceAddress] VARCHAR(256) NOT NULL,
+ [rmAdminServiceAddress] VARCHAR(256) NOT NULL,
+ [rmWebServiceAddress] VARCHAR(256) NOT NULL,
+ [lastHeartBeat] DATETIME2 NOT NULL,
+ [state] VARCHAR(32) NOT NULL,
+ [lastStartTime] BIGINT NOT NULL,
+ [capability] VARCHAR(6000) NOT NULL,
+
+ CONSTRAINT [pk_subClusterId] PRIMARY KEY
+ (
+ [subClusterId]
+ )
+ )
+
+ SET ANSI_PADDING OFF
+
+ PRINT 'Table membership created.'
+ END
+ELSE
+ PRINT 'Table membership exists, no operation required...'
+ GO
+GO
+
+IF NOT EXISTS ( SELECT * FROM [FederationStateStore].sys.tables
+ WHERE name = 'policies'
+ AND schema_id = SCHEMA_ID('dbo'))
+ BEGIN
+ PRINT 'Table policies does not exist, create it...'
+
+ SET ANSI_NULLS ON
+
+ SET QUOTED_IDENTIFIER ON
+
+ SET ANSI_PADDING ON
+
+ CREATE TABLE [dbo].[policies](
+ queue VARCHAR(256) COLLATE Latin1_General_100_BIN2 NOT NULL,
+ policyType VARCHAR(256) NOT NULL,
+ params VARBINARY(6000) NOT NULL,
+
+ CONSTRAINT [pk_queue] PRIMARY KEY
+ (
+ [queue]
+ )
+ )
+
+ SET ANSI_PADDING OFF
+
+ PRINT 'Table policies created.'
+ END
+ELSE
+ PRINT 'Table policies exists, no operation required...'
+ GO
+GO
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org