You are viewing a plain text version of this content. The canonical link for it is here.
Posted to yarn-commits@hadoop.apache.org by vi...@apache.org on 2014/06/25 23:56:43 UTC

svn commit: r1605616 - in /hadoop/common/trunk/hadoop-yarn-project: ./ hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ hadoop-yarn/hadoop-yarn-server/hado...

Author: vinodkv
Date: Wed Jun 25 21:56:42 2014
New Revision: 1605616

URL: http://svn.apache.org/r1605616
Log:
YARN-2171. Improved CapacityScheduling to not lock on nodemanager-count when AMs heartbeat in. Contributed by Jason Lowe.

Modified:
    hadoop/common/trunk/hadoop-yarn-project/CHANGES.txt
    hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
    hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java

Modified: hadoop/common/trunk/hadoop-yarn-project/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/CHANGES.txt?rev=1605616&r1=1605615&r2=1605616&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/CHANGES.txt (original)
+++ hadoop/common/trunk/hadoop-yarn-project/CHANGES.txt Wed Jun 25 21:56:42 2014
@@ -192,6 +192,9 @@ Release 2.5.0 - UNRELEASED
     YARN-2152. Added missing information into ContainerTokenIdentifier so that
     NodeManagers can report the same to RM when RM restarts. (Jian He via vinodkv)
 
+    YARN-2171. Improved CapacityScheduling to not lock on nodemanager-count when
+    AMs heartbeat in. (Jason Lowe via vinodkv)
+
   OPTIMIZATIONS
 
   BUG FIXES 

Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java?rev=1605616&r1=1605615&r2=1605616&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java Wed Jun 25 21:56:42 2014
@@ -19,6 +19,7 @@
 package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
 
 import com.google.common.base.Preconditions;
+
 import java.io.IOException;
 import java.io.InputStream;
 import java.util.ArrayList;
@@ -30,6 +31,7 @@ import java.util.Map;
 import java.util.Random;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -180,7 +182,7 @@ public class CapacityScheduler extends
 
   private Map<String, CSQueue> queues = new ConcurrentHashMap<String, CSQueue>();
 
-  private int numNodeManagers = 0;
+  private AtomicInteger numNodeManagers = new AtomicInteger(0);
 
   private ResourceCalculator calculator;
   private boolean usePortForNodeName;
@@ -236,8 +238,8 @@ public class CapacityScheduler extends
   }
 
   @Override
-  public synchronized int getNumClusterNodes() {
-    return numNodeManagers;
+  public int getNumClusterNodes() {
+    return numNodeManagers.get();
   }
 
   @Override
@@ -953,11 +955,11 @@ public class CapacityScheduler extends
         usePortForNodeName));
     Resources.addTo(clusterResource, nodeManager.getTotalCapability());
     root.updateClusterResource(clusterResource);
-    ++numNodeManagers;
+    int numNodes = numNodeManagers.incrementAndGet();
     LOG.info("Added node " + nodeManager.getNodeAddress() + 
         " clusterResource: " + clusterResource);
 
-    if (scheduleAsynchronously && numNodeManagers == 1) {
+    if (scheduleAsynchronously && numNodes == 1) {
       asyncSchedulerThread.beginSchedule();
     }
   }
@@ -969,9 +971,9 @@ public class CapacityScheduler extends
     }
     Resources.subtractFrom(clusterResource, node.getRMNode().getTotalCapability());
     root.updateClusterResource(clusterResource);
-    --numNodeManagers;
+    int numNodes = numNodeManagers.decrementAndGet();
 
-    if (scheduleAsynchronously && numNodeManagers == 0) {
+    if (scheduleAsynchronously && numNodes == 0) {
       asyncSchedulerThread.suspendSchedule();
     }
     

Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java?rev=1605616&r1=1605615&r2=1605616&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java Wed Jun 25 21:56:42 2014
@@ -25,15 +25,29 @@ import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
 import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.security.PrivilegedAction;
 import java.util.Collections;
 import java.util.Comparator;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
+import java.util.concurrent.BrokenBarrierException;
+import java.util.concurrent.CyclicBarrier;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.net.NetworkTopology;
+import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.security.token.TokenIdentifier;
 import org.apache.hadoop.yarn.LocalConfigurationProvider;
+import org.apache.hadoop.yarn.api.ApplicationMasterProtocol;
+import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest;
+import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.ContainerId;
@@ -46,13 +60,20 @@ import org.apache.hadoop.yarn.conf.YarnC
 import org.apache.hadoop.yarn.event.AsyncDispatcher;
 import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
+import org.apache.hadoop.yarn.ipc.YarnRPC;
 import org.apache.hadoop.yarn.server.resourcemanager.Application;
+import org.apache.hadoop.yarn.server.resourcemanager.MockNM;
 import org.apache.hadoop.yarn.server.resourcemanager.MockNodes;
 import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
 import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
 import org.apache.hadoop.yarn.server.resourcemanager.RMContextImpl;
 import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
 import org.apache.hadoop.yarn.server.resourcemanager.Task;
+import org.apache.hadoop.yarn.server.resourcemanager.TestAMAuthorization.MockRMWithAMS;
+import org.apache.hadoop.yarn.server.resourcemanager.TestAMAuthorization.MyContainerManager;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
@@ -686,4 +707,125 @@ public class TestCapacityScheduler {
     }
   }
 
+  @Test(timeout = 30000)  // YARN-2171: an AM allocate() call must complete while another thread holds the scheduler monitor
+  public void testAllocateDoesNotBlockOnSchedulerLock() throws Exception {
+    final YarnConfiguration conf = new YarnConfiguration();
+    conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class,
+        ResourceScheduler.class);
+    MyContainerManager containerManager = new MyContainerManager();
+    final MockRMWithAMS rm =
+        new MockRMWithAMS(conf, containerManager);
+    rm.start();
+
+    MockNM nm1 = rm.registerNode("localhost:1234", 5120);
+
+    Map<ApplicationAccessType, String> acls =
+        new HashMap<ApplicationAccessType, String>(2);
+    acls.put(ApplicationAccessType.VIEW_APP, "*");
+    RMApp app = rm.submitApp(1024, "appname", "appuser", acls);
+
+    nm1.nodeHeartbeat(true);  // presumably lets the attempt progress; the loop below waits for LAUNCHED
+
+    RMAppAttempt attempt = app.getCurrentAppAttempt();
+    ApplicationAttemptId applicationAttemptId = attempt.getAppAttemptId();
+    int msecToWait = 10000;  // total polling budget for the loop below
+    int msecToSleep = 100;  // polling interval
+    while (attempt.getAppAttemptState() != RMAppAttemptState.LAUNCHED
+        && msecToWait > 0) {
+      LOG.info("Waiting for AppAttempt to reach LAUNCHED state. "
+          + "Current state is " + attempt.getAppAttemptState());
+      Thread.sleep(msecToSleep);
+      msecToWait -= msecToSleep;
+    }
+    Assert.assertEquals(attempt.getAppAttemptState(),
+        RMAppAttemptState.LAUNCHED);  // NOTE(review): args look like (actual, expected); JUnit expects (expected, actual) - confirm
+
+    // Create an ApplicationMasterProtocol client to the RM, acting as the app attempt.
+    final YarnRPC rpc = YarnRPC.create(conf);
+
+    UserGroupInformation currentUser =
+        UserGroupInformation.createRemoteUser(applicationAttemptId.toString());
+    Credentials credentials = containerManager.getContainerCredentials();
+    final InetSocketAddress rmBindAddress =
+        rm.getApplicationMasterService().getBindAddress();
+    Token<? extends TokenIdentifier> amRMToken =
+        MockRMWithAMS.setupAndReturnAMRMToken(rmBindAddress,
+          credentials.getAllTokens());
+    currentUser.addToken(amRMToken);
+    ApplicationMasterProtocol client =
+        currentUser.doAs(new PrivilegedAction<ApplicationMasterProtocol>() {
+          @Override
+          public ApplicationMasterProtocol run() {
+            return (ApplicationMasterProtocol) rpc.getProxy(
+              ApplicationMasterProtocol.class, rmBindAddress, conf);
+          }
+        });
+
+    RegisterApplicationMasterRequest request =
+        RegisterApplicationMasterRequest.newInstance("localhost", 12345, "");
+    client.registerApplicationMaster(request);
+
+    // Hold the scheduler's monitor (synchronized(cs)) in another thread
+    // and verify an allocate call in this thread doesn't block on it
+    final CapacityScheduler cs = (CapacityScheduler) rm.getResourceScheduler();
+    final CyclicBarrier barrier = new CyclicBarrier(2);
+    Thread otherThread = new Thread(new Runnable() {
+      @Override
+      public void run() {
+        synchronized(cs) {
+          try {
+            barrier.await();  // rendezvous 1: tells the test thread the monitor is now held
+            barrier.await();  // rendezvous 2: keeps the monitor until the test thread's allocate() has returned
+          } catch (InterruptedException e) {
+            e.printStackTrace();  // NOTE(review): only printed, not propagated to the test thread
+          } catch (BrokenBarrierException e) {
+            e.printStackTrace();  // NOTE(review): only printed, not propagated to the test thread
+          }
+        }
+      }
+    });
+    otherThread.start();
+    barrier.await();  // returns once otherThread is inside synchronized(cs)
+    AllocateRequest allocateRequest =
+        AllocateRequest.newInstance(0, 0.0f, null, null, null);
+    client.allocate(allocateRequest);  // if this needed the cs monitor it would presumably stall until the test timeout
+    barrier.await();  // lets otherThread leave the synchronized block
+    otherThread.join();
+
+    rm.stop();  // NOTE(review): not in a finally block, so skipped if anything above throws
+  }
+
+  @Test  // checks that getNumClusterNodes() follows node added/removed scheduler events
+  public void testNumClusterNodes() throws Exception {
+    YarnConfiguration conf = new YarnConfiguration();
+    CapacityScheduler cs = new CapacityScheduler();
+    cs.setConf(conf);
+    RMContextImpl rmContext =  new RMContextImpl(null, null, null, null, null,
+        null, new RMContainerTokenSecretManager(conf),
+        new NMTokenSecretManagerInRM(conf),
+        new ClientToAMTokenSecretManagerInRM(), null);  // only the three secret managers are supplied; all other arguments are null
+    cs.setRMContext(rmContext);
+    CapacitySchedulerConfiguration csConf =
+        new CapacitySchedulerConfiguration();
+    setupQueueConfiguration(csConf);
+    cs.init(csConf);
+    cs.start();
+    assertEquals(0, cs.getNumClusterNodes());  // no node events handled yet
+
+    RMNode n1 = MockNodes.newNodeInfo(0, MockNodes.newResource(4 * GB), 1);
+    RMNode n2 = MockNodes.newNodeInfo(0, MockNodes.newResource(2 * GB), 2);
+    cs.handle(new NodeAddedSchedulerEvent(n1));
+    cs.handle(new NodeAddedSchedulerEvent(n2));
+    assertEquals(2, cs.getNumClusterNodes());  // n1 and n2 added
+
+    cs.handle(new NodeRemovedSchedulerEvent(n1));
+    assertEquals(1, cs.getNumClusterNodes());  // only n2 remains
+    cs.handle(new NodeAddedSchedulerEvent(n1));  // re-add the node that was just removed
+    assertEquals(2, cs.getNumClusterNodes());
+    cs.handle(new NodeRemovedSchedulerEvent(n2));
+    cs.handle(new NodeRemovedSchedulerEvent(n1));
+    assertEquals(0, cs.getNumClusterNodes());  // count returns to zero after both removals
+
+    cs.stop();
+  }
 }