You are viewing a plain text version of this content. The canonical link for it is here.
Posted to yarn-commits@hadoop.apache.org by ar...@apache.org on 2013/12/04 01:45:40 UTC
svn commit: r1547658 - in
/hadoop/common/branches/HDFS-2832/hadoop-yarn-project: ./
hadoop-yarn/dev-support/
hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/
hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/clie...
Author: arp
Date: Wed Dec 4 00:45:38 2013
New Revision: 1547658
URL: http://svn.apache.org/r1547658
Log:
Merging r1547474 through r1547657 from trunk to branch HDFS-2832
Modified:
hadoop/common/branches/HDFS-2832/hadoop-yarn-project/CHANGES.txt
hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/dev-support/findbugs-exclude.xml
hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestAMRMClient.java
hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/FileSystemRMStateStore.java
hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKRMStateStore.java
hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java
hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestFSRMStateStore.java
hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestZKRMStateStoreZKClientConnections.java
Modified: hadoop/common/branches/HDFS-2832/hadoop-yarn-project/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-yarn-project/CHANGES.txt?rev=1547658&r1=1547657&r2=1547658&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2832/hadoop-yarn-project/CHANGES.txt (original)
+++ hadoop/common/branches/HDFS-2832/hadoop-yarn-project/CHANGES.txt Wed Dec 4 00:45:38 2013
@@ -132,6 +132,9 @@ Release 2.4.0 - UNRELEASED
YARN-1318. Promoted AdminService to an Always-On service and merged it into
RMHAProtocolService. (Karthik Kambatla via vinodkv)
+ YARN-1332. In TestAMRMClient, replace assertTrue with assertEquals where
+ possible (Sebastian Wong via Sandy Ryza)
+
OPTIMIZATIONS
BUG FIXES
@@ -191,6 +194,12 @@ Release 2.4.0 - UNRELEASED
YARN-1416. Fixed a few invalid transitions in RMApp, RMAppAttempt and in some
tests. (Jian He via vinodkv)
+ YARN-895. Changed RM state-store to not crash immediately if RM restarts while
+ the state-store is down. (Jian He via vinodkv)
+
+ YARN-1454. Fixed test failure issue with TestRMRestart. (Karthik Kambatla
+ via vinodkv)
+
Release 2.3.0 - UNRELEASED
INCOMPATIBLE CHANGES
Modified: hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/dev-support/findbugs-exclude.xml
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/dev-support/findbugs-exclude.xml?rev=1547658&r1=1547657&r2=1547658&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/dev-support/findbugs-exclude.xml (original)
+++ hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/dev-support/findbugs-exclude.xml Wed Dec 4 00:45:38 2013
@@ -305,4 +305,9 @@
<Bug pattern="NM_CLASS_NOT_EXCEPTION" />
</Match>
+ <Match>
+ <Class name="org.apache.hadoop.yarn.server.resourcemanager.recovery.ZKRMStateStore" />
+ <Bug pattern="IS2_INCONSISTENT_SYNC" />
+ </Match>
+
</FindBugsFilter>
Modified: hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java?rev=1547658&r1=1547657&r2=1547658&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java (original)
+++ hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java Wed Dec 4 00:45:38 2013
@@ -301,22 +301,30 @@ public class YarnConfiguration extends C
public static final String RM_STORE = RM_PREFIX + "store.class";
/** URI for FileSystemRMStateStore */
- public static final String FS_RM_STATE_STORE_URI =
- RM_PREFIX + "fs.state-store.uri";
+ public static final String FS_RM_STATE_STORE_URI = RM_PREFIX
+ + "fs.state-store.uri";
+ public static final String FS_RM_STATE_STORE_RETRY_POLICY_SPEC = RM_PREFIX
+ + "fs.state-store.retry-policy-spec";
+ public static final String DEFAULT_FS_RM_STATE_STORE_RETRY_POLICY_SPEC =
+ "2000, 500";
/**
* Comma separated host:port pairs, each corresponding to a ZK server for
* ZKRMStateStore
*/
public static final String ZK_STATE_STORE_PREFIX =
- RM_PREFIX + "zk.state-store.";
+ RM_PREFIX + "zk-state-store.";
public static final String ZK_RM_STATE_STORE_NUM_RETRIES =
ZK_STATE_STORE_PREFIX + "num-retries";
- public static final int DEFAULT_ZK_RM_STATE_STORE_NUM_RETRIES = 3;
+ public static final int DEFAULT_ZK_RM_STATE_STORE_NUM_RETRIES = 500;
+ /** Retry interval when connecting to ZooKeeper. */
+ public static final String ZK_RM_STATE_STORE_RETRY_INTERVAL_MS =
+ ZK_STATE_STORE_PREFIX + "retry-interval-ms";
+ public static final long DEFAULT_ZK_RM_STATE_STORE_RETRY_INTERVAL_MS = 2000;
public static final String ZK_RM_STATE_STORE_ADDRESS =
ZK_STATE_STORE_PREFIX + "address";
/** Timeout in millisec for ZK server connection for ZKRMStateStore */
public static final String ZK_RM_STATE_STORE_TIMEOUT_MS =
- ZK_STATE_STORE_PREFIX + "timeout.ms";
+ ZK_STATE_STORE_PREFIX + "timeout-ms";
public static final int DEFAULT_ZK_RM_STATE_STORE_TIMEOUT_MS = 60000;
/** Parent znode path under which ZKRMStateStore will create znodes */
public static final String ZK_RM_STATE_STORE_PARENT_PATH =
Modified: hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestAMRMClient.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestAMRMClient.java?rev=1547658&r1=1547657&r2=1547658&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestAMRMClient.java (original)
+++ hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestAMRMClient.java Wed Dec 4 00:45:38 2013
@@ -248,7 +248,7 @@ public class TestAMRMClient {
matches = amClient.getMatchingRequests(priority, node, testCapability1);
verifyMatches(matches, 1);
storedRequest = matches.get(0).iterator().next();
- assertTrue(storedContainer1 == storedRequest);
+ assertEquals(storedContainer1, storedRequest);
amClient.removeContainerRequest(storedContainer1);
// exact matching with order maintained
@@ -259,9 +259,9 @@ public class TestAMRMClient {
int i = 0;
for(ContainerRequest storedRequest1 : matches.get(0)) {
if(i++ == 0) {
- assertTrue(storedContainer4 == storedRequest1);
+ assertEquals(storedContainer4, storedRequest1);
} else {
- assertTrue(storedContainer6 == storedRequest1);
+ assertEquals(storedContainer6, storedRequest1);
}
}
amClient.removeContainerRequest(storedContainer6);
@@ -276,7 +276,7 @@ public class TestAMRMClient {
assert(matches.size() == 2);
// verify non-fitting containers are not returned and fitting ones are
for(Collection<ContainerRequest> testSet : matches) {
- assertTrue(testSet.size() == 1);
+ assertEquals(1, testSet.size());
ContainerRequest testRequest = testSet.iterator().next();
assertTrue(testRequest != storedContainer4);
assertTrue(testRequest != storedContainer5);
@@ -310,8 +310,8 @@ public class TestAMRMClient {
private void verifyMatches(
List<? extends Collection<ContainerRequest>> matches,
int matchSize) {
- assertTrue(matches.size() == 1);
- assertTrue(matches.get(0).size() == matchSize);
+ assertEquals(1, matches.size());
+ assertEquals(matches.get(0).size(), matchSize);
}
@Test (timeout=60000)
@@ -337,12 +337,12 @@ public class TestAMRMClient {
matches = amClient.getMatchingRequests(priority, node, capability);
verifyMatches(matches, 1);
storedRequest = matches.get(0).iterator().next();
- assertTrue(storedContainer1 == storedRequest);
+ assertEquals(storedContainer1, storedRequest);
// inferred match rack
matches = amClient.getMatchingRequests(priority, rack, capability);
verifyMatches(matches, 1);
storedRequest = matches.get(0).iterator().next();
- assertTrue(storedContainer1 == storedRequest);
+ assertEquals(storedContainer1, storedRequest);
// inferred rack match no longer valid after request is removed
amClient.removeContainerRequest(storedContainer1);
@@ -387,10 +387,10 @@ public class TestAMRMClient {
// test addition and storage
int containersRequestedAny = amClient.remoteRequestsTable.get(priority)
.get(ResourceRequest.ANY).get(capability).remoteRequest.getNumContainers();
- assertTrue(containersRequestedAny == 2);
+ assertEquals(2, containersRequestedAny);
containersRequestedAny = amClient.remoteRequestsTable.get(priority1)
.get(ResourceRequest.ANY).get(capability).remoteRequest.getNumContainers();
- assertTrue(containersRequestedAny == 1);
+ assertEquals(1, containersRequestedAny);
List<? extends Collection<ContainerRequest>> matches =
amClient.getMatchingRequests(priority, node, capability);
verifyMatches(matches, 2);
@@ -417,7 +417,7 @@ public class TestAMRMClient {
// test matching of containers
ContainerRequest storedRequest = matches.get(0).iterator().next();
- assertTrue(storedContainer1 == storedRequest);
+ assertEquals(storedContainer1, storedRequest);
amClient.removeContainerRequest(storedContainer1);
matches =
amClient.getMatchingRequests(priority, ResourceRequest.ANY, capability);
@@ -438,10 +438,10 @@ public class TestAMRMClient {
&& iterationsLeft-- > 0) {
Log.info(" == alloc " + allocatedContainerCount + " it left " + iterationsLeft);
AllocateResponse allocResponse = amClient.allocate(0.1f);
- assertTrue(amClient.ask.size() == 0);
- assertTrue(amClient.release.size() == 0);
+ assertEquals(0, amClient.ask.size());
+ assertEquals(0, amClient.release.size());
- assertTrue(nodeCount == amClient.getClusterNodeCount());
+ assertEquals(nodeCount, amClient.getClusterNodeCount());
allocatedContainerCount += allocResponse.getAllocatedContainers().size();
for(Container container : allocResponse.getAllocatedContainers()) {
ContainerRequest expectedRequest =
@@ -453,7 +453,7 @@ public class TestAMRMClient {
// test correct matched container is returned
verifyMatches(matches, 1);
ContainerRequest matchedRequest = matches.get(0).iterator().next();
- assertTrue(matchedRequest == expectedRequest);
+ assertEquals(matchedRequest, expectedRequest);
amClient.removeContainerRequest(matchedRequest);
// assign this container, use it and release it
amClient.releaseAssignedContainer(container.getId());
@@ -464,11 +464,11 @@ public class TestAMRMClient {
}
}
- assertTrue(allocatedContainerCount == 2);
+ assertEquals(2, allocatedContainerCount);
AllocateResponse allocResponse = amClient.allocate(0.1f);
- assertTrue(amClient.release.size() == 0);
- assertTrue(amClient.ask.size() == 0);
- assertTrue(allocResponse.getAllocatedContainers().size() == 0);
+ assertEquals(0, amClient.release.size());
+ assertEquals(0, amClient.ask.size());
+ assertEquals(0, allocResponse.getAllocatedContainers().size());
// 0 requests left. everything got cleaned up
assertTrue(amClient.remoteRequestsTable.isEmpty());
@@ -494,14 +494,14 @@ public class TestAMRMClient {
amClient.start();
amClient.registerApplicationMaster("Host", 10000, "");
- assertTrue(amClient.ask.size() == 0);
- assertTrue(amClient.release.size() == 0);
+ assertEquals(0, amClient.ask.size());
+ assertEquals(0, amClient.release.size());
ContainerRequest storedContainer1 =
new ContainerRequest(capability, nodes, racks, priority);
amClient.addContainerRequest(storedContainer1);
- assertTrue(amClient.ask.size() == 3);
- assertTrue(amClient.release.size() == 0);
+ assertEquals(3, amClient.ask.size());
+ assertEquals(0, amClient.release.size());
List<String> localNodeBlacklist = new ArrayList<String>();
localNodeBlacklist.add(node);
@@ -512,7 +512,7 @@ public class TestAMRMClient {
int allocatedContainerCount = getAllocatedContainersNumber(amClient,
DEFAULT_ITERATION);
// the only node is in blacklist, so no allocation
- assertTrue(allocatedContainerCount == 0);
+ assertEquals(0, allocatedContainerCount);
// Remove node from blacklist, so get assigned with 2
amClient.updateBlacklist(null, localNodeBlacklist);
@@ -521,7 +521,7 @@ public class TestAMRMClient {
amClient.addContainerRequest(storedContainer2);
allocatedContainerCount = getAllocatedContainersNumber(amClient,
DEFAULT_ITERATION);
- assertEquals(allocatedContainerCount, 2);
+ assertEquals(2, allocatedContainerCount);
// Test in case exception in allocate(), blacklist is kept
assertTrue(amClient.blacklistAdditions.isEmpty());
@@ -538,7 +538,7 @@ public class TestAMRMClient {
amClient.allocate(0.1f);
fail("there should be an exception here.");
} catch (Exception e) {
- assertEquals(amClient.blacklistAdditions.size(), 1);
+ assertEquals(1, amClient.blacklistAdditions.size());
}
} finally {
if (amClient != null && amClient.getServiceState() == STATE.STARTED) {
@@ -565,16 +565,16 @@ public class TestAMRMClient {
nodeList01.add(nodes[0]);
nodeList01.add(nodes[1]);
amClient.updateBlacklist(nodeList01, null);
- assertEquals(amClient.blacklistAdditions.size(),2);
- assertEquals(amClient.blacklistRemovals.size(),0);
+ assertEquals(2, amClient.blacklistAdditions.size());
+ assertEquals(0, amClient.blacklistRemovals.size());
// Add nodes[0] again, verify it is not added duplicated.
List<String> nodeList02 = new ArrayList<String>();
nodeList02.add(nodes[0]);
nodeList02.add(nodes[2]);
amClient.updateBlacklist(nodeList02, null);
- assertEquals(amClient.blacklistAdditions.size(),3);
- assertEquals(amClient.blacklistRemovals.size(),0);
+ assertEquals(3, amClient.blacklistAdditions.size());
+ assertEquals(0, amClient.blacklistRemovals.size());
// Add nodes[1] and nodes[2] to removal list,
// Verify addition list remove these two nodes.
@@ -582,16 +582,16 @@ public class TestAMRMClient {
nodeList12.add(nodes[1]);
nodeList12.add(nodes[2]);
amClient.updateBlacklist(null, nodeList12);
- assertEquals(amClient.blacklistAdditions.size(),1);
- assertEquals(amClient.blacklistRemovals.size(),2);
+ assertEquals(1, amClient.blacklistAdditions.size());
+ assertEquals(2, amClient.blacklistRemovals.size());
// Add nodes[1] again to addition list,
// Verify removal list will remove this node.
List<String> nodeList1 = new ArrayList<String>();
nodeList1.add(nodes[1]);
amClient.updateBlacklist(nodeList1, null);
- assertEquals(amClient.blacklistAdditions.size(),2);
- assertEquals(amClient.blacklistRemovals.size(),1);
+ assertEquals(2, amClient.blacklistAdditions.size());
+ assertEquals(1, amClient.blacklistRemovals.size());
} finally {
if (amClient != null && amClient.getServiceState() == STATE.STARTED) {
amClient.stop();
@@ -606,10 +606,10 @@ public class TestAMRMClient {
while (iterationsLeft-- > 0) {
Log.info(" == alloc " + allocatedContainerCount + " it left " + iterationsLeft);
AllocateResponse allocResponse = amClient.allocate(0.1f);
- assertTrue(amClient.ask.size() == 0);
- assertTrue(amClient.release.size() == 0);
+ assertEquals(0, amClient.ask.size());
+ assertEquals(0, amClient.release.size());
- assertTrue(nodeCount == amClient.getClusterNodeCount());
+ assertEquals(nodeCount, amClient.getClusterNodeCount());
allocatedContainerCount += allocResponse.getAllocatedContainers().size();
if(allocatedContainerCount == 0) {
@@ -654,8 +654,8 @@ public class TestAMRMClient {
throws YarnException, IOException {
// setup container request
- assertTrue(amClient.ask.size() == 0);
- assertTrue(amClient.release.size() == 0);
+ assertEquals(0, amClient.ask.size());
+ assertEquals(0, amClient.release.size());
amClient.addContainerRequest(
new ContainerRequest(capability, nodes, racks, priority));
@@ -677,11 +677,11 @@ public class TestAMRMClient {
int containersRequestedAny = amClient.remoteRequestsTable.get(priority)
.get(ResourceRequest.ANY).get(capability).remoteRequest.getNumContainers();
- assertTrue(containersRequestedNode == 2);
- assertTrue(containersRequestedRack == 2);
- assertTrue(containersRequestedAny == 2);
- assertTrue(amClient.ask.size() == 3);
- assertTrue(amClient.release.size() == 0);
+ assertEquals(2, containersRequestedNode);
+ assertEquals(2, containersRequestedRack);
+ assertEquals(2, containersRequestedAny);
+ assertEquals(3, amClient.ask.size());
+ assertEquals(0, amClient.release.size());
// RM should allocate container within 2 calls to allocate()
int allocatedContainerCount = 0;
@@ -695,10 +695,10 @@ public class TestAMRMClient {
while (allocatedContainerCount < containersRequestedAny
&& iterationsLeft-- > 0) {
AllocateResponse allocResponse = amClient.allocate(0.1f);
- assertTrue(amClient.ask.size() == 0);
- assertTrue(amClient.release.size() == 0);
+ assertEquals(0, amClient.ask.size());
+ assertEquals(0, amClient.release.size());
- assertTrue(nodeCount == amClient.getClusterNodeCount());
+ assertEquals(nodeCount, amClient.getClusterNodeCount());
allocatedContainerCount += allocResponse.getAllocatedContainers().size();
for(Container container : allocResponse.getAllocatedContainers()) {
ContainerId rejectContainerId = container.getId();
@@ -724,19 +724,19 @@ public class TestAMRMClient {
Assert.assertTrue(receivedNMTokens.size() > 0
&& receivedNMTokens.size() <= nodeCount);
- assertTrue(allocatedContainerCount == containersRequestedAny);
- assertTrue(amClient.release.size() == 2);
- assertTrue(amClient.ask.size() == 0);
+ assertEquals(allocatedContainerCount, containersRequestedAny);
+ assertEquals(2, amClient.release.size());
+ assertEquals(0, amClient.ask.size());
// need to tell the AMRMClient that we dont need these resources anymore
amClient.removeContainerRequest(
new ContainerRequest(capability, nodes, racks, priority));
amClient.removeContainerRequest(
new ContainerRequest(capability, nodes, racks, priority));
- assertTrue(amClient.ask.size() == 3);
+ assertEquals(3, amClient.ask.size());
// send 0 container count request for resources that are no longer needed
ResourceRequest snoopRequest = amClient.ask.iterator().next();
- assertTrue(snoopRequest.getNumContainers() == 0);
+ assertEquals(0, snoopRequest.getNumContainers());
// test RPC exception handling
amClient.addContainerRequest(new ContainerRequest(capability, nodes,
@@ -744,7 +744,7 @@ public class TestAMRMClient {
amClient.addContainerRequest(new ContainerRequest(capability, nodes,
racks, priority));
snoopRequest = amClient.ask.iterator().next();
- assertTrue(snoopRequest.getNumContainers() == 2);
+ assertEquals(2, snoopRequest.getNumContainers());
ApplicationMasterProtocol realRM = amClient.rmClient;
try {
@@ -768,12 +768,12 @@ public class TestAMRMClient {
amClient.rmClient = realRM;
}
- assertTrue(amClient.release.size() == 2);
- assertTrue(amClient.ask.size() == 3);
+ assertEquals(2, amClient.release.size());
+ assertEquals(3, amClient.ask.size());
snoopRequest = amClient.ask.iterator().next();
// verify that the remove request made in between makeRequest and allocate
// has not been lost
- assertTrue(snoopRequest.getNumContainers() == 0);
+ assertEquals(0, snoopRequest.getNumContainers());
iterationsLeft = 3;
// do a few iterations to ensure RM is not going send new containers
@@ -781,13 +781,13 @@ public class TestAMRMClient {
// inform RM of rejection
AllocateResponse allocResponse = amClient.allocate(0.1f);
// RM did not send new containers because AM does not need any
- assertTrue(allocResponse.getAllocatedContainers().size() == 0);
+ assertEquals(0, allocResponse.getAllocatedContainers().size());
if(allocResponse.getCompletedContainersStatuses().size() > 0) {
for(ContainerStatus cStatus :allocResponse
.getCompletedContainersStatuses()) {
if(releases.contains(cStatus.getContainerId())) {
- assertTrue(cStatus.getState() == ContainerState.COMPLETE);
- assertTrue(cStatus.getExitStatus() == -100);
+ assertEquals(cStatus.getState(), ContainerState.COMPLETE);
+ assertEquals(-100, cStatus.getExitStatus());
releases.remove(cStatus.getContainerId());
}
}
@@ -797,8 +797,8 @@ public class TestAMRMClient {
sleep(100);
}
}
- assertTrue(amClient.ask.size() == 0);
- assertTrue(amClient.release.size() == 0);
+ assertEquals(0, amClient.ask.size());
+ assertEquals(0, amClient.release.size());
}
private void sleep(int sleepTime) {
Modified: hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml?rev=1547658&r1=1547657&r2=1547658&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml (original)
+++ hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml Wed Dec 4 00:45:38 2013
@@ -283,8 +283,8 @@
is implicitly fenced, meaning a single ResourceManager is
able to use the store at any point in time. More details on this, along
with setting up appropriate ACLs is discussed under the description for
- yarn.resourcemanager.zk.state-store.root-node.acl.</description>
- <name>yarn.resourcemanager.zk.state-store.address</name>
+ yarn.resourcemanager.zk-state-store.root-node.acl.</description>
+ <name>yarn.resourcemanager.zk-state-store.address</name>
<!--value>127.0.0.1:2181</value-->
</property>
@@ -293,8 +293,15 @@
ZooKeeper. This may be supplied when using
org.apache.hadoop.yarn.server.resourcemanager.recovery.ZKRMStateStore
as the value for yarn.resourcemanager.store.class</description>
- <name>yarn.resourcemanager.zk.state-store.num-retries</name>
- <value>3</value>
+ <name>yarn.resourcemanager.zk-state-store.num-retries</name>
+ <value>500</value>
+ </property>
+
+ <property>
+ <description>Retry interval in milliseconds when ZKRMStateStore tries to
+ connect to ZooKeeper.</description>
+ <name>yarn.resourcemanager.zk-state-store.retry-interval-ms</name>
+ <value>2000</value>
</property>
<property>
@@ -302,16 +309,20 @@
stored. This must be supplied when using
org.apache.hadoop.yarn.server.resourcemanager.recovery.ZKRMStateStore
as the value for yarn.resourcemanager.store.class</description>
- <name>yarn.resourcemanager.zk.state-store.parent-path</name>
+ <name>yarn.resourcemanager.zk-state-store.parent-path</name>
<value>/rmstore</value>
</property>
<property>
- <description>Timeout when connecting to ZooKeeper.
+ <description>ZooKeeper session timeout in milliseconds. Session expiration
+ is managed by the ZooKeeper cluster itself, not by the client. This value is
+ used by the cluster to determine when the client's session expires.
Expiration happens when the cluster does not hear from the client within
+ the specified session timeout period (i.e. no heartbeat).
This may be supplied when using
org.apache.hadoop.yarn.server.resourcemanager.recovery.ZKRMStateStore
as the value for yarn.resourcemanager.store.class</description>
- <name>yarn.resourcemanager.zk.state-store.timeout.ms</name>
+ <name>yarn.resourcemanager.zk-state-store.timeout-ms</name>
<value>60000</value>
</property>
@@ -320,7 +331,7 @@
This may be supplied when using
org.apache.hadoop.yarn.server.resourcemanager.recovery.ZKRMStateStore
as the value for yarn.resourcemanager.store.class</description>
- <name>yarn.resourcemanager.zk.state-store.acl</name>
+ <name>yarn.resourcemanager.zk-state-store.acl</name>
<value>world:anyone:rwcda</value>
</property>
@@ -336,7 +347,7 @@
permissions.
By default, when this property is not set, we use the ACLs from
- yarn.resourcemanager.zk.state-store.acl for shared admin access and
+ yarn.resourcemanager.zk-state-store.acl for shared admin access and
rm-address:cluster-timestamp for username-based exclusive create-delete
access.
@@ -346,7 +357,7 @@
ResourceManagers have shared admin access and the Active ResourceManger
takes over (exclusively) the create-delete access.
</description>
- <name>yarn.resourcemanager.zk.state-store.root-node.acl</name>
+ <name>yarn.resourcemanager.zk-state-store.root-node.acl</name>
</property>
<property>
@@ -360,6 +371,16 @@
</property>
<property>
+ <description>HDFS client retry policy specification. HDFS client retry
+ is always enabled. Specified in pairs of sleep-time and number-of-retries
+ as (t0, n0), (t1, n1), ..., where the first n0 retries sleep t0 milliseconds on
+ average, the following n1 retries sleep t1 milliseconds on average, and so on.
+ </description>
+ <name>yarn.resourcemanager.fs.state-store.retry-policy-spec</name>
+ <value>2000, 500</value>
+ </property>
+
+ <property>
<description>Enable RM high-availability. When enabled,
(1) The RM starts in the Standby mode by default, and transitions to
the Active mode when prompted to.
Modified: hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/FileSystemRMStateStore.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/FileSystemRMStateStore.java?rev=1547658&r1=1547657&r2=1547658&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/FileSystemRMStateStore.java (original)
+++ hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/FileSystemRMStateStore.java Wed Dec 4 00:45:38 2013
@@ -94,7 +94,14 @@ public class FileSystemRMStateStore exte
// create filesystem only now, as part of service-start. By this time, RM is
// authenticated with kerberos so we are good to create a file-system
// handle.
- fs = fsWorkingPath.getFileSystem(getConfig());
+ Configuration conf = new Configuration(getConfig());
+ conf.setBoolean("dfs.client.retry.policy.enabled", true);
+ String retryPolicy =
+ conf.get(YarnConfiguration.FS_RM_STATE_STORE_RETRY_POLICY_SPEC,
+ YarnConfiguration.DEFAULT_FS_RM_STATE_STORE_RETRY_POLICY_SPEC);
+ conf.set("dfs.client.retry.policy.spec", retryPolicy);
+
+ fs = fsWorkingPath.getFileSystem(conf);
fs.mkdirs(rmDTSecretManagerRoot);
fs.mkdirs(rmAppRoot);
}
Modified: hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKRMStateStore.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKRMStateStore.java?rev=1547658&r1=1547657&r2=1547658&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKRMStateStore.java (original)
+++ hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKRMStateStore.java Wed Dec 4 00:45:38 2013
@@ -82,6 +82,7 @@ public class ZKRMStateStore extends RMSt
private String zkHostPort = null;
private int zkSessionTimeout;
+ private long zkRetryInterval;
private List<ACL> zkAcl;
private String zkRootNodePath;
private String rmDTSecretManagerRoot;
@@ -161,6 +162,9 @@ public class ZKRMStateStore extends RMSt
zkSessionTimeout =
conf.getInt(YarnConfiguration.ZK_RM_STATE_STORE_TIMEOUT_MS,
YarnConfiguration.DEFAULT_ZK_RM_STATE_STORE_TIMEOUT_MS);
+ zkRetryInterval =
+ conf.getLong(YarnConfiguration.ZK_RM_STATE_STORE_RETRY_INTERVAL_MS,
+ YarnConfiguration.DEFAULT_ZK_RM_STATE_STORE_RETRY_INTERVAL_MS);
// Parse authentication from configuration.
String zkAclConf =
conf.get(YarnConfiguration.ZK_RM_STATE_STORE_ACL,
@@ -810,6 +814,9 @@ public class ZKRMStateStore extends RMSt
}
} catch (KeeperException ke) {
if (shouldRetry(ke.code()) && ++retry < numRetries) {
+ LOG.info("Waiting for zookeeper to be connected, retry no. "
+ + retry);
+ Thread.sleep(zkRetryInterval);
continue;
}
throw ke;
Modified: hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java?rev=1547658&r1=1547657&r2=1547658&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java (original)
+++ hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java Wed Dec 4 00:45:38 2013
@@ -1247,6 +1247,8 @@ public class TestRMRestart {
// renewDate before renewing
Long renewDateBeforeRenew = allTokensRM2.get(dtId1);
try{
+ // Sleep for one millisecond to make sure renewDateAfterRenew is greater
+ Thread.sleep(1);
// renew recovered token
rm2.getRMDTSecretManager().renewToken(token1, "renewer1");
} catch(Exception e) {
Modified: hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestFSRMStateStore.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestFSRMStateStore.java?rev=1547658&r1=1547657&r2=1547658&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestFSRMStateStore.java (original)
+++ hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestFSRMStateStore.java Wed Dec 4 00:45:38 2013
@@ -19,6 +19,9 @@
package org.apache.hadoop.yarn.server.resourcemanager.recovery;
import static org.junit.Assert.assertTrue;
+
+import java.util.concurrent.atomic.AtomicBoolean;
+
import junit.framework.Assert;
import org.apache.commons.logging.Log;
@@ -33,7 +36,9 @@ import org.apache.hadoop.hdfs.MiniDFSClu
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.RMStateVersion;
+import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.ApplicationStateDataPBImpl;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.RMStateVersionPBImpl;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
import org.apache.hadoop.yarn.util.ConverterUtils;
import org.junit.Test;
@@ -81,6 +86,8 @@ public class TestFSRMStateStore extends
YarnConfiguration conf = new YarnConfiguration();
conf.set(YarnConfiguration.FS_RM_STATE_STORE_URI,
workingDirPathURI.toString());
+ conf.set(YarnConfiguration.FS_RM_STATE_STORE_RETRY_POLICY_SPEC,
+ "100,6000");
this.store = new TestFileSystemRMStore(conf);
return store;
}
@@ -139,4 +146,46 @@ public class TestFSRMStateStore extends
cluster.shutdown();
}
}
+
+ @Test (timeout = 30000)
+ public void testFSRMStateStoreClientRetry() throws Exception {
+ HdfsConfiguration conf = new HdfsConfiguration();
+ MiniDFSCluster cluster =
+ new MiniDFSCluster.Builder(conf).numDataNodes(2).build();
+ cluster.waitActive();
+ try {
+ TestFSRMStateStoreTester fsTester = new TestFSRMStateStoreTester(cluster);
+ final RMStateStore store = fsTester.getRMStateStore();
+ store.setRMDispatcher(new TestDispatcher());
+ final AtomicBoolean assertionFailedInThread = new AtomicBoolean(false);
+ cluster.shutdownNameNodes();
+
+ Thread clientThread = new Thread() {
+ @Override
+ public void run() {
+ try {
+ store.storeApplicationStateInternal("application1",
+ (ApplicationStateDataPBImpl) ApplicationStateDataPBImpl
+ .newApplicationStateData(111, 111, "user", null,
+ RMAppState.ACCEPTED, "diagnostics", 333));
+ } catch (Exception e) {
+ // TODO 0 datanode exception will not be retried by dfs client, fix
+ // that separately.
+ if (!e.getMessage().contains("could only be replicated" +
+ " to 0 nodes instead of minReplication (=1)")) {
+ assertionFailedInThread.set(true);
+ }
+ e.printStackTrace();
+ }
+ }
+ };
+ Thread.sleep(2000);
+ clientThread.start();
+ cluster.restartNameNode();
+ clientThread.join();
+ Assert.assertFalse(assertionFailedInThread.get());
+ } finally {
+ cluster.shutdown();
+ }
+ }
}
Modified: hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestZKRMStateStoreZKClientConnections.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestZKRMStateStoreZKClientConnections.java?rev=1547658&r1=1547657&r2=1547658&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestZKRMStateStoreZKClientConnections.java (original)
+++ hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestZKRMStateStoreZKClientConnections.java Wed Dec 4 00:45:38 2013
@@ -37,6 +37,7 @@ import org.junit.Test;
import java.io.IOException;
import java.util.concurrent.CyclicBarrier;
+import java.util.concurrent.atomic.AtomicBoolean;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
@@ -114,6 +115,37 @@ public class TestZKRMStateStoreZKClientC
}
}
+ @Test (timeout = 20000)
+ public void testZKClientRetry() throws Exception {
+ TestZKClient zkClientTester = new TestZKClient();
+ final String path = "/test";
+ YarnConfiguration conf = new YarnConfiguration();
+ conf.setInt(YarnConfiguration.ZK_RM_STATE_STORE_TIMEOUT_MS, 100);
+ conf.setLong(YarnConfiguration.ZK_RM_STATE_STORE_RETRY_INTERVAL_MS, 100);
+ final ZKRMStateStore store =
+ (ZKRMStateStore) zkClientTester.getRMStateStore(conf);
+ TestDispatcher dispatcher = new TestDispatcher();
+ store.setRMDispatcher(dispatcher);
+ final AtomicBoolean assertionFailedInThread = new AtomicBoolean(false);
+
+ stopServer();
+ Thread clientThread = new Thread() {
+ @Override
+ public void run() {
+ try {
+ store.getDataWithRetries(path, true);
+ } catch (Exception e) {
+ e.printStackTrace();
+ assertionFailedInThread.set(true);
+ }
+ }
+ };
+ Thread.sleep(2000);
+ startServer();
+ clientThread.join();
+ Assert.assertFalse(assertionFailedInThread.get());
+ }
+
@Test(timeout = 20000)
public void testZKClientDisconnectAndReconnect()
throws Exception {