You are viewing a plain text version of this content. The canonical link for it is here.
Posted to yarn-commits@hadoop.apache.org by bi...@apache.org on 2013/08/31 01:03:44 UTC
svn commit: r1519112 - in
/hadoop/common/branches/branch-2/hadoop-yarn-project: ./
hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/
hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/
hado...
Author: bikas
Date: Fri Aug 30 23:03:44 2013
New Revision: 1519112
URL: http://svn.apache.org/r1519112
Log:
Merge 1519107. from trunk to branch-2 for YARN-771. AMRMClient support for resource blacklisting (Junping Du via bikas)
Modified:
hadoop/common/branches/branch-2/hadoop-yarn-project/CHANGES.txt
hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/AMRMClient.java
hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/AMRMClientImpl.java
hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestAMRMClient.java
hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ResourceBlacklistRequestPBImpl.java
Modified: hadoop/common/branches/branch-2/hadoop-yarn-project/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/CHANGES.txt?rev=1519112&r1=1519111&r2=1519112&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-yarn-project/CHANGES.txt (original)
+++ hadoop/common/branches/branch-2/hadoop-yarn-project/CHANGES.txt Fri Aug 30 23:03:44 2013
@@ -54,6 +54,9 @@ Release 2.1.1-beta - UNRELEASED
YARN-1080. Improved help message for "yarn logs" command. (Xuan Gong via
vinodkv)
+ YARN-771. AMRMClient support for resource blacklisting (Junping Du via
+ bikas)
+
OPTIMIZATIONS
BUG FIXES
Modified: hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/AMRMClient.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/AMRMClient.java?rev=1519112&r1=1519111&r2=1519112&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/AMRMClient.java (original)
+++ hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/AMRMClient.java Fri Aug 30 23:03:44 2013
@@ -286,4 +286,15 @@ public abstract class AMRMClient<T exten
Priority priority,
String resourceName,
Resource capability);
+
+ /**
+ * Update application's blacklist with addition or removal resources.
+ *
+ * @param blacklistAdditions list of resources which should be added to the
+ * application blacklist
+ * @param blacklistRemovals list of resources which should be removed from the
+ * application blacklist
+ */
+ public abstract void updateBlacklist(List<String> blacklistAdditions,
+ List<String> blacklistRemovals);
}
Modified: hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/AMRMClientImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/AMRMClientImpl.java?rev=1519112&r1=1519111&r2=1519112&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/AMRMClientImpl.java (original)
+++ hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/AMRMClientImpl.java Fri Aug 30 23:03:44 2013
@@ -51,6 +51,7 @@ import org.apache.hadoop.yarn.api.record
import org.apache.hadoop.yarn.api.records.NMToken;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.ResourceBlacklistRequest;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.client.ClientRMProxy;
import org.apache.hadoop.yarn.client.api.AMRMClient;
@@ -80,6 +81,9 @@ public class AMRMClientImpl<T extends Co
protected Resource clusterAvailableResources;
protected int clusterNodeCount;
+ protected final Set<String> blacklistAdditions = new HashSet<String>();
+ protected final Set<String> blacklistRemovals = new HashSet<String>();
+
class ResourceRequestInfo {
ResourceRequest remoteRequest;
LinkedHashSet<T> containerRequests;
@@ -199,9 +203,11 @@ public class AMRMClientImpl<T extends Co
Preconditions.checkArgument(progressIndicator >= 0,
"Progress indicator should not be negative");
AllocateResponse allocateResponse = null;
- ArrayList<ResourceRequest> askList = null;
- ArrayList<ContainerId> releaseList = null;
+ List<ResourceRequest> askList = null;
+ List<ContainerId> releaseList = null;
AllocateRequest allocateRequest = null;
+ List<String> blacklistToAdd = new ArrayList<String>();
+ List<String> blacklistToRemove = new ArrayList<String>();
try {
synchronized (this) {
@@ -217,9 +223,22 @@ public class AMRMClientImpl<T extends Co
// optimistically clear this collection assuming no RPC failure
ask.clear();
release.clear();
+
+ blacklistToAdd.addAll(blacklistAdditions);
+ blacklistToRemove.addAll(blacklistRemovals);
+
+ ResourceBlacklistRequest blacklistRequest =
+ (blacklistToAdd != null) || (blacklistToRemove != null) ?
+ ResourceBlacklistRequest.newInstance(blacklistToAdd,
+ blacklistToRemove) : null;
+
allocateRequest =
AllocateRequest.newInstance(lastResponseId, progressIndicator,
- askList, releaseList, null);
+ askList, releaseList, blacklistRequest);
+ // clear blacklistAdditions and blacklistRemovals before
+ // unsynchronized part
+ blacklistAdditions.clear();
+ blacklistRemovals.clear();
}
allocateResponse = rmClient.allocate(allocateRequest);
@@ -253,6 +272,9 @@ public class AMRMClientImpl<T extends Co
ask.add(oldAsk);
}
}
+
+ blacklistAdditions.addAll(blacklistToAdd);
+ blacklistRemovals.addAll(blacklistToRemove);
}
}
}
@@ -604,4 +626,31 @@ public class AMRMClientImpl<T extends Co
+ " #asks=" + ask.size());
}
}
+
+ @Override
+ public synchronized void updateBlacklist(List<String> blacklistAdditions,
+ List<String> blacklistRemovals) {
+
+ if (blacklistAdditions != null) {
+ this.blacklistAdditions.addAll(blacklistAdditions);
+ // if some resources are also in blacklistRemovals updated before, we
+ // should remove them here.
+ this.blacklistRemovals.removeAll(blacklistAdditions);
+ }
+
+ if (blacklistRemovals != null) {
+ this.blacklistRemovals.addAll(blacklistRemovals);
+ // if some resources are in blacklistAdditions before, we should remove
+ // them here.
+ this.blacklistAdditions.removeAll(blacklistRemovals);
+ }
+
+ if (blacklistAdditions != null && blacklistRemovals != null
+ && blacklistAdditions.removeAll(blacklistRemovals)) {
+ // we allow resources to appear in addition list and removal list in the
+ // same invocation of updateBlacklist(), but should get a warn here.
+ LOG.warn("The same resources appear in both blacklistAdditions and " +
+ "blacklistRemovals in updateBlacklist.");
+ }
+ }
}
Modified: hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestAMRMClient.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestAMRMClient.java?rev=1519112&r1=1519111&r2=1519112&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestAMRMClient.java (original)
+++ hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestAMRMClient.java Fri Aug 30 23:03:44 2013
@@ -18,13 +18,16 @@
package org.apache.hadoop.yarn.client.api.impl;
+import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
import static org.mockito.Matchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import java.io.IOException;
import java.nio.ByteBuffer;
+import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
@@ -97,6 +100,7 @@ public class TestAMRMClient {
static String rack;
static String[] nodes;
static String[] racks;
+ private final static int DEFAULT_ITERATION = 3;
@BeforeClass
public static void setup() throws Exception {
@@ -476,6 +480,144 @@ public class TestAMRMClient {
}
}
}
+
+ @Test (timeout=60000)
+ public void testAllocationWithBlacklist() throws YarnException, IOException {
+ AMRMClientImpl<ContainerRequest> amClient = null;
+ try {
+ // start am rm client
+ amClient =
+ (AMRMClientImpl<ContainerRequest>) AMRMClient
+ .<ContainerRequest> createAMRMClient();
+ amClient.init(conf);
+ amClient.start();
+ amClient.registerApplicationMaster("Host", 10000, "");
+
+ assertTrue(amClient.ask.size() == 0);
+ assertTrue(amClient.release.size() == 0);
+
+ ContainerRequest storedContainer1 =
+ new ContainerRequest(capability, nodes, racks, priority);
+ amClient.addContainerRequest(storedContainer1);
+ assertTrue(amClient.ask.size() == 3);
+ assertTrue(amClient.release.size() == 0);
+
+ List<String> localNodeBlacklist = new ArrayList<String>();
+ localNodeBlacklist.add(node);
+
+ // put node in black list, so no container assignment
+ amClient.updateBlacklist(localNodeBlacklist, null);
+
+ int allocatedContainerCount = getAllocatedContainersNumber(amClient,
+ DEFAULT_ITERATION);
+ // the only node is in blacklist, so no allocation
+ assertTrue(allocatedContainerCount == 0);
+
+ // Remove node from blacklist, so get assigned with 2
+ amClient.updateBlacklist(null, localNodeBlacklist);
+ ContainerRequest storedContainer2 =
+ new ContainerRequest(capability, nodes, racks, priority);
+ amClient.addContainerRequest(storedContainer2);
+ allocatedContainerCount = getAllocatedContainersNumber(amClient,
+ DEFAULT_ITERATION);
+ assertEquals(allocatedContainerCount, 2);
+
+ // Test in case exception in allocate(), blacklist is kept
+ assertTrue(amClient.blacklistAdditions.isEmpty());
+ assertTrue(amClient.blacklistRemovals.isEmpty());
+
+ // create a invalid ContainerRequest - memory value is minus
+ ContainerRequest invalidContainerRequest =
+ new ContainerRequest(Resource.newInstance(-1024, 1),
+ nodes, racks, priority);
+ amClient.addContainerRequest(invalidContainerRequest);
+ amClient.updateBlacklist(localNodeBlacklist, null);
+ try {
+ // allocate() should complain as ContainerRequest is invalid.
+ amClient.allocate(0.1f);
+ fail("there should be an exception here.");
+ } catch (Exception e) {
+ assertEquals(amClient.blacklistAdditions.size(), 1);
+ }
+ } finally {
+ if (amClient != null && amClient.getServiceState() == STATE.STARTED) {
+ amClient.stop();
+ }
+ }
+ }
+
+ @Test (timeout=60000)
+ public void testAMRMClientWithBlacklist() throws YarnException, IOException {
+ AMRMClientImpl<ContainerRequest> amClient = null;
+ try {
+ // start am rm client
+ amClient =
+ (AMRMClientImpl<ContainerRequest>) AMRMClient
+ .<ContainerRequest> createAMRMClient();
+ amClient.init(conf);
+ amClient.start();
+ amClient.registerApplicationMaster("Host", 10000, "");
+ String[] nodes = {"node1", "node2", "node3"};
+
+ // Add nodes[0] and nodes[1]
+ List<String> nodeList01 = new ArrayList<String>();
+ nodeList01.add(nodes[0]);
+ nodeList01.add(nodes[1]);
+ amClient.updateBlacklist(nodeList01, null);
+ assertEquals(amClient.blacklistAdditions.size(),2);
+ assertEquals(amClient.blacklistRemovals.size(),0);
+
+ // Add nodes[0] again, verify it is not added duplicated.
+ List<String> nodeList02 = new ArrayList<String>();
+ nodeList02.add(nodes[0]);
+ nodeList02.add(nodes[2]);
+ amClient.updateBlacklist(nodeList02, null);
+ assertEquals(amClient.blacklistAdditions.size(),3);
+ assertEquals(amClient.blacklistRemovals.size(),0);
+
+ // Add nodes[1] and nodes[2] to removal list,
+ // Verify addition list remove these two nodes.
+ List<String> nodeList12 = new ArrayList<String>();
+ nodeList12.add(nodes[1]);
+ nodeList12.add(nodes[2]);
+ amClient.updateBlacklist(null, nodeList12);
+ assertEquals(amClient.blacklistAdditions.size(),1);
+ assertEquals(amClient.blacklistRemovals.size(),2);
+
+ // Add nodes[1] again to addition list,
+ // Verify removal list will remove this node.
+ List<String> nodeList1 = new ArrayList<String>();
+ nodeList1.add(nodes[1]);
+ amClient.updateBlacklist(nodeList1, null);
+ assertEquals(amClient.blacklistAdditions.size(),2);
+ assertEquals(amClient.blacklistRemovals.size(),1);
+ } finally {
+ if (amClient != null && amClient.getServiceState() == STATE.STARTED) {
+ amClient.stop();
+ }
+ }
+ }
+
+ private int getAllocatedContainersNumber(
+ AMRMClientImpl<ContainerRequest> amClient, int iterationsLeft)
+ throws YarnException, IOException {
+ int allocatedContainerCount = 0;
+ while (iterationsLeft-- > 0) {
+ Log.info(" == alloc " + allocatedContainerCount + " it left " + iterationsLeft);
+ AllocateResponse allocResponse = amClient.allocate(0.1f);
+ assertTrue(amClient.ask.size() == 0);
+ assertTrue(amClient.release.size() == 0);
+
+ assertTrue(nodeCount == amClient.getClusterNodeCount());
+ allocatedContainerCount += allocResponse.getAllocatedContainers().size();
+
+ if(allocatedContainerCount == 0) {
+ // sleep to let NM's heartbeat to RM and trigger allocations
+ sleep(100);
+ }
+ }
+ return allocatedContainerCount;
+ }
@Test (timeout=60000)
public void testAMRMClient() throws YarnException, IOException {
Modified: hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ResourceBlacklistRequestPBImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ResourceBlacklistRequestPBImpl.java?rev=1519112&r1=1519111&r2=1519112&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ResourceBlacklistRequestPBImpl.java (original)
+++ hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ResourceBlacklistRequestPBImpl.java Fri Aug 30 23:03:44 2013
@@ -125,7 +125,7 @@ public class ResourceBlacklistRequestPBI
@Override
public void setBlacklistAdditions(List<String> resourceNames) {
- if (resourceNames == null) {
+ if (resourceNames == null || resourceNames.isEmpty()) {
if (this.blacklistAdditions != null) {
this.blacklistAdditions.clear();
}
@@ -144,7 +144,7 @@ public class ResourceBlacklistRequestPBI
@Override
public void setBlacklistRemovals(List<String> resourceNames) {
- if (resourceNames == null) {
+ if (resourceNames == null || resourceNames.isEmpty()) {
if (this.blacklistRemovals != null) {
this.blacklistRemovals.clear();
}