You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by cu...@apache.org on 2017/09/22 00:58:41 UTC
[20/50] [abbrv] hadoop git commit: YARN-5602. Utils for Federation
State and Policy Store. (Giovanni Matteo Fumarola via Subru).
YARN-5602. Utils for Federation State and Policy Store. (Giovanni Matteo Fumarola via Subru).
(cherry picked from commit 326a2e6bde1cf266ecc7d3b513cdaac6abcebbe4)
(cherry picked from commit e1da8f0667589dd660e6fcd776cc87f1b8ef6db9)
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/b9dcf928
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/b9dcf928
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/b9dcf928
Branch: refs/heads/branch-2
Commit: b9dcf9283e29281e55a65b59bd74c21dc6806296
Parents: b40bdaf
Author: Subru Krishnan <su...@apache.org>
Authored: Wed Apr 5 15:02:00 2017 -0700
Committer: Carlo Curino <cu...@apache.org>
Committed: Thu Sep 21 16:25:10 2017 -0700
----------------------------------------------------------------------
.../FederationStateStoreErrorCode.java | 105 +++++++++++++
.../FederationStateStoreException.java | 45 ++++++
...derationStateStoreInvalidInputException.java | 48 ++++++
.../FederationStateStoreRetriableException.java | 44 ++++++
.../store/exception/package-info.java | 17 ++
.../store/impl/MemoryFederationStateStore.java | 56 +++++--
.../store/records/SubClusterInfo.java | 62 ++++++++
.../records/impl/pb/SubClusterInfoPBImpl.java | 16 --
...cationHomeSubClusterStoreInputValidator.java | 1 +
...ationMembershipStateStoreInputValidator.java | 1 +
.../FederationPolicyStoreInputValidator.java | 1 +
...derationStateStoreInvalidInputException.java | 48 ------
.../store/utils/FederationStateStoreUtils.java | 155 +++++++++++++++++++
.../utils/FederationStateStoreFacade.java | 23 ++-
.../impl/FederationStateStoreBaseTest.java | 91 ++++++-----
.../impl/TestMemoryFederationStateStore.java | 4 +-
.../TestFederationStateStoreInputValidator.java | 1 +
.../TestFederationStateStoreFacadeRetry.java | 125 +++++++++++++++
18 files changed, 730 insertions(+), 113 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/b9dcf928/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/exception/FederationStateStoreErrorCode.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/exception/FederationStateStoreErrorCode.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/exception/FederationStateStoreErrorCode.java
new file mode 100644
index 0000000..88e2d3a
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/exception/FederationStateStoreErrorCode.java
@@ -0,0 +1,105 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.federation.store.exception;
+
+import org.apache.hadoop.classification.InterfaceAudience.Public;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
+
+/**
+ * <p>
+ * Logical error codes from <code>FederationStateStore</code>.
+ * </p>
+ *
+ * <p>
+ * Each constant pairs a numeric identifier with a human-readable message.
+ * The identifiers declared here are grouped by table: 11xx for Membership,
+ * 12xx for ApplicationsHomeSubCluster and 13xx for Policy.
+ * </p>
+ */
+@Public
+@Unstable
+public enum FederationStateStoreErrorCode {
+
+  MEMBERSHIP_INSERT_FAIL(1101, "Fail to insert a tuple into Membership table."),
+
+  MEMBERSHIP_DELETE_FAIL(1102, "Fail to delete a tuple from Membership table."),
+
+  MEMBERSHIP_SINGLE_SELECT_FAIL(1103,
+      "Fail to select a tuple from Membership table."),
+
+  MEMBERSHIP_MULTIPLE_SELECT_FAIL(1104,
+      "Fail to select multiple tuples from Membership table."),
+
+  MEMBERSHIP_UPDATE_DEREGISTER_FAIL(1105,
+      "Fail to update/deregister a tuple in Membership table."),
+
+  MEMBERSHIP_UPDATE_HEARTBEAT_FAIL(1106,
+      "Fail to update/heartbeat a tuple in Membership table."),
+
+  APPLICATIONS_INSERT_FAIL(1201,
+      "Fail to insert a tuple into ApplicationsHomeSubCluster table."),
+
+  APPLICATIONS_DELETE_FAIL(1202,
+      "Fail to delete a tuple from ApplicationsHomeSubCluster table"),
+
+  APPLICATIONS_SINGLE_SELECT_FAIL(1203,
+      "Fail to select a tuple from ApplicationsHomeSubCluster table."),
+
+  APPLICATIONS_MULTIPLE_SELECT_FAIL(1204,
+      "Fail to select multiple tuple from ApplicationsHomeSubCluster table."),
+
+  APPLICATIONS_UPDATE_FAIL(1205,
+      "Fail to update a tuple in ApplicationsHomeSubCluster table."),
+
+  POLICY_INSERT_FAIL(1301, "Fail to insert a tuple into Policy table."),
+
+  // The message names the Policy table: this code belongs to the 13xx
+  // (Policy) group, not to Membership.
+  POLICY_DELETE_FAIL(1302, "Fail to delete a tuple from Policy table."),
+
+  POLICY_SINGLE_SELECT_FAIL(1303, "Fail to select a tuple from Policy table."),
+
+  POLICY_MULTIPLE_SELECT_FAIL(1304,
+      "Fail to select multiple tuples from Policy table."),
+
+  POLICY_UPDATE_FAIL(1305, "Fail to update a tuple in Policy table.");
+
+  /** Numeric identifier of the error code. */
+  private final int id;
+
+  /** Human-readable description of the failure. */
+  private final String msg;
+
+  /**
+   * Creates an error code.
+   *
+   * @param id the numeric identifier of the error code
+   * @param msg the human-readable description of the failure
+   */
+  FederationStateStoreErrorCode(int id, String msg) {
+    this.id = id;
+    this.msg = msg;
+  }
+
+  /**
+   * Get the error code related to the FederationStateStore failure.
+   *
+   * @return the error code related to the FederationStateStore failure.
+   */
+  public int getId() {
+    return this.id;
+  }
+
+  /**
+   * Get the error message related to the FederationStateStore failure.
+   *
+   * @return the error message related to the FederationStateStore failure.
+   */
+  public String getMsg() {
+    return this.msg;
+  }
+
+  /**
+   * Renders the code as two lines, each preceded by a line break: the numeric
+   * identifier followed by the message.
+   *
+   * @return the multi-line textual form of this error code
+   */
+  @Override
+  public String toString() {
+    return "\nError Code: " + this.id + "\nError Message: " + this.msg;
+  }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hadoop/blob/b9dcf928/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/exception/FederationStateStoreException.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/exception/FederationStateStoreException.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/exception/FederationStateStoreException.java
new file mode 100644
index 0000000..81a9e99
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/exception/FederationStateStoreException.java
@@ -0,0 +1,45 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.federation.store.exception;
+
+import org.apache.hadoop.yarn.exceptions.YarnException;
+
+/**
+ * Exception thrown by the <code>FederationStateStore</code>.
+ *
+ * <p>
+ * The exception carries the {@link FederationStateStoreErrorCode} supplied at
+ * construction time. No detail message or cause is passed to the superclass;
+ * callers obtain the failure description through {@link #getCode()}.
+ * </p>
+ */
+public class FederationStateStoreException extends YarnException {
+
+  /**
+   * IDE auto-generated.
+   */
+  private static final long serialVersionUID = -6453353714832159296L;
+
+  /** Error code describing the failure; set once by the constructor. */
+  private final FederationStateStoreErrorCode code;
+
+  /**
+   * Creates an exception for the given error code.
+   *
+   * @param code the error code describing the failure
+   */
+  public FederationStateStoreException(FederationStateStoreErrorCode code) {
+    super();
+    this.code = code;
+  }
+
+  /**
+   * Get the error code carried by this exception.
+   *
+   * @return the error code passed to the constructor
+   */
+  public FederationStateStoreErrorCode getCode() {
+    return code;
+  }
+
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hadoop/blob/b9dcf928/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/exception/FederationStateStoreInvalidInputException.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/exception/FederationStateStoreInvalidInputException.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/exception/FederationStateStoreInvalidInputException.java
new file mode 100644
index 0000000..edf7837
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/exception/FederationStateStoreInvalidInputException.java
@@ -0,0 +1,48 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.federation.store.exception;
+
+import org.apache.hadoop.yarn.exceptions.YarnException;
+
+/**
+ * Signals that the input handed to the Federation State Store is invalid.
+ *
+ * <p>
+ * Thrown by the {@code FederationMembershipStateStoreInputValidator},
+ * {@code FederationApplicationHomeSubClusterStoreInputValidator} and
+ * {@code FederationPolicyStoreInputValidator}.
+ * </p>
+ */
+public class FederationStateStoreInvalidInputException extends YarnException {
+
+  /** Serialization version identifier (IDE auto-generated). */
+  private static final long serialVersionUID = -7352144682711430801L;
+
+  /**
+   * Creates the exception with a detail message.
+   *
+   * @param message the description of the invalid input
+   */
+  public FederationStateStoreInvalidInputException(String message) {
+    super(message);
+  }
+
+  /**
+   * Creates the exception with an underlying cause.
+   *
+   * @param cause the throwable that triggered this exception
+   */
+  public FederationStateStoreInvalidInputException(Throwable cause) {
+    super(cause);
+  }
+
+  /**
+   * Creates the exception with both a detail message and a cause.
+   *
+   * @param message the description of the invalid input
+   * @param cause the throwable that triggered this exception
+   */
+  public FederationStateStoreInvalidInputException(String message,
+      Throwable cause) {
+    super(message, cause);
+  }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/b9dcf928/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/exception/FederationStateStoreRetriableException.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/exception/FederationStateStoreRetriableException.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/exception/FederationStateStoreRetriableException.java
new file mode 100644
index 0000000..19d6750
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/exception/FederationStateStoreRetriableException.java
@@ -0,0 +1,44 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.federation.store.exception;
+
+import org.apache.hadoop.yarn.exceptions.YarnException;
+
+/**
+ * Exception thrown by the {@code FederationStateStore} when the failure is a
+ * retriable one.
+ */
+public class FederationStateStoreRetriableException extends YarnException {
+
+  /** Serialization version identifier. */
+  private static final long serialVersionUID = 1L;
+
+  /**
+   * Creates the exception with a detail message.
+   *
+   * @param message the description of the failure
+   */
+  public FederationStateStoreRetriableException(String message) {
+    super(message);
+  }
+
+  /**
+   * Creates the exception with an underlying cause.
+   *
+   * @param cause the throwable that triggered this exception
+   */
+  public FederationStateStoreRetriableException(Throwable cause) {
+    super(cause);
+  }
+
+  /**
+   * Creates the exception with both a detail message and a cause.
+   *
+   * @param message the description of the failure
+   * @param cause the throwable that triggered this exception
+   */
+  public FederationStateStoreRetriableException(String message,
+      Throwable cause) {
+    super(message, cause);
+  }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/b9dcf928/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/exception/package-info.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/exception/package-info.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/exception/package-info.java
new file mode 100644
index 0000000..727606f
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/exception/package-info.java
@@ -0,0 +1,17 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hadoop.yarn.server.federation.store.exception;
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hadoop/blob/b9dcf928/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/impl/MemoryFederationStateStore.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/impl/MemoryFederationStateStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/impl/MemoryFederationStateStore.java
index 6e564dc..127bf82 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/impl/MemoryFederationStateStore.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/impl/MemoryFederationStateStore.java
@@ -30,6 +30,7 @@ import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterInfo;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterPolicyConfiguration;
import org.apache.hadoop.yarn.server.federation.store.FederationStateStore;
+import org.apache.hadoop.yarn.server.federation.store.exception.FederationStateStoreErrorCode;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterDeregisterRequest;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterDeregisterResponse;
import org.apache.hadoop.yarn.server.federation.store.records.AddApplicationHomeSubClusterRequest;
@@ -60,8 +61,11 @@ import org.apache.hadoop.yarn.server.federation.store.records.UpdateApplicationH
import org.apache.hadoop.yarn.server.federation.store.utils.FederationApplicationHomeSubClusterStoreInputValidator;
import org.apache.hadoop.yarn.server.federation.store.utils.FederationMembershipStateStoreInputValidator;
import org.apache.hadoop.yarn.server.federation.store.utils.FederationPolicyStoreInputValidator;
+import org.apache.hadoop.yarn.server.federation.store.utils.FederationStateStoreUtils;
import org.apache.hadoop.yarn.server.records.Version;
import org.apache.hadoop.yarn.util.MonotonicClock;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
* In-memory implementation of {@link FederationStateStore}.
@@ -74,6 +78,9 @@ public class MemoryFederationStateStore implements FederationStateStore {
private final MonotonicClock clock = new MonotonicClock();
+ public static final Logger LOG =
+ LoggerFactory.getLogger(MemoryFederationStateStore.class);
+
@Override
public void init(Configuration conf) {
membership = new ConcurrentHashMap<SubClusterId, SubClusterInfo>();
@@ -94,7 +101,17 @@ public class MemoryFederationStateStore implements FederationStateStore {
FederationMembershipStateStoreInputValidator
.validateSubClusterRegisterRequest(request);
SubClusterInfo subClusterInfo = request.getSubClusterInfo();
- membership.put(subClusterInfo.getSubClusterId(), subClusterInfo);
+
+ SubClusterInfo subClusterInfoToSave =
+ SubClusterInfo.newInstance(subClusterInfo.getSubClusterId(),
+ subClusterInfo.getAMRMServiceAddress(),
+ subClusterInfo.getClientRMServiceAddress(),
+ subClusterInfo.getRMAdminServiceAddress(),
+ subClusterInfo.getRMWebServiceAddress(), clock.getTime(),
+ subClusterInfo.getState(), subClusterInfo.getLastStartTime(),
+ subClusterInfo.getCapability());
+
+ membership.put(subClusterInfo.getSubClusterId(), subClusterInfoToSave);
return SubClusterRegisterResponse.newInstance();
}
@@ -105,8 +122,11 @@ public class MemoryFederationStateStore implements FederationStateStore {
.validateSubClusterDeregisterRequest(request);
SubClusterInfo subClusterInfo = membership.get(request.getSubClusterId());
if (subClusterInfo == null) {
- throw new YarnException(
- "SubCluster " + request.getSubClusterId().toString() + " not found");
+ String errMsg =
+ "SubCluster " + request.getSubClusterId().toString() + " not found";
+ FederationStateStoreUtils.logAndThrowStoreException(LOG,
+ FederationStateStoreErrorCode.MEMBERSHIP_UPDATE_DEREGISTER_FAIL,
+ errMsg);
} else {
subClusterInfo.setState(request.getState());
}
@@ -124,8 +144,11 @@ public class MemoryFederationStateStore implements FederationStateStore {
SubClusterInfo subClusterInfo = membership.get(subClusterId);
if (subClusterInfo == null) {
- throw new YarnException("Subcluster " + subClusterId.toString()
- + " does not exist; cannot heartbeat");
+ String errMsg = "Subcluster " + subClusterId.toString()
+ + " does not exist; cannot heartbeat";
+ FederationStateStoreUtils.logAndThrowStoreException(LOG,
+ FederationStateStoreErrorCode.MEMBERSHIP_UPDATE_HEARTBEAT_FAIL,
+ errMsg);
}
subClusterInfo.setLastHeartBeat(clock.getTime());
@@ -143,8 +166,10 @@ public class MemoryFederationStateStore implements FederationStateStore {
.validateGetSubClusterInfoRequest(request);
SubClusterId subClusterId = request.getSubClusterId();
if (!membership.containsKey(subClusterId)) {
- throw new YarnException(
- "Subcluster " + subClusterId.toString() + " does not exist");
+ String errMsg =
+ "Subcluster " + subClusterId.toString() + " does not exist";
+ FederationStateStoreUtils.logAndThrowStoreException(LOG,
+ FederationStateStoreErrorCode.MEMBERSHIP_SINGLE_SELECT_FAIL, errMsg);
}
return GetSubClusterInfoResponse.newInstance(membership.get(subClusterId));
@@ -193,7 +218,9 @@ public class MemoryFederationStateStore implements FederationStateStore {
ApplicationId appId =
request.getApplicationHomeSubCluster().getApplicationId();
if (!applications.containsKey(appId)) {
- throw new YarnException("Application " + appId + " does not exist");
+ String errMsg = "Application " + appId + " does not exist";
+ FederationStateStoreUtils.logAndThrowStoreException(LOG,
+ FederationStateStoreErrorCode.APPLICATIONS_UPDATE_FAIL, errMsg);
}
applications.put(appId,
@@ -209,7 +236,10 @@ public class MemoryFederationStateStore implements FederationStateStore {
.validateGetApplicationHomeSubClusterRequest(request);
ApplicationId appId = request.getApplicationId();
if (!applications.containsKey(appId)) {
- throw new YarnException("Application " + appId + " does not exist");
+ String errMsg = "Application " + appId + " does not exist";
+ FederationStateStoreUtils.logAndThrowStoreException(LOG,
+ FederationStateStoreErrorCode.APPLICATIONS_SINGLE_SELECT_FAIL,
+ errMsg);
}
return GetApplicationHomeSubClusterResponse.newInstance(
@@ -238,7 +268,9 @@ public class MemoryFederationStateStore implements FederationStateStore {
.validateDeleteApplicationHomeSubClusterRequest(request);
ApplicationId appId = request.getApplicationId();
if (!applications.containsKey(appId)) {
- throw new YarnException("Application " + appId + " does not exist");
+ String errMsg = "Application " + appId + " does not exist";
+ FederationStateStoreUtils.logAndThrowStoreException(LOG,
+ FederationStateStoreErrorCode.APPLICATIONS_DELETE_FAIL, errMsg);
}
applications.remove(appId);
@@ -253,7 +285,9 @@ public class MemoryFederationStateStore implements FederationStateStore {
.validateGetSubClusterPolicyConfigurationRequest(request);
String queue = request.getQueue();
if (!policies.containsKey(queue)) {
- throw new YarnException("Policy for queue " + queue + " does not exist");
+ String errMsg = "Policy for queue " + queue + " does not exist";
+ FederationStateStoreUtils.logAndThrowStoreException(LOG,
+ FederationStateStoreErrorCode.POLICY_SINGLE_SELECT_FAIL, errMsg);
}
return GetSubClusterPolicyConfigurationResponse
http://git-wip-us.apache.org/repos/asf/hadoop/blob/b9dcf928/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/records/SubClusterInfo.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/records/SubClusterInfo.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/records/SubClusterInfo.java
index f13c8f1..cbf64e6 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/records/SubClusterInfo.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/records/SubClusterInfo.java
@@ -260,4 +260,66 @@ public abstract class SubClusterInfo {
+ ", getState() = " + getState() + ", getLastStartTime() = "
+ getLastStartTime() + ", getCapability() = " + getCapability() + "]";
}
+
+  /**
+   * Compares this sub-cluster record with another object for equality.
+   *
+   * <p>
+   * Two records are equal when they have the same runtime class and agree on
+   * the sub-cluster id, the AMRM, ClientRM, RMAdmin and RMWeb service
+   * addresses, the state and the last start time.
+   * </p>
+   *
+   * <p>
+   * Object-valued fields are compared null-safely, so a record with an unset
+   * field does not cause a {@code NullPointerException}. This matches
+   * {@link #hashCode()}, which also tolerates {@code null} field values.
+   * </p>
+   *
+   * @param obj the object to compare with, may be {@code null}
+   * @return {@code true} if {@code obj} is an equal sub-cluster record,
+   *         {@code false} otherwise
+   */
+  @Override
+  public boolean equals(Object obj) {
+    if (this == obj) {
+      return true;
+    }
+    if (obj == null || getClass() != obj.getClass()) {
+      return false;
+    }
+    SubClusterInfo other = (SubClusterInfo) obj;
+    // Capability and HeartBeat fields are not included as they are temporal
+    // (i.e. timestamps), so they change during the lifetime of the same
+    // sub-cluster
+    return java.util.Objects.equals(this.getSubClusterId(),
+        other.getSubClusterId())
+        && java.util.Objects.equals(this.getAMRMServiceAddress(),
+            other.getAMRMServiceAddress())
+        && java.util.Objects.equals(this.getClientRMServiceAddress(),
+            other.getClientRMServiceAddress())
+        && java.util.Objects.equals(this.getRMAdminServiceAddress(),
+            other.getRMAdminServiceAddress())
+        && java.util.Objects.equals(this.getRMWebServiceAddress(),
+            other.getRMWebServiceAddress())
+        && java.util.Objects.equals(this.getState(), other.getState())
+        && this.getLastStartTime() == other.getLastStartTime();
+  }
+
+  /**
+   * Computes a hash code from the sub-cluster id, the AMRM, ClientRM, RMAdmin
+   * and RMWeb service addresses, the state and the last start time.
+   *
+   * <p>
+   * A {@code null} field contributes {@code 0} to the result.
+   * </p>
+   *
+   * @return the hash code of this sub-cluster record
+   */
+  @Override
+  public int hashCode() {
+    // Capability and HeartBeat fields are not included as they are temporal
+    // (i.e. timestamps), so they change during the lifetime of the same
+    // sub-cluster
+    final Object[] hashedFields = {getSubClusterId(),
+        getAMRMServiceAddress(), getClientRMServiceAddress(),
+        getRMAdminServiceAddress(), getRMWebServiceAddress(), getState()};
+    int hash = 1;
+    for (Object field : hashedFields) {
+      hash = 31 * hash + (field == null ? 0 : field.hashCode());
+    }
+    // Fold the 64-bit start time into 32 bits before mixing it in.
+    final long lastStart = getLastStartTime();
+    return 31 * hash + (int) (lastStart ^ (lastStart >>> 32));
+  }
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/b9dcf928/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/records/impl/pb/SubClusterInfoPBImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/records/impl/pb/SubClusterInfoPBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/records/impl/pb/SubClusterInfoPBImpl.java
index b650b5f..cfdd038 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/records/impl/pb/SubClusterInfoPBImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/records/impl/pb/SubClusterInfoPBImpl.java
@@ -82,22 +82,6 @@ public class SubClusterInfoPBImpl extends SubClusterInfo {
}
@Override
- public int hashCode() {
- return getProto().hashCode();
- }
-
- @Override
- public boolean equals(Object other) {
- if (other == null) {
- return false;
- }
- if (other.getClass().isAssignableFrom(this.getClass())) {
- return this.getProto().equals(this.getClass().cast(other).getProto());
- }
- return false;
- }
-
- @Override
public String toString() {
return TextFormat.shortDebugString(getProto());
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/b9dcf928/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/utils/FederationApplicationHomeSubClusterStoreInputValidator.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/utils/FederationApplicationHomeSubClusterStoreInputValidator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/utils/FederationApplicationHomeSubClusterStoreInputValidator.java
index c14a452..d920144 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/utils/FederationApplicationHomeSubClusterStoreInputValidator.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/utils/FederationApplicationHomeSubClusterStoreInputValidator.java
@@ -19,6 +19,7 @@
package org.apache.hadoop.yarn.server.federation.store.utils;
import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.server.federation.store.exception.FederationStateStoreInvalidInputException;
import org.apache.hadoop.yarn.server.federation.store.records.AddApplicationHomeSubClusterRequest;
import org.apache.hadoop.yarn.server.federation.store.records.ApplicationHomeSubCluster;
import org.apache.hadoop.yarn.server.federation.store.records.DeleteApplicationHomeSubClusterRequest;
http://git-wip-us.apache.org/repos/asf/hadoop/blob/b9dcf928/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/utils/FederationMembershipStateStoreInputValidator.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/utils/FederationMembershipStateStoreInputValidator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/utils/FederationMembershipStateStoreInputValidator.java
index ff9d8e9..ebe622b 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/utils/FederationMembershipStateStoreInputValidator.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/utils/FederationMembershipStateStoreInputValidator.java
@@ -20,6 +20,7 @@ package org.apache.hadoop.yarn.server.federation.store.utils;
import java.net.URI;
+import org.apache.hadoop.yarn.server.federation.store.exception.FederationStateStoreInvalidInputException;
import org.apache.hadoop.yarn.server.federation.store.records.GetSubClusterInfoRequest;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterDeregisterRequest;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterHeartbeatRequest;
http://git-wip-us.apache.org/repos/asf/hadoop/blob/b9dcf928/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/utils/FederationPolicyStoreInputValidator.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/utils/FederationPolicyStoreInputValidator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/utils/FederationPolicyStoreInputValidator.java
index 273a8ac..0df2d85 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/utils/FederationPolicyStoreInputValidator.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/utils/FederationPolicyStoreInputValidator.java
@@ -18,6 +18,7 @@
package org.apache.hadoop.yarn.server.federation.store.utils;
+import org.apache.hadoop.yarn.server.federation.store.exception.FederationStateStoreInvalidInputException;
import org.apache.hadoop.yarn.server.federation.store.records.GetSubClusterPolicyConfigurationRequest;
import org.apache.hadoop.yarn.server.federation.store.records.SetSubClusterPolicyConfigurationRequest;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterPolicyConfiguration;
http://git-wip-us.apache.org/repos/asf/hadoop/blob/b9dcf928/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/utils/FederationStateStoreInvalidInputException.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/utils/FederationStateStoreInvalidInputException.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/utils/FederationStateStoreInvalidInputException.java
deleted file mode 100644
index ea1428d..0000000
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/utils/FederationStateStoreInvalidInputException.java
+++ /dev/null
@@ -1,48 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.yarn.server.federation.store.utils;
-
-import org.apache.hadoop.yarn.exceptions.YarnException;
-
-/**
- * Exception thrown by the {@link FederationMembershipStateStoreInputValidator},
- * {@link FederationApplicationHomeSubClusterStoreInputValidator},
- * {@link FederationPolicyStoreInputValidator} if the input is invalid.
- *
- */
-public class FederationStateStoreInvalidInputException extends YarnException {
-
- /**
- * IDE auto-generated.
- */
- private static final long serialVersionUID = -7352144682711430801L;
-
- public FederationStateStoreInvalidInputException(Throwable cause) {
- super(cause);
- }
-
- public FederationStateStoreInvalidInputException(String message) {
- super(message);
- }
-
- public FederationStateStoreInvalidInputException(String message,
- Throwable cause) {
- super(message, cause);
- }
-}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/b9dcf928/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/utils/FederationStateStoreUtils.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/utils/FederationStateStoreUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/utils/FederationStateStoreUtils.java
new file mode 100644
index 0000000..7dbb20a
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/utils/FederationStateStoreUtils.java
@@ -0,0 +1,155 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.federation.store.utils;
+
+import java.sql.CallableStatement;
+import java.sql.Connection;
+import java.sql.SQLException;
+
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.server.federation.store.exception.FederationStateStoreErrorCode;
+import org.apache.hadoop.yarn.server.federation.store.exception.FederationStateStoreException;
+import org.apache.hadoop.yarn.server.federation.store.exception.FederationStateStoreInvalidInputException;
+import org.apache.hadoop.yarn.server.federation.store.exception.FederationStateStoreRetriableException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Common utility methods used by the store implementations.
+ *
+ */
+public final class FederationStateStoreUtils {
+
+ public static final Logger LOG =
+ LoggerFactory.getLogger(FederationStateStoreUtils.class);
+
+ private FederationStateStoreUtils() {
+ }
+
+ /**
+ * Returns the SQL <code>FederationStateStore</code> connection to the pool.
+ *
+ * @param log the logger interface
+ * @param cstmt the interface used to execute SQL stored procedures
+ * @param conn the SQL connection
+ * @throws YarnException on failure
+ */
+ public static void returnToPool(Logger log, CallableStatement cstmt,
+ Connection conn) throws YarnException {
+ if (cstmt != null) {
+ try {
+ cstmt.close();
+ } catch (SQLException e) {
+ logAndThrowException(log, "Exception while trying to close Statement",
+ e);
+ }
+ }
+
+ if (conn != null) {
+ try {
+ conn.close();
+ } catch (SQLException e) {
+ logAndThrowException(log, "Exception while trying to close Connection",
+ e);
+ }
+ }
+ }
+
+ /**
+ * Throws an exception due to an error in <code>FederationStateStore</code>.
+ *
+ * @param log the logger interface
+ * @param errMsg the error message
+ * @param t the throwable raised in the called class.
+ * @throws YarnException on failure
+ */
+ public static void logAndThrowException(Logger log, String errMsg,
+ Throwable t) throws YarnException {
+ if (t != null) {
+ log.error(errMsg, t);
+ throw new YarnException(errMsg, t);
+ } else {
+ log.error(errMsg);
+ throw new YarnException(errMsg);
+ }
+ }
+
+ /**
+ * Throws an <code>FederationStateStoreException</code> due to an error in
+ * <code>FederationStateStore</code>.
+ *
+ * @param log the logger interface
+ * @param code FederationStateStoreErrorCode of the error
+ * @param errMsg the error message
+ * @throws YarnException on failure
+ */
+ public static void logAndThrowStoreException(Logger log,
+ FederationStateStoreErrorCode code, String errMsg) throws YarnException {
+ log.error(errMsg + " " + code.toString());
+ throw new FederationStateStoreException(code);
+ }
+
+ /**
+ * Throws an <code>FederationStateStoreException</code> due to an error in
+ * <code>FederationStateStore</code>.
+ *
+ * @param log the logger interface
+ * @param code FederationStateStoreErrorCode of the error
+ * @throws YarnException on failure
+ */
+ public static void logAndThrowStoreException(Logger log,
+ FederationStateStoreErrorCode code) throws YarnException {
+ log.error(code.toString());
+ throw new FederationStateStoreException(code);
+ }
+
+ /**
+ * Throws an <code>FederationStateStoreInvalidInputException</code> due to an
+ * error in <code>FederationStateStore</code>.
+ *
+ * @param log the logger interface
+ * @param errMsg the error message
+ * @throws YarnException on failure
+ */
+ public static void logAndThrowInvalidInputException(Logger log, String errMsg)
+ throws YarnException {
+ LOG.error(errMsg);
+ throw new FederationStateStoreInvalidInputException(errMsg);
+ }
+
+ /**
+ * Throws an <code>FederationStateStoreRetriableException</code> due to an
+ * error in <code>FederationStateStore</code>.
+ *
+ * @param log the logger interface
+ * @param errMsg the error message
+ * @param t the throwable raised in the called class.
+ * @throws YarnException on failure
+ */
+ public static void logAndThrowRetriableException(Logger log, String errMsg,
+ Throwable t) throws YarnException {
+ if (t != null) {
+ LOG.error(errMsg, t);
+ throw new FederationStateStoreRetriableException(errMsg, t);
+ } else {
+ LOG.error(errMsg);
+ throw new FederationStateStoreRetriableException(errMsg);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/b9dcf928/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/utils/FederationStateStoreFacade.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/utils/FederationStateStoreFacade.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/utils/FederationStateStoreFacade.java
index e8f245e..5693342 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/utils/FederationStateStoreFacade.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/utils/FederationStateStoreFacade.java
@@ -48,6 +48,7 @@ import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.apache.hadoop.yarn.server.federation.resolver.SubClusterResolver;
import org.apache.hadoop.yarn.server.federation.store.FederationStateStore;
+import org.apache.hadoop.yarn.server.federation.store.exception.FederationStateStoreRetriableException;
import org.apache.hadoop.yarn.server.federation.store.records.AddApplicationHomeSubClusterRequest;
import org.apache.hadoop.yarn.server.federation.store.records.AddApplicationHomeSubClusterResponse;
import org.apache.hadoop.yarn.server.federation.store.records.ApplicationHomeSubCluster;
@@ -137,14 +138,32 @@ public final class FederationStateStoreFacade {
initCache();
}
+ /**
+ * Create a RetryPolicy for {@code FederationStateStoreFacade}. In case of
+ * failure, it retries for:
+ * <ul>
+ * <li>{@code FederationStateStoreRetriableException}</li>
+ * <li>{@code CacheLoaderException}</li>
+ * </ul>
+ *
+ * @param conf the updated configuration
+ * @return the RetryPolicy for FederationStateStoreFacade
+ */
public static RetryPolicy createRetryPolicy(Configuration conf) {
// Retry settings for StateStore
- RetryPolicy retryPolicy = RetryPolicies.exponentialBackoffRetry(
+ RetryPolicy basePolicy = RetryPolicies.exponentialBackoffRetry(
conf.getInt(YarnConfiguration.CLIENT_FAILOVER_RETRIES, Integer.SIZE),
conf.getLong(YarnConfiguration.CLIENT_FAILOVER_SLEEPTIME_BASE_MS,
YarnConfiguration.DEFAULT_RESOURCEMANAGER_CONNECT_RETRY_INTERVAL_MS),
TimeUnit.MILLISECONDS);
-
+ Map<Class<? extends Exception>, RetryPolicy> exceptionToPolicyMap =
+ new HashMap<Class<? extends Exception>, RetryPolicy>();
+ exceptionToPolicyMap.put(FederationStateStoreRetriableException.class,
+ basePolicy);
+ exceptionToPolicyMap.put(CacheLoaderException.class, basePolicy);
+
+ RetryPolicy retryPolicy = RetryPolicies.retryByException(
+ RetryPolicies.TRY_ONCE_THEN_FAIL, exceptionToPolicyMap);
return retryPolicy;
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/b9dcf928/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/store/impl/FederationStateStoreBaseTest.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/store/impl/FederationStateStoreBaseTest.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/store/impl/FederationStateStoreBaseTest.java
index 63a5b65..80b00ef 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/store/impl/FederationStateStoreBaseTest.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/store/impl/FederationStateStoreBaseTest.java
@@ -19,11 +19,14 @@ package org.apache.hadoop.yarn.server.federation.store.impl;
import java.io.IOException;
import java.nio.ByteBuffer;
+import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.server.federation.store.FederationStateStore;
+import org.apache.hadoop.yarn.server.federation.store.exception.FederationStateStoreErrorCode;
+import org.apache.hadoop.yarn.server.federation.store.exception.FederationStateStoreException;
import org.apache.hadoop.yarn.server.federation.store.records.AddApplicationHomeSubClusterRequest;
import org.apache.hadoop.yarn.server.federation.store.records.AddApplicationHomeSubClusterResponse;
import org.apache.hadoop.yarn.server.federation.store.records.ApplicationHomeSubCluster;
@@ -67,9 +70,11 @@ public abstract class FederationStateStoreBaseTest {
protected abstract FederationStateStore createStateStore();
+ private Configuration conf;
+
@Before
public void before() throws IOException, YarnException {
- stateStore.init(new Configuration());
+ stateStore.init(conf);
}
@After
@@ -114,8 +119,10 @@ public abstract class FederationStateStoreBaseTest {
try {
stateStore.deregisterSubCluster(deregisterRequest);
Assert.fail();
- } catch (YarnException e) {
- Assert.assertTrue(e.getMessage().startsWith("SubCluster SC not found"));
+ } catch (FederationStateStoreException e) {
+ Assert.assertEquals(
+ FederationStateStoreErrorCode.MEMBERSHIP_UPDATE_DEREGISTER_FAIL,
+ e.getCode());
}
}
@@ -141,9 +148,10 @@ public abstract class FederationStateStoreBaseTest {
try {
stateStore.getSubCluster(request).getSubClusterInfo();
Assert.fail();
- } catch (YarnException e) {
- Assert.assertTrue(
- e.getMessage().startsWith("Subcluster SC does not exist"));
+ } catch (FederationStateStoreException e) {
+ Assert.assertEquals(
+ FederationStateStoreErrorCode.MEMBERSHIP_SINGLE_SELECT_FAIL,
+ e.getCode());
}
}
@@ -166,19 +174,25 @@ public abstract class FederationStateStoreBaseTest {
stateStore.subClusterHeartbeat(SubClusterHeartbeatRequest.newInstance(
subClusterId2, SubClusterState.SC_UNHEALTHY, "capability"));
- Assert.assertTrue(
- stateStore.getSubClusters(GetSubClustersInfoRequest.newInstance(true))
- .getSubClusters().contains(subClusterInfo1));
- Assert.assertFalse(
+ List<SubClusterInfo> subClustersActive =
stateStore.getSubClusters(GetSubClustersInfoRequest.newInstance(true))
- .getSubClusters().contains(subClusterInfo2));
-
- Assert.assertTrue(
+ .getSubClusters();
+ List<SubClusterInfo> subClustersAll =
stateStore.getSubClusters(GetSubClustersInfoRequest.newInstance(false))
- .getSubClusters().contains(subClusterInfo1));
- Assert.assertTrue(
- stateStore.getSubClusters(GetSubClustersInfoRequest.newInstance(false))
- .getSubClusters().contains(subClusterInfo2));
+ .getSubClusters();
+
+ // SC1 is the only active SubCluster
+ Assert.assertEquals(1, subClustersActive.size());
+ SubClusterInfo sc1 = subClustersActive.get(0);
+ Assert.assertEquals(subClusterId1, sc1.getSubClusterId());
+
+ // SC1 and SC2 are the SubClusters present in the StateStore
+
+ Assert.assertEquals(2, subClustersAll.size());
+ Assert.assertTrue(subClustersAll.contains(sc1));
+ subClustersAll.remove(sc1);
+ SubClusterInfo sc2 = subClustersAll.get(0);
+ Assert.assertEquals(subClusterId2, sc2.getSubClusterId());
}
@Test
@@ -204,9 +218,10 @@ public abstract class FederationStateStoreBaseTest {
try {
stateStore.subClusterHeartbeat(heartbeatRequest);
Assert.fail();
- } catch (YarnException e) {
- Assert.assertTrue(e.getMessage()
- .startsWith("Subcluster SC does not exist; cannot heartbeat"));
+ } catch (FederationStateStoreException e) {
+ Assert.assertEquals(
+ FederationStateStoreErrorCode.MEMBERSHIP_UPDATE_HEARTBEAT_FAIL,
+ e.getCode());
}
}
@@ -265,9 +280,10 @@ public abstract class FederationStateStoreBaseTest {
try {
queryApplicationHomeSC(appId);
Assert.fail();
- } catch (YarnException e) {
- Assert.assertTrue(e.getMessage()
- .startsWith("Application " + appId + " does not exist"));
+ } catch (FederationStateStoreException e) {
+ Assert.assertEquals(
+ FederationStateStoreErrorCode.APPLICATIONS_SINGLE_SELECT_FAIL,
+ e.getCode());
}
}
@@ -281,9 +297,9 @@ public abstract class FederationStateStoreBaseTest {
try {
stateStore.deleteApplicationHomeSubCluster(delRequest);
Assert.fail();
- } catch (YarnException e) {
- Assert.assertTrue(e.getMessage()
- .startsWith("Application " + appId.toString() + " does not exist"));
+ } catch (FederationStateStoreException e) {
+ Assert.assertEquals(
+ FederationStateStoreErrorCode.APPLICATIONS_DELETE_FAIL, e.getCode());
}
}
@@ -314,9 +330,10 @@ public abstract class FederationStateStoreBaseTest {
try {
stateStore.getApplicationHomeSubCluster(request);
Assert.fail();
- } catch (YarnException e) {
- Assert.assertTrue(e.getMessage()
- .startsWith("Application " + appId.toString() + " does not exist"));
+ } catch (FederationStateStoreException e) {
+ Assert.assertEquals(
+ FederationStateStoreErrorCode.APPLICATIONS_SINGLE_SELECT_FAIL,
+ e.getCode());
}
}
@@ -379,9 +396,9 @@ public abstract class FederationStateStoreBaseTest {
try {
stateStore.updateApplicationHomeSubCluster((updateRequest));
Assert.fail();
- } catch (YarnException e) {
- Assert.assertTrue(e.getMessage()
- .startsWith("Application " + appId.toString() + " does not exist"));
+ } catch (FederationStateStoreException e) {
+ Assert.assertEquals(
+ FederationStateStoreErrorCode.APPLICATIONS_UPDATE_FAIL, e.getCode());
}
}
@@ -440,9 +457,9 @@ public abstract class FederationStateStoreBaseTest {
try {
stateStore.getPolicyConfiguration(request);
Assert.fail();
- } catch (YarnException e) {
- Assert.assertTrue(
- e.getMessage().startsWith("Policy for queue Queue does not exist"));
+ } catch (FederationStateStoreException e) {
+ Assert.assertEquals(
+ FederationStateStoreErrorCode.POLICY_SINGLE_SELECT_FAIL, e.getCode());
}
}
@@ -537,4 +554,8 @@ public abstract class FederationStateStoreBaseTest {
return result.getPolicyConfiguration();
}
+ protected void setConf(Configuration conf) {
+ this.conf = conf;
+ }
+
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/b9dcf928/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/store/impl/TestMemoryFederationStateStore.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/store/impl/TestMemoryFederationStateStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/store/impl/TestMemoryFederationStateStore.java
index 74404c7..64adab8 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/store/impl/TestMemoryFederationStateStore.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/store/impl/TestMemoryFederationStateStore.java
@@ -17,6 +17,7 @@
package org.apache.hadoop.yarn.server.federation.store.impl;
+import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.server.federation.store.FederationStateStore;
/**
@@ -27,6 +28,7 @@ public class TestMemoryFederationStateStore
@Override
protected FederationStateStore createStateStore() {
+ super.setConf(new Configuration());
return new MemoryFederationStateStore();
}
-}
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hadoop/blob/b9dcf928/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/store/utils/TestFederationStateStoreInputValidator.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/store/utils/TestFederationStateStoreInputValidator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/store/utils/TestFederationStateStoreInputValidator.java
index b95f17a..8ac5e81 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/store/utils/TestFederationStateStoreInputValidator.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/store/utils/TestFederationStateStoreInputValidator.java
@@ -21,6 +21,7 @@ package org.apache.hadoop.yarn.server.federation.store.utils;
import java.nio.ByteBuffer;
import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.server.federation.store.exception.FederationStateStoreInvalidInputException;
import org.apache.hadoop.yarn.server.federation.store.records.AddApplicationHomeSubClusterRequest;
import org.apache.hadoop.yarn.server.federation.store.records.ApplicationHomeSubCluster;
import org.apache.hadoop.yarn.server.federation.store.records.DeleteApplicationHomeSubClusterRequest;
http://git-wip-us.apache.org/repos/asf/hadoop/blob/b9dcf928/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/utils/TestFederationStateStoreFacadeRetry.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/utils/TestFederationStateStoreFacadeRetry.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/utils/TestFederationStateStoreFacadeRetry.java
new file mode 100644
index 0000000..632e865
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/utils/TestFederationStateStoreFacadeRetry.java
@@ -0,0 +1,125 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.hadoop.yarn.server.federation.utils;
+
+import javax.cache.integration.CacheLoaderException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.retry.RetryPolicy;
+import org.apache.hadoop.io.retry.RetryPolicy.RetryAction;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.server.federation.store.exception.FederationStateStoreErrorCode;
+import org.apache.hadoop.yarn.server.federation.store.exception.FederationStateStoreException;
+import org.apache.hadoop.yarn.server.federation.store.exception.FederationStateStoreInvalidInputException;
+import org.apache.hadoop.yarn.server.federation.store.exception.FederationStateStoreRetriableException;
+import org.junit.Assert;
+import org.junit.Test;
+
+/**
+ * Test class to validate FederationStateStoreFacade retry policy.
+ */
+public class TestFederationStateStoreFacadeRetry {
+
+ private int maxRetries = 4;
+ private Configuration conf;
+
+ /*
+ * Test to validate that FederationStateStoreRetriableException is a retriable
+ * exception.
+ */
+ @Test
+ public void testFacadeRetriableException() throws Exception {
+ conf = new Configuration();
+ conf.setInt(YarnConfiguration.CLIENT_FAILOVER_RETRIES, maxRetries);
+ RetryPolicy policy = FederationStateStoreFacade.createRetryPolicy(conf);
+ RetryAction action = policy.shouldRetry(
+ new FederationStateStoreRetriableException(""), 0, 0, false);
+ // We compare only the action, since delay and the reason are random values
+ // during this test
+ Assert.assertEquals(RetryAction.RETRY.action, action.action);
+
+ // After maxRetries we stop to retry
+ action = policy.shouldRetry(new FederationStateStoreRetriableException(""),
+ maxRetries, 0, false);
+ Assert.assertEquals(RetryAction.FAIL.action, action.action);
+ }
+
+ /*
+ * Test to validate that YarnException is not a retriable exception.
+ */
+ @Test
+ public void testFacadeYarnException() throws Exception {
+
+ conf = new Configuration();
+ conf.setInt(YarnConfiguration.CLIENT_FAILOVER_RETRIES, maxRetries);
+ RetryPolicy policy = FederationStateStoreFacade.createRetryPolicy(conf);
+ RetryAction action = policy.shouldRetry(new YarnException(), 0, 0, false);
+ Assert.assertEquals(RetryAction.FAIL.action, action.action);
+ }
+
+ /*
+ * Test to validate that FederationStateStoreException is not a retriable
+ * exception.
+ */
+ @Test
+ public void testFacadeStateStoreException() throws Exception {
+ conf = new Configuration();
+ conf.setInt(YarnConfiguration.CLIENT_FAILOVER_RETRIES, maxRetries);
+ RetryPolicy policy = FederationStateStoreFacade.createRetryPolicy(conf);
+ RetryAction action = policy.shouldRetry(
+ new FederationStateStoreException(
+ FederationStateStoreErrorCode.APPLICATIONS_INSERT_FAIL),
+ 0, 0, false);
+ Assert.assertEquals(RetryAction.FAIL.action, action.action);
+ }
+
+ /*
+ * Test to validate that FederationStateStoreInvalidInputException is not a
+ * retriable exception.
+ */
+ @Test
+ public void testFacadeInvalidInputException() throws Exception {
+ conf = new Configuration();
+ conf.setInt(YarnConfiguration.CLIENT_FAILOVER_RETRIES, maxRetries);
+ RetryPolicy policy = FederationStateStoreFacade.createRetryPolicy(conf);
+ RetryAction action = policy.shouldRetry(
+ new FederationStateStoreInvalidInputException(""), 0, 0, false);
+ Assert.assertEquals(RetryAction.FAIL.action, action.action);
+ }
+
+ /*
+ * Test to validate that CacheLoaderException is a retriable exception.
+ */
+ @Test
+ public void testFacadeCacheRetriableException() throws Exception {
+ conf = new Configuration();
+ conf.setInt(YarnConfiguration.CLIENT_FAILOVER_RETRIES, maxRetries);
+ RetryPolicy policy = FederationStateStoreFacade.createRetryPolicy(conf);
+ RetryAction action =
+ policy.shouldRetry(new CacheLoaderException(""), 0, 0, false);
+ // We compare only the action, since delay and the reason are random values
+ // during this test
+ Assert.assertEquals(RetryAction.RETRY.action, action.action);
+
+ // After maxRetries we stop to retry
+ action =
+ policy.shouldRetry(new CacheLoaderException(""), maxRetries, 0, false);
+ Assert.assertEquals(RetryAction.FAIL.action, action.action);
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org