You are viewing a plain-text version of this content. The hyperlink to the canonical version was not preserved in this plain-text copy.
Posted to yarn-commits@hadoop.apache.org by ka...@apache.org on 2014/02/12 15:11:38 UTC
svn commit: r1567629 - in
/hadoop/common/branches/branch-2/hadoop-yarn-project: ./
hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/
hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/
Author: kasha
Date: Wed Feb 12 14:11:37 2014
New Revision: 1567629
URL: http://svn.apache.org/r1567629
Log:
YARN-1641. ZK store should attempt a write periodically to ensure it is still Active. (kasha)
Modified:
hadoop/common/branches/branch-2/hadoop-yarn-project/CHANGES.txt
hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java
hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKRMStateStore.java
hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestZKRMStateStore.java
Modified: hadoop/common/branches/branch-2/hadoop-yarn-project/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/CHANGES.txt?rev=1567629&r1=1567628&r2=1567629&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-yarn-project/CHANGES.txt (original)
+++ hadoop/common/branches/branch-2/hadoop-yarn-project/CHANGES.txt Wed Feb 12 14:11:37 2014
@@ -161,6 +161,9 @@ Release 2.4.0 - UNRELEASED
YARN-1706. Created an utility method to dump timeline records to JSON
strings. (zjshen)
+ YARN-1641. ZK store should attempt a write periodically to ensure it is
+ still Active. (kasha)
+
OPTIMIZATIONS
BUG FIXES
Modified: hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java?rev=1567629&r1=1567628&r2=1567629&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java (original)
+++ hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java Wed Feb 12 14:11:37 2014
@@ -676,11 +676,11 @@ public abstract class RMStateStore exten
@SuppressWarnings("unchecked")
/**
- * In {#handleStoreEvent}, this method is called to notify the
- * ResourceManager that the store operation has failed.
+ * This method is called to notify the ResourceManager that the store
+ * operation has failed.
* @param failureCause the exception due to which the operation failed
*/
- private void notifyStoreOperationFailed(Exception failureCause) {
+ protected void notifyStoreOperationFailed(Exception failureCause) {
RMFatalEventType type;
if (failureCause instanceof StoreFencedException) {
type = RMFatalEventType.STATE_STORE_FENCED;
Modified: hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKRMStateStore.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKRMStateStore.java?rev=1567629&r1=1567628&r2=1567629&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKRMStateStore.java (original)
+++ hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKRMStateStore.java Wed Feb 12 14:11:37 2014
@@ -137,6 +137,7 @@ public class ZKRMStateStore extends RMSt
private String fencingNodePath;
private Op createFencingNodePathOp;
private Op deleteFencingNodePathOp;
+ private Thread verifyActiveStatusThread;
private String zkRootNodeUsername;
private final String zkRootNodePassword = Long.toString(random.nextLong());
@@ -258,6 +259,8 @@ public class ZKRMStateStore extends RMSt
createRootDir(zkRootNodePath);
if (HAUtil.isHAEnabled(getConfig())){
fence();
+ verifyActiveStatusThread = new VerifyActiveStatusThread();
+ verifyActiveStatusThread.start();
}
createRootDir(rmAppRoot);
createRootDir(rmDTSecretManagerRoot);
@@ -350,6 +353,10 @@ public class ZKRMStateStore extends RMSt
@Override
protected synchronized void closeInternal() throws Exception {
+ if (verifyActiveStatusThread != null) {
+ verifyActiveStatusThread.interrupt();
+ verifyActiveStatusThread.join(1000);
+ }
closeZkClients();
}
@@ -856,6 +863,32 @@ public class ZKRMStateStore extends RMSt
}.runWithRetries();
}
+ /**
+ * Helper class that periodically attempts creating a znode to ensure that
+ * this RM continues to be the Active.
+ */
+ private class VerifyActiveStatusThread extends Thread {
+ private List<Op> emptyOpList = new ArrayList<Op>();
+
+ VerifyActiveStatusThread() {
+ super(VerifyActiveStatusThread.class.getName());
+ }
+
+ public void run() {
+ try {
+ while (true) {
+ doMultiWithRetries(emptyOpList);
+ Thread.sleep(zkSessionTimeout);
+ }
+ } catch (InterruptedException ie) {
+ LOG.info(VerifyActiveStatusThread.class.getName() + " thread " +
+ "interrupted! Exiting!");
+ } catch (Exception e) {
+ notifyStoreOperationFailed(new StoreFencedException());
+ }
+ }
+ }
+
private abstract class ZKAction<T> {
// run() expects synchronization on ZKRMStateStore.this
abstract T run() throws KeeperException, InterruptedException;
Modified: hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestZKRMStateStore.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestZKRMStateStore.java?rev=1567629&r1=1567628&r2=1567629&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestZKRMStateStore.java (original)
+++ hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestZKRMStateStore.java Wed Feb 12 14:11:37 2014
@@ -23,10 +23,7 @@ import static org.junit.Assert.assertTru
import static org.mockito.Mockito.mock;
import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
import java.util.List;
-import java.util.Map;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -34,15 +31,8 @@ import org.apache.hadoop.conf.Configurat
import org.apache.hadoop.ha.HAServiceProtocol;
import org.apache.hadoop.ha.HAServiceProtocol.StateChangeRequestInfo;
import org.apache.hadoop.service.Service;
-import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationRequest;
-import org.apache.hadoop.yarn.api.records.ApplicationId;
-import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
-import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
-import org.apache.hadoop.yarn.api.records.Priority;
-import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.conf.HAUtil;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
-import org.apache.hadoop.yarn.server.resourcemanager.ClientRMService;
import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.RMStateVersion;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.RMStateVersionPBImpl;
@@ -54,6 +44,7 @@ import org.junit.Test;
public class TestZKRMStateStore extends RMStateStoreTestBase {
public static final Log LOG = LogFactory.getLog(TestZKRMStateStore.class);
+ private static final int ZK_TIMEOUT_MS = 1000;
class TestZKRMStateStoreTester implements RMStateStoreHelper {
@@ -141,6 +132,7 @@ public class TestZKRMStateStore extends
conf.setBoolean(YarnConfiguration.RECOVERY_ENABLED, true);
conf.set(YarnConfiguration.RM_STORE, ZKRMStateStore.class.getName());
conf.set(YarnConfiguration.RM_ZK_ADDRESS, hostPort);
+ conf.setInt(YarnConfiguration.RM_ZK_TIMEOUT_MS, ZK_TIMEOUT_MS);
conf.set(YarnConfiguration.RM_HA_ID, rmId);
for (String rpcAddress : YarnConfiguration.RM_SERVICES_ADDRESS_CONF_KEYS) {
for (String id : HAUtil.getRMHAIds(conf)) {
@@ -182,26 +174,7 @@ public class TestZKRMStateStore extends
HAServiceProtocol.HAServiceState.ACTIVE,
rm2.getRMContext().getRMAdminService().getServiceStatus().getState());
- // Submitting an application to RM1 to trigger a state store operation.
- // RM1 should realize that it got fenced and is not the Active RM anymore.
- Map mockMap = mock(Map.class);
- ApplicationSubmissionContext asc =
- ApplicationSubmissionContext.newInstance(
- ApplicationId.newInstance(1000, 1),
- "testApplication", // app Name
- "default", // queue name
- Priority.newInstance(0),
- ContainerLaunchContext.newInstance(mockMap, mockMap,
- new ArrayList<String>(), mockMap, mock(ByteBuffer.class),
- mockMap),
- false, // unmanaged AM
- true, // cancelTokens
- 1, // max app attempts
- Resource.newInstance(1024, 1));
- ClientRMService rmService = rm1.getClientRMService();
- rmService.submitApplication(SubmitApplicationRequest.newInstance(asc));
-
- for (int i = 0; i < 30; i++) {
+ for (int i = 0; i < ZK_TIMEOUT_MS / 50; i++) {
if (HAServiceProtocol.HAServiceState.ACTIVE ==
rm1.getRMContext().getRMAdminService().getServiceStatus().getState()) {
Thread.sleep(100);