You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by dr...@apache.org on 2016/09/20 06:06:52 UTC

[1/9] hadoop git commit: YARN-5577. [Atsv2] Document object passing in infofilters with an example (Rohith Sharma K S via Varun Saxena)

Repository: hadoop
Updated Branches:
  refs/heads/HADOOP-12756 08b37603d -> a49b3be38


YARN-5577. [Atsv2] Document object passing in infofilters with an example (Rohith Sharma K S via Varun Saxena)


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/ea29e3bc
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/ea29e3bc
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/ea29e3bc

Branch: refs/heads/HADOOP-12756
Commit: ea29e3bc27f15516f4346d1312eef703bcd3d032
Parents: 3552c2b
Author: Varun Saxena <va...@apache.org>
Authored: Mon Sep 19 14:33:06 2016 +0530
Committer: Varun Saxena <va...@apache.org>
Committed: Mon Sep 19 14:33:06 2016 +0530

----------------------------------------------------------------------
 .../hadoop-yarn-site/src/site/markdown/TimelineServiceV2.md    | 6 ++++++
 1 file changed, 6 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/ea29e3bc/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-site/src/site/markdown/TimelineServiceV2.md
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-site/src/site/markdown/TimelineServiceV2.md b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-site/src/site/markdown/TimelineServiceV2.md
index b6a0da4..6b7bd08 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-site/src/site/markdown/TimelineServiceV2.md
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-site/src/site/markdown/TimelineServiceV2.md
@@ -712,6 +712,8 @@ none of the apps match the predicates, an empty list will be returned.
   "eq" means equals, "ne" means not equals and existence of key is not required for a match and "ene" means not equals but existence of key is
   required. We can combine any number of ANDs' and ORs' to create complex expressions.  Brackets can be used to club expressions together.<br/>
   _For example_ : infofilters can be "(((infokey1 eq value1) AND (infokey2 ne value1)) OR (infokey1 ene value3))".<br/>
+  Note : If value is an object then value can be given in the form of JSON format without any space.<br/>
+  _For example_ : infofilters can be (infokey1 eq {"&lt;key&gt;":"&lt;value&gt;","&lt;key&gt;":"&lt;value&gt;"...}).<br/>
   Please note that URL unsafe characters such as spaces will have to be suitably encoded.
 1. `conffilters` - If specified, matched applications must have exact matches to the given config name and must be either equal or not equal
   to the given config value. Both the config name and value must be strings. conffilters are represented in the same form as infofilters.
@@ -837,6 +839,8 @@ match the predicates, an empty list will be returned.
   "eq" means equals, "ne" means not equals and existence of key is not required for a match and "ene" means not equals but existence of key is
   required. We can combine any number of ANDs' and ORs' to create complex expressions.  Brackets can be used to club expressions together.<br/>
   _For example_ : infofilters can be "(((infokey1 eq value1) AND (infokey2 ne value1)) OR (infokey1 ene value3))".<br/>
+  Note : If value is an object then value can be given in the form of JSON format without any space.<br/>
+  _For example_ : infofilters can be (infokey1 eq {"&lt;key&gt;":"&lt;value&gt;","&lt;key&gt;":"&lt;value&gt;"...}).<br/>
   Please note that URL unsafe characters such as spaces will have to be suitably encoded.
 1. `conffilters` - If specified, matched applications must have exact matches to the given config name and must be either equal or not equal
   to the given config value. Both the config name and value must be strings. conffilters are represented in the same form as infofilters.
@@ -1035,6 +1039,8 @@ If none of the entities match the predicates, an empty list will be returned.
   "eq" means equals, "ne" means not equals and existence of key is not required for a match and "ene" means not equals but existence of key is
   required. We can combine any number of ANDs' and ORs' to create complex expressions.  Brackets can be used to club expressions together.<br/>
   _For example_ : infofilters can be "(((infokey1 eq value1) AND (infokey2 ne value1)) OR (infokey1 ene value3))".<br/>
+  Note : If value is an object then value can be given in the form of JSON format without any space.<br/>
+  _For example_ : infofilters can be (infokey1 eq {"&lt;key&gt;":"&lt;value&gt;","&lt;key&gt;":"&lt;value&gt;"...}).<br/>
   Please note that URL unsafe characters such as spaces will have to be suitably encoded.
 1. `conffilters` - If specified, matched entities must have exact matches to the given config name and must be either equal or not equal
   to the given config value. Both the config name and value must be strings. conffilters are represented in the same form as infofilters.


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org


[5/9] hadoop git commit: YARN-5540. Scheduler spends too much time looking at empty priorities. Contributed by Jason Lowe

Posted by dr...@apache.org.
YARN-5540. Scheduler spends too much time looking at empty priorities. Contributed by Jason Lowe


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/7558dbbb
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/7558dbbb
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/7558dbbb

Branch: refs/heads/HADOOP-12756
Commit: 7558dbbb481eab055e794beb3603bbe5671a4b4c
Parents: c54f6ef
Author: Jason Lowe <jl...@apache.org>
Authored: Mon Sep 19 20:31:35 2016 +0000
Committer: Jason Lowe <jl...@apache.org>
Committed: Mon Sep 19 20:31:35 2016 +0000

----------------------------------------------------------------------
 .../scheduler/AppSchedulingInfo.java            | 96 +++++++++++---------
 .../scheduler/TestAppSchedulingInfo.java        | 65 +++++++++++++
 2 files changed, 118 insertions(+), 43 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/7558dbbb/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java
index c677345..39820f7 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java
@@ -26,8 +26,8 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.TreeMap;
-import java.util.TreeSet;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentSkipListMap;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
 
@@ -59,7 +59,6 @@ import org.apache.hadoop.yarn.util.resource.Resources;
 public class AppSchedulingInfo {
   
   private static final Log LOG = LogFactory.getLog(AppSchedulingInfo.class);
-  private static final int EPOCH_BIT_SHIFT = 40;
 
   private final ApplicationId applicationId;
   private final ApplicationAttemptId applicationAttemptId;
@@ -79,7 +78,8 @@ public class AppSchedulingInfo {
 
   private Set<String> requestedPartitions = new HashSet<>();
 
-  final Set<SchedulerRequestKey> schedulerKeys = new TreeSet<>();
+  private final ConcurrentSkipListMap<SchedulerRequestKey, Integer>
+      schedulerKeys = new ConcurrentSkipListMap<>();
   final Map<SchedulerRequestKey, Map<String, ResourceRequest>>
       resourceRequestMap = new ConcurrentHashMap<>();
   final Map<NodeId, Map<SchedulerRequestKey, Map<ContainerId,
@@ -236,6 +236,7 @@ public class AppSchedulingInfo {
     if (null == requestsOnNodeWithPriority) {
       requestsOnNodeWithPriority = new TreeMap<>();
       requestsOnNode.put(schedulerKey, requestsOnNodeWithPriority);
+      incrementSchedulerKeyReference(schedulerKey);
     }
 
     requestsOnNodeWithPriority.put(containerId, request);
@@ -250,11 +251,30 @@ public class AppSchedulingInfo {
       LOG.debug("Added increase request:" + request.getContainerId()
           + " delta=" + delta);
     }
-    
-    // update Scheduler Keys
-    schedulerKeys.add(schedulerKey);
   }
-  
+
+  private void incrementSchedulerKeyReference(
+      SchedulerRequestKey schedulerKey) {
+    Integer schedulerKeyCount = schedulerKeys.get(schedulerKey);
+    if (schedulerKeyCount == null) {
+      schedulerKeys.put(schedulerKey, 1);
+    } else {
+      schedulerKeys.put(schedulerKey, schedulerKeyCount + 1);
+    }
+  }
+
+  private void decrementSchedulerKeyReference(
+      SchedulerRequestKey schedulerKey) {
+    Integer schedulerKeyCount = schedulerKeys.get(schedulerKey);
+    if (schedulerKeyCount != null) {
+      if (schedulerKeyCount > 1) {
+        schedulerKeys.put(schedulerKey, schedulerKeyCount - 1);
+      } else {
+        schedulerKeys.remove(schedulerKey);
+      }
+    }
+  }
+
   public synchronized boolean removeIncreaseRequest(NodeId nodeId,
       SchedulerRequestKey schedulerKey, ContainerId containerId) {
     Map<SchedulerRequestKey, Map<ContainerId, SchedContainerChangeRequest>>
@@ -275,6 +295,7 @@ public class AppSchedulingInfo {
     // remove hierarchies if it becomes empty
     if (requestsOnNodeWithPriority.isEmpty()) {
       requestsOnNode.remove(schedulerKey);
+      decrementSchedulerKeyReference(schedulerKey);
     }
     if (requestsOnNode.isEmpty()) {
       containerIncreaseRequestMap.remove(nodeId);
@@ -341,7 +362,6 @@ public class AppSchedulingInfo {
       if (asks == null) {
         asks = new ConcurrentHashMap<>();
         this.resourceRequestMap.put(schedulerKey, asks);
-        this.schedulerKeys.add(schedulerKey);
       }
 
       // Increment number of containers if recovering preempted resources
@@ -360,29 +380,34 @@ public class AppSchedulingInfo {
 
         anyResourcesUpdated = true;
 
-        // Activate application. Metrics activation is done here.
-        // TODO: Shouldn't we activate even if numContainers = 0?
-        if (request.getNumContainers() > 0) {
-          activeUsersManager.activateApplication(user, applicationId);
-        }
-
         // Update pendingResources
-        updatePendingResources(lastRequest, request, queue.getMetrics());
+        updatePendingResources(lastRequest, request, schedulerKey,
+            queue.getMetrics());
       }
     }
     return anyResourcesUpdated;
   }
 
   private void updatePendingResources(ResourceRequest lastRequest,
-      ResourceRequest request, QueueMetrics metrics) {
+      ResourceRequest request, SchedulerRequestKey schedulerKey,
+      QueueMetrics metrics) {
+    int lastRequestContainers =
+        (lastRequest != null) ? lastRequest.getNumContainers() : 0;
     if (request.getNumContainers() <= 0) {
+      if (lastRequestContainers >= 0) {
+        decrementSchedulerKeyReference(schedulerKey);
+      }
       LOG.info("checking for deactivate of application :"
           + this.applicationId);
       checkForDeactivation();
+    } else {
+      // Activate application. Metrics activation is done here.
+      if (lastRequestContainers <= 0) {
+        incrementSchedulerKeyReference(schedulerKey);
+        activeUsersManager.activateApplication(user, applicationId);
+      }
     }
 
-    int lastRequestContainers =
-        (lastRequest != null) ? lastRequest.getNumContainers() : 0;
     Resource lastRequestCapability =
         lastRequest != null ? lastRequest.getCapability() : Resources.none();
     metrics.incrPendingResources(user,
@@ -505,7 +530,7 @@ public class AppSchedulingInfo {
   }
 
   public synchronized Collection<SchedulerRequestKey> getSchedulerKeys() {
-    return schedulerKeys;
+    return schedulerKeys.keySet();
   }
 
   public synchronized Map<String, ResourceRequest> getResourceRequests(
@@ -617,7 +642,7 @@ public class AppSchedulingInfo {
     } else if (type == NodeType.RACK_LOCAL) {
       allocateRackLocal(node, schedulerKey, request, resourceRequests);
     } else {
-      allocateOffSwitch(request, resourceRequests);
+      allocateOffSwitch(request, resourceRequests, schedulerKey);
     }
     QueueMetrics metrics = queue.getMetrics();
     if (pending) {
@@ -656,7 +681,7 @@ public class AppSchedulingInfo {
 
     ResourceRequest offRackRequest = resourceRequestMap.get(schedulerKey).get(
         ResourceRequest.ANY);
-    decrementOutstanding(offRackRequest);
+    decrementOutstanding(offRackRequest, schedulerKey);
 
     // Update cloned NodeLocal, RackLocal and OffRack requests for recovery
     resourceRequests.add(cloneResourceRequest(nodeLocalRequest));
@@ -684,7 +709,7 @@ public class AppSchedulingInfo {
     
     ResourceRequest offRackRequest = resourceRequestMap.get(schedulerKey).get(
         ResourceRequest.ANY);
-    decrementOutstanding(offRackRequest);
+    decrementOutstanding(offRackRequest, schedulerKey);
 
     // Update cloned RackLocal and OffRack requests for recovery
     resourceRequests.add(cloneResourceRequest(rackLocalRequest));
@@ -696,15 +721,16 @@ public class AppSchedulingInfo {
    * application.
    */
   private synchronized void allocateOffSwitch(
-      ResourceRequest offSwitchRequest, List<ResourceRequest> resourceRequests) {
+      ResourceRequest offSwitchRequest, List<ResourceRequest> resourceRequests,
+      SchedulerRequestKey schedulerKey) {
     // Update future requirements
-    decrementOutstanding(offSwitchRequest);
+    decrementOutstanding(offSwitchRequest, schedulerKey);
     // Update cloned OffRack requests for recovery
     resourceRequests.add(cloneResourceRequest(offSwitchRequest));
   }
 
   private synchronized void decrementOutstanding(
-      ResourceRequest offSwitchRequest) {
+      ResourceRequest offSwitchRequest, SchedulerRequestKey schedulerKey) {
     int numOffSwitchContainers = offSwitchRequest.getNumContainers() - 1;
 
     // Do not remove ANY
@@ -713,6 +739,7 @@ public class AppSchedulingInfo {
     // Do we have any outstanding requests?
     // If there is nothing, we need to deactivate this application
     if (numOffSwitchContainers == 0) {
+      decrementSchedulerKeyReference(schedulerKey);
       checkForDeactivation();
     }
     
@@ -723,24 +750,7 @@ public class AppSchedulingInfo {
   }
   
   private synchronized void checkForDeactivation() {
-    boolean deactivate = true;
-    for (SchedulerRequestKey schedulerKey : getSchedulerKeys()) {
-      ResourceRequest request =
-          getResourceRequest(schedulerKey, ResourceRequest.ANY);
-      if (request != null) {
-        if (request.getNumContainers() > 0) {
-          deactivate = false;
-          break;
-        }
-      }
-    }
-    
-    // also we need to check increase request
-    if (!deactivate) {
-      deactivate = containerIncreaseRequestMap.isEmpty();
-    }
-
-    if (deactivate) {
+    if (schedulerKeys.isEmpty()) {
       activeUsersManager.deactivateApplication(user, applicationId);
     }
   }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/7558dbbb/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestAppSchedulingInfo.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestAppSchedulingInfo.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestAppSchedulingInfo.java
index 503ea34..7f9c719 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestAppSchedulingInfo.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestAppSchedulingInfo.java
@@ -23,11 +23,14 @@ import static org.mockito.Mockito.mock;
 
 import java.util.ArrayList;
 import java.util.Iterator;
+import java.util.List;
 import java.util.TreeSet;
 
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.Priority;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.ResourceRequest;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.TestUtils;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FSLeafQueue;
 import org.junit.Assert;
@@ -105,4 +108,66 @@ public class TestAppSchedulingInfo {
     Assert.assertEquals(2, sk.getPriority().getPriority());
     Assert.assertEquals(6, sk.getAllocationRequestId());
   }
+
+  @Test
+  public void testSchedulerKeyAccounting() {
+    ApplicationId appIdImpl = ApplicationId.newInstance(0, 1);
+    ApplicationAttemptId appAttemptId =
+        ApplicationAttemptId.newInstance(appIdImpl, 1);
+
+    Queue queue = mock(Queue.class);
+    doReturn(mock(QueueMetrics.class)).when(queue).getMetrics();
+    AppSchedulingInfo  info = new AppSchedulingInfo(
+        appAttemptId, "test", queue, mock(ActiveUsersManager.class), 0,
+        new ResourceUsage());
+    Assert.assertEquals(0, info.getSchedulerKeys().size());
+
+    Priority pri1 = Priority.newInstance(1);
+    ResourceRequest req1 = ResourceRequest.newInstance(pri1,
+        ResourceRequest.ANY, Resource.newInstance(1024, 1), 1);
+    Priority pri2 = Priority.newInstance(2);
+    ResourceRequest req2 = ResourceRequest.newInstance(pri2,
+        ResourceRequest.ANY, Resource.newInstance(1024, 1), 2);
+    List<ResourceRequest> reqs = new ArrayList<>();
+    reqs.add(req1);
+    reqs.add(req2);
+    info.updateResourceRequests(reqs, false);
+    ArrayList<SchedulerRequestKey> keys =
+        new ArrayList<>(info.getSchedulerKeys());
+    Assert.assertEquals(2, keys.size());
+    Assert.assertEquals(SchedulerRequestKey.create(req1), keys.get(0));
+    Assert.assertEquals(SchedulerRequestKey.create(req2), keys.get(1));
+
+    // iterate to verify no ConcurrentModificationException
+    for (SchedulerRequestKey schedulerKey : info.getSchedulerKeys()) {
+      info.allocate(NodeType.OFF_SWITCH, null, schedulerKey, req1, null);
+    }
+    Assert.assertEquals(1, info.getSchedulerKeys().size());
+    Assert.assertEquals(SchedulerRequestKey.create(req2),
+        info.getSchedulerKeys().iterator().next());
+
+    req2 = ResourceRequest.newInstance(pri2,
+        ResourceRequest.ANY, Resource.newInstance(1024, 1), 1);
+    reqs.clear();
+    reqs.add(req2);
+    info.updateResourceRequests(reqs, false);
+    info.allocate(NodeType.OFF_SWITCH, null, SchedulerRequestKey.create(req2),
+        req2, null);
+    Assert.assertEquals(0, info.getSchedulerKeys().size());
+
+    req1 = ResourceRequest.newInstance(pri1,
+        ResourceRequest.ANY, Resource.newInstance(1024, 1), 5);
+    reqs.clear();
+    reqs.add(req1);
+    info.updateResourceRequests(reqs, false);
+    Assert.assertEquals(1, info.getSchedulerKeys().size());
+    Assert.assertEquals(SchedulerRequestKey.create(req1),
+        info.getSchedulerKeys().iterator().next());
+    req1 = ResourceRequest.newInstance(pri1,
+        ResourceRequest.ANY, Resource.newInstance(1024, 1), 0);
+    reqs.clear();
+    reqs.add(req1);
+    info.updateResourceRequests(reqs, false);
+    Assert.assertEquals(0, info.getSchedulerKeys().size());
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org


[3/9] hadoop git commit: YARN-3141. Improve locks in SchedulerApplicationAttempt/FSAppAttempt/FiCaSchedulerApp. Contributed by Wangda Tan

Posted by dr...@apache.org.
YARN-3141. Improve locks in SchedulerApplicationAttempt/FSAppAttempt/FiCaSchedulerApp. Contributed by Wangda Tan


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/b8a30f2f
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/b8a30f2f
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/b8a30f2f

Branch: refs/heads/HADOOP-12756
Commit: b8a30f2f170ffbd590e7366c3c944ab4919e40df
Parents: ea29e3b
Author: Jian He <ji...@apache.org>
Authored: Mon Sep 19 16:58:39 2016 +0800
Committer: Jian He <ji...@apache.org>
Committed: Mon Sep 19 17:08:01 2016 +0800

----------------------------------------------------------------------
 .../scheduler/SchedulerApplicationAttempt.java  | 744 +++++++++++--------
 .../allocator/RegularContainerAllocator.java    |   2 +-
 .../scheduler/common/fica/FiCaSchedulerApp.java | 418 ++++++-----
 .../scheduler/fair/FSAppAttempt.java            | 465 ++++++------
 4 files changed, 922 insertions(+), 707 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/b8a30f2f/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java
index 97d29cf..adc3a97 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java
@@ -26,8 +26,11 @@ import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
 
+import com.google.common.collect.ConcurrentHashMultiset;
 import org.apache.commons.lang.time.DateUtils;
 import org.apache.commons.lang.time.FastDateFormat;
 import org.apache.commons.logging.Log;
@@ -71,8 +74,6 @@ import org.apache.hadoop.yarn.util.resource.Resources;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
-import com.google.common.collect.HashMultiset;
-import com.google.common.collect.Multiset;
 
 /**
  * Represents an application attempt from the viewpoint of the scheduler.
@@ -97,14 +98,14 @@ public class SchedulerApplicationAttempt implements SchedulableEntity {
   protected final AppSchedulingInfo appSchedulingInfo;
   protected ApplicationAttemptId attemptId;
   protected Map<ContainerId, RMContainer> liveContainers =
-      new HashMap<ContainerId, RMContainer>();
+      new ConcurrentHashMap<>();
   protected final Map<SchedulerRequestKey, Map<NodeId, RMContainer>>
       reservedContainers = new HashMap<>();
 
-  private final Multiset<SchedulerRequestKey> reReservations =
-      HashMultiset.create();
+  private final ConcurrentHashMultiset<SchedulerRequestKey> reReservations =
+      ConcurrentHashMultiset.create();
   
-  private Resource resourceLimit = Resource.newInstance(0, 0);
+  private volatile Resource resourceLimit = Resource.newInstance(0, 0);
   private boolean unmanagedAM = true;
   private boolean amRunning = false;
   private LogAggregationContext logAggregationContext;
@@ -138,8 +139,9 @@ public class SchedulerApplicationAttempt implements SchedulableEntity {
    * the application successfully schedules a task (at rack or node local), it
    * is reset to 0.
    */
-  Multiset<SchedulerRequestKey> schedulingOpportunities = HashMultiset.create();
-  
+  private ConcurrentHashMultiset<SchedulerRequestKey> schedulingOpportunities =
+      ConcurrentHashMultiset.create();
+
   /**
    * Count how many times the application has been given an opportunity to
    * schedule a non-partitioned resource request at each priority. Each time the
@@ -147,15 +149,16 @@ public class SchedulerApplicationAttempt implements SchedulableEntity {
    * incremented, and each time the application successfully schedules a task,
    * it is reset to 0 when schedule any task at corresponding priority.
    */
-  Multiset<SchedulerRequestKey> missedNonPartitionedReqSchedulingOpportunity =
-      HashMultiset.create();
+  private ConcurrentHashMultiset<SchedulerRequestKey>
+      missedNonPartitionedReqSchedulingOpportunity =
+      ConcurrentHashMultiset.create();
   
   // Time of the last container scheduled at the current allowed level
   protected Map<SchedulerRequestKey, Long> lastScheduledContainer =
-      new HashMap<>();
+      new ConcurrentHashMap<>();
 
-  protected Queue queue;
-  protected boolean isStopped = false;
+  protected volatile Queue queue;
+  protected volatile boolean isStopped = false;
 
   protected String appAMNodePartitionName = CommonNodeLabelsManager.NO_LABEL;
 
@@ -163,6 +166,9 @@ public class SchedulerApplicationAttempt implements SchedulableEntity {
 
   private RMAppAttempt appAttempt;
 
+  protected ReentrantReadWriteLock.ReadLock readLock;
+  protected ReentrantReadWriteLock.WriteLock writeLock;
+
   public SchedulerApplicationAttempt(ApplicationAttemptId applicationAttemptId, 
       String user, Queue queue, ActiveUsersManager activeUsersManager,
       RMContext rmContext) {
@@ -188,14 +194,23 @@ public class SchedulerApplicationAttempt implements SchedulableEntity {
             appSubmissionContext.getLogAggregationContext();
       }
     }
+
+    ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
+    readLock = lock.readLock();
+    writeLock = lock.writeLock();
   }
   
   /**
    * Get the live containers of the application.
    * @return live containers of the application
    */
-  public synchronized Collection<RMContainer> getLiveContainers() {
-    return new ArrayList<RMContainer>(liveContainers.values());
+  public Collection<RMContainer> getLiveContainers() {
+    try {
+      readLock.lock();
+      return new ArrayList<>(liveContainers.values());
+    } finally {
+      readLock.unlock();
+    }
   }
 
   public AppSchedulingInfo getAppSchedulingInfo() {
@@ -243,20 +258,36 @@ public class SchedulerApplicationAttempt implements SchedulableEntity {
     return appSchedulingInfo.getSchedulerKeys();
   }
   
-  public synchronized ResourceRequest getResourceRequest(
+  public ResourceRequest getResourceRequest(
       SchedulerRequestKey schedulerKey, String resourceName) {
-    return appSchedulingInfo.getResourceRequest(schedulerKey, resourceName);
+    try {
+      readLock.lock();
+      return appSchedulingInfo.getResourceRequest(schedulerKey, resourceName);
+    } finally {
+      readLock.unlock();
+    }
+
   }
 
-  public synchronized int getTotalRequiredResources(
+  public int getTotalRequiredResources(
       SchedulerRequestKey schedulerKey) {
-    ResourceRequest request =
-        getResourceRequest(schedulerKey, ResourceRequest.ANY);
-    return request == null ? 0 : request.getNumContainers();
+    try {
+      readLock.lock();
+      ResourceRequest request =
+          getResourceRequest(schedulerKey, ResourceRequest.ANY);
+      return request == null ? 0 : request.getNumContainers();
+    } finally {
+      readLock.unlock();
+    }
   }
 
-  public synchronized Resource getResource(SchedulerRequestKey schedulerKey) {
-    return appSchedulingInfo.getResource(schedulerKey);
+  public Resource getResource(SchedulerRequestKey schedulerKey) {
+    try {
+      readLock.lock();
+      return appSchedulingInfo.getResource(schedulerKey);
+    } finally {
+      readLock.unlock();
+    }
   }
 
   public String getQueueName() {
@@ -291,38 +322,48 @@ public class SchedulerApplicationAttempt implements SchedulableEntity {
     return unmanagedAM;
   }
 
-  public synchronized RMContainer getRMContainer(ContainerId id) {
+  public RMContainer getRMContainer(ContainerId id) {
     return liveContainers.get(id);
   }
 
-  public synchronized void addRMContainer(
+  public void addRMContainer(
       ContainerId id, RMContainer rmContainer) {
-    liveContainers.put(id, rmContainer);
-    if (rmContainer.isRemotelyAllocated()) {
-      this.attemptResourceUsageAllocatedRemotely.incUsed(
-          rmContainer.getAllocatedResource());
+    try {
+      writeLock.lock();
+      liveContainers.put(id, rmContainer);
+      if (rmContainer.isRemotelyAllocated()) {
+        this.attemptResourceUsageAllocatedRemotely.incUsed(
+            rmContainer.getAllocatedResource());
+      }
+    } finally {
+      writeLock.unlock();
     }
   }
 
-  public synchronized void removeRMContainer(ContainerId containerId) {
-    RMContainer rmContainer = liveContainers.remove(containerId);
-    if (rmContainer != null && rmContainer.isRemotelyAllocated()) {
-      this.attemptResourceUsageAllocatedRemotely.decUsed(
-          rmContainer.getAllocatedResource());
+  public void removeRMContainer(ContainerId containerId) {
+    try {
+      writeLock.lock();
+      RMContainer rmContainer = liveContainers.remove(containerId);
+      if (rmContainer != null && rmContainer.isRemotelyAllocated()) {
+        this.attemptResourceUsageAllocatedRemotely.decUsed(
+            rmContainer.getAllocatedResource());
+      }
+    } finally {
+      writeLock.unlock();
     }
   }
 
-  protected synchronized void resetReReservations(
+  protected void resetReReservations(
       SchedulerRequestKey schedulerKey) {
     reReservations.setCount(schedulerKey, 0);
   }
 
-  protected synchronized void addReReservation(
+  protected void addReReservation(
       SchedulerRequestKey schedulerKey) {
     reReservations.add(schedulerKey);
   }
 
-  public synchronized int getReReservations(SchedulerRequestKey schedulerKey) {
+  public int getReReservations(SchedulerRequestKey schedulerKey) {
     return reReservations.count(schedulerKey);
   }
 
@@ -333,7 +374,7 @@ public class SchedulerApplicationAttempt implements SchedulableEntity {
    */
   @Stable
   @Private
-  public synchronized Resource getCurrentReservation() {
+  public Resource getCurrentReservation() {
     return attemptResourceUsage.getReserved();
   }
   
@@ -341,28 +382,43 @@ public class SchedulerApplicationAttempt implements SchedulableEntity {
     return queue;
   }
   
-  public synchronized boolean updateResourceRequests(
+  public boolean updateResourceRequests(
       List<ResourceRequest> requests) {
-    if (!isStopped) {
-      return appSchedulingInfo.updateResourceRequests(requests, false);
+    try {
+      writeLock.lock();
+      if (!isStopped) {
+        return appSchedulingInfo.updateResourceRequests(requests, false);
+      }
+      return false;
+    } finally {
+      writeLock.unlock();
     }
-    return false;
   }
   
-  public synchronized void recoverResourceRequestsForContainer(
+  public void recoverResourceRequestsForContainer(
       List<ResourceRequest> requests) {
-    if (!isStopped) {
-      appSchedulingInfo.updateResourceRequests(requests, true);
+    try {
+      writeLock.lock();
+      if (!isStopped) {
+        appSchedulingInfo.updateResourceRequests(requests, true);
+      }
+    } finally {
+      writeLock.unlock();
     }
   }
   
-  public synchronized void stop(RMAppAttemptState rmAppAttemptFinalState) {
-    // Cleanup all scheduling information
-    isStopped = true;
-    appSchedulingInfo.stop();
+  public void stop(RMAppAttemptState rmAppAttemptFinalState) {
+    try {
+      writeLock.lock();
+      // Cleanup all scheduling information
+      isStopped = true;
+      appSchedulingInfo.stop();
+    } finally {
+      writeLock.unlock();
+    }
   }
 
-  public synchronized boolean isStopped() {
+  public boolean isStopped() {
     return isStopped;
   }
 
@@ -370,29 +426,40 @@ public class SchedulerApplicationAttempt implements SchedulableEntity {
    * Get the list of reserved containers
    * @return All of the reserved containers.
    */
-  public synchronized List<RMContainer> getReservedContainers() {
-    List<RMContainer> reservedContainers = new ArrayList<RMContainer>();
-    for (Map.Entry<SchedulerRequestKey, Map<NodeId, RMContainer>> e :
-      this.reservedContainers.entrySet()) {
-      reservedContainers.addAll(e.getValue().values());
+  public List<RMContainer> getReservedContainers() {
+    List<RMContainer> list = new ArrayList<>();
+    try {
+      readLock.lock();
+      for (Entry<SchedulerRequestKey, Map<NodeId, RMContainer>> e :
+          this.reservedContainers.entrySet()) {
+        list.addAll(e.getValue().values());
+      }
+      return list;
+    } finally {
+      readLock.unlock();
     }
-    return reservedContainers;
+
   }
   
-  public synchronized boolean reserveIncreasedContainer(SchedulerNode node,
+  public boolean reserveIncreasedContainer(SchedulerNode node,
       SchedulerRequestKey schedulerKey, RMContainer rmContainer,
       Resource reservedResource) {
-    if (commonReserve(node, schedulerKey, rmContainer, reservedResource)) {
-      attemptResourceUsage.incReserved(node.getPartition(),
-          reservedResource);
-      // succeeded
-      return true;
+    try {
+      writeLock.lock();
+      if (commonReserve(node, schedulerKey, rmContainer, reservedResource)) {
+        attemptResourceUsage.incReserved(node.getPartition(), reservedResource);
+        // succeeded
+        return true;
+      }
+
+      return false;
+    } finally {
+      writeLock.unlock();
     }
-    
-    return false;
+
   }
   
-  private synchronized boolean commonReserve(SchedulerNode node,
+  private boolean commonReserve(SchedulerNode node,
       SchedulerRequestKey schedulerKey, RMContainer rmContainer,
       Resource reservedResource) {
     try {
@@ -423,101 +490,100 @@ public class SchedulerApplicationAttempt implements SchedulableEntity {
     return true;
   }
   
-  public synchronized RMContainer reserve(SchedulerNode node,
+  public RMContainer reserve(SchedulerNode node,
       SchedulerRequestKey schedulerKey, RMContainer rmContainer,
       Container container) {
-    // Create RMContainer if necessary
-    if (rmContainer == null) {
-      rmContainer =
-          new RMContainerImpl(container, getApplicationAttemptId(),
-              node.getNodeID(), appSchedulingInfo.getUser(), rmContext);
-      attemptResourceUsage.incReserved(node.getPartition(),
-          container.getResource());
-      ((RMContainerImpl)rmContainer).setQueueName(this.getQueueName());
-
-      // Reset the re-reservation count
-      resetReReservations(schedulerKey);
-    } else {
-      // Note down the re-reservation
-      addReReservation(schedulerKey);
-    }
-    
-    commonReserve(node, schedulerKey, rmContainer, container.getResource());
+    try {
+      writeLock.lock();
+      // Create RMContainer if necessary
+      if (rmContainer == null) {
+        rmContainer = new RMContainerImpl(container, getApplicationAttemptId(),
+            node.getNodeID(), appSchedulingInfo.getUser(), rmContext);
+        attemptResourceUsage.incReserved(node.getPartition(),
+            container.getResource());
+        ((RMContainerImpl) rmContainer).setQueueName(this.getQueueName());
+
+        // Reset the re-reservation count
+        resetReReservations(schedulerKey);
+      } else{
+        // Note down the re-reservation
+        addReReservation(schedulerKey);
+      }
 
-    return rmContainer;
-  }
-  
-  /**
-   * Has the application reserved the given <code>node</code> at the
-   * given <code>priority</code>?
-   * @param node node to be checked
-   * @param schedulerKey scheduler key  of reserved container
-   * @return true is reserved, false if not
-   */
-  public synchronized boolean isReserved(SchedulerNode node,
-      SchedulerRequestKey schedulerKey) {
-    Map<NodeId, RMContainer> reservedContainers = 
-        this.reservedContainers.get(schedulerKey);
-    if (reservedContainers != null) {
-      return reservedContainers.containsKey(node.getNodeID());
+      commonReserve(node, schedulerKey, rmContainer, container.getResource());
+
+      return rmContainer;
+    } finally {
+      writeLock.unlock();
     }
-    return false;
+
   }
-  
-  public synchronized void setHeadroom(Resource globalLimit) {
-    this.resourceLimit = globalLimit; 
+
+  public void setHeadroom(Resource globalLimit) {
+    this.resourceLimit = Resources.componentwiseMax(globalLimit,
+        Resources.none());
   }
 
   /**
    * Get available headroom in terms of resources for the application's user.
    * @return available resource headroom
    */
-  public synchronized Resource getHeadroom() {
-    // Corner case to deal with applications being slightly over-limit
-    if (resourceLimit.getMemorySize() < 0) {
-      resourceLimit.setMemorySize(0);
-    }
-    
+  public Resource getHeadroom() {
     return resourceLimit;
   }
   
-  public synchronized int getNumReservedContainers(
+  public int getNumReservedContainers(
       SchedulerRequestKey schedulerKey) {
-    Map<NodeId, RMContainer> reservedContainers = 
-        this.reservedContainers.get(schedulerKey);
-    return (reservedContainers == null) ? 0 : reservedContainers.size();
+    try {
+      readLock.lock();
+      Map<NodeId, RMContainer> map = this.reservedContainers.get(
+          schedulerKey);
+      return (map == null) ? 0 : map.size();
+    } finally {
+      readLock.unlock();
+    }
   }
   
   @SuppressWarnings("unchecked")
-  public synchronized void containerLaunchedOnNode(ContainerId containerId,
+  public void containerLaunchedOnNode(ContainerId containerId,
       NodeId nodeId) {
-    // Inform the container
-    RMContainer rmContainer = getRMContainer(containerId);
-    if (rmContainer == null) {
-      // Some unknown container sneaked into the system. Kill it.
-      rmContext.getDispatcher().getEventHandler()
-        .handle(new RMNodeCleanContainerEvent(nodeId, containerId));
-      return;
-    }
+    try {
+      writeLock.lock();
+      // Inform the container
+      RMContainer rmContainer = getRMContainer(containerId);
+      if (rmContainer == null) {
+        // Some unknown container sneaked into the system. Kill it.
+        rmContext.getDispatcher().getEventHandler().handle(
+            new RMNodeCleanContainerEvent(nodeId, containerId));
+        return;
+      }
 
-    rmContainer.handle(new RMContainerEvent(containerId,
-        RMContainerEventType.LAUNCHED));
+      rmContainer.handle(
+          new RMContainerEvent(containerId, RMContainerEventType.LAUNCHED));
+    } finally {
+      writeLock.unlock();
+    }
   }
   
-  public synchronized void showRequests() {
+  public void showRequests() {
     if (LOG.isDebugEnabled()) {
-      for (SchedulerRequestKey schedulerKey : getSchedulerKeys()) {
-        Map<String, ResourceRequest> requests =
-            getResourceRequests(schedulerKey);
-        if (requests != null) {
-          LOG.debug("showRequests:" + " application=" + getApplicationId()
-              + " headRoom=" + getHeadroom() + " currentConsumption="
-              + attemptResourceUsage.getUsed().getMemorySize());
-          for (ResourceRequest request : requests.values()) {
+      try {
+        readLock.lock();
+        for (SchedulerRequestKey schedulerKey : getSchedulerKeys()) {
+          Map<String, ResourceRequest> requests = getResourceRequests(
+              schedulerKey);
+          if (requests != null) {
             LOG.debug("showRequests:" + " application=" + getApplicationId()
-                + " request=" + request);
+                + " headRoom=" + getHeadroom() + " currentConsumption="
+                + attemptResourceUsage.getUsed().getMemorySize());
+            for (ResourceRequest request : requests.values()) {
+              LOG.debug("showRequests:" + " application=" + getApplicationId()
+                  + " request=" + request);
+            }
           }
         }
+      } finally {
+        readLock.unlock();
       }
     }
   }
@@ -572,54 +638,75 @@ public class SchedulerApplicationAttempt implements SchedulableEntity {
   // Create container token and update NMToken altogether, if either of them fails for
   // some reason like DNS unavailable, do not return this container and keep it
   // in the newlyAllocatedContainers waiting to be refetched.
-  public synchronized List<Container> pullNewlyAllocatedContainers() {
-    List<Container> returnContainerList =
-        new ArrayList<Container>(newlyAllocatedContainers.size());
-    for (Iterator<RMContainer> i = newlyAllocatedContainers.iterator(); i
-        .hasNext();) {
-      RMContainer rmContainer = i.next();
-      Container updatedContainer =
-          updateContainerAndNMToken(rmContainer, true, false);
-      // Only add container to return list when it's not null. updatedContainer
-      // could be null when generate token failed, it can be caused by DNS
-      // resolving failed.
-      if (updatedContainer != null) {
-        returnContainerList.add(updatedContainer);
-        i.remove();
+  public List<Container> pullNewlyAllocatedContainers() {
+    try {
+      writeLock.lock();
+      List<Container> returnContainerList = new ArrayList<Container>(
+          newlyAllocatedContainers.size());
+
+      Iterator<RMContainer> i = newlyAllocatedContainers.iterator();
+      while (i.hasNext()) {
+        RMContainer rmContainer = i.next();
+        Container updatedContainer = updateContainerAndNMToken(rmContainer,
+            true, false);
+        // Only add container to return list when it's not null.
+        // updatedContainer could be null when generate token failed, it can be
+        // caused by DNS resolving failed.
+        if (updatedContainer != null) {
+          returnContainerList.add(updatedContainer);
+          i.remove();
+        }
       }
+      return returnContainerList;
+    } finally {
+      writeLock.unlock();
     }
-    return returnContainerList;
+
   }
   
-  private synchronized List<Container> pullNewlyUpdatedContainers(
+  private List<Container> pullNewlyUpdatedContainers(
       Map<ContainerId, RMContainer> updatedContainerMap, boolean increase) {
-    List<Container> returnContainerList =
-        new ArrayList<Container>(updatedContainerMap.size());
-    for (Iterator<Entry<ContainerId, RMContainer>> i =
-        updatedContainerMap.entrySet().iterator(); i.hasNext();) {
-      RMContainer rmContainer = i.next().getValue();
-      Container updatedContainer =
-          updateContainerAndNMToken(rmContainer, false, increase);
-      if (updatedContainer != null) {
-        returnContainerList.add(updatedContainer);
-        i.remove();
+    try {
+      writeLock.lock();
+      List <Container> returnContainerList = new ArrayList <Container>(
+          updatedContainerMap.size());
+
+      Iterator<Entry<ContainerId, RMContainer>> i =
+          updatedContainerMap.entrySet().iterator();
+      while (i.hasNext()) {
+        RMContainer rmContainer = i.next().getValue();
+        Container updatedContainer = updateContainerAndNMToken(rmContainer,
+            false, increase);
+        if (updatedContainer != null) {
+          returnContainerList.add(updatedContainer);
+          i.remove();
+        }
       }
+      return returnContainerList;
+    } finally {
+      writeLock.unlock();
     }
-    return returnContainerList;
+
   }
 
-  public synchronized List<Container> pullNewlyIncreasedContainers() {
+  public List<Container> pullNewlyIncreasedContainers() {
     return pullNewlyUpdatedContainers(newlyIncreasedContainers, true);
   }
   
-  public synchronized List<Container> pullNewlyDecreasedContainers() {
+  public List<Container> pullNewlyDecreasedContainers() {
     return pullNewlyUpdatedContainers(newlyDecreasedContainers, false);
   }
   
-  public synchronized List<NMToken> pullUpdatedNMTokens() {
-    List<NMToken> returnList = new ArrayList<NMToken>(updatedNMTokens);
-    updatedNMTokens.clear();
-    return returnList;
+  public List<NMToken> pullUpdatedNMTokens() {
+    try {
+      writeLock.lock();
+      List <NMToken> returnList = new ArrayList<>(updatedNMTokens);
+      updatedNMTokens.clear();
+      return returnList;
+    } finally {
+      writeLock.unlock();
+    }
+
   }
 
   public boolean isWaitingForAMContainer() {
@@ -628,53 +715,63 @@ public class SchedulerApplicationAttempt implements SchedulableEntity {
     return (!unmanagedAM && appAttempt.getMasterContainer() == null);
   }
 
-  public synchronized void updateBlacklist(List<String> blacklistAdditions,
+  public void updateBlacklist(List<String> blacklistAdditions,
       List<String> blacklistRemovals) {
-    if (!isStopped) {
-      if (isWaitingForAMContainer()) {
-        // The request is for the AM-container, and the AM-container is launched
-        // by the system. So, update the places that are blacklisted by system
-        // (as opposed to those blacklisted by the application).
-        this.appSchedulingInfo.updatePlacesBlacklistedBySystem(
-            blacklistAdditions, blacklistRemovals);
-      } else {
-        this.appSchedulingInfo.updatePlacesBlacklistedByApp(blacklistAdditions,
-            blacklistRemovals);
+    try {
+      writeLock.lock();
+      if (!isStopped) {
+        if (isWaitingForAMContainer()) {
+          // The request is for the AM-container, and the AM-container is
+          // launched by the system. So, update the places that are blacklisted
+          // by system (as opposed to those blacklisted by the application).
+          this.appSchedulingInfo.updatePlacesBlacklistedBySystem(
+              blacklistAdditions, blacklistRemovals);
+        } else{
+          this.appSchedulingInfo.updatePlacesBlacklistedByApp(
+              blacklistAdditions, blacklistRemovals);
+        }
       }
+    } finally {
+      writeLock.unlock();
     }
   }
 
   public boolean isPlaceBlacklisted(String resourceName) {
-    boolean forAMContainer = isWaitingForAMContainer();
-    return this.appSchedulingInfo.isPlaceBlacklisted(resourceName,
-      forAMContainer);
+    try {
+      readLock.lock();
+      boolean forAMContainer = isWaitingForAMContainer();
+      return this.appSchedulingInfo.isPlaceBlacklisted(resourceName,
+          forAMContainer);
+    } finally {
+      readLock.unlock();
+    }
   }
 
-  public synchronized int addMissedNonPartitionedRequestSchedulingOpportunity(
+  public int addMissedNonPartitionedRequestSchedulingOpportunity(
       SchedulerRequestKey schedulerKey) {
-    missedNonPartitionedReqSchedulingOpportunity.add(schedulerKey);
-    return missedNonPartitionedReqSchedulingOpportunity.count(schedulerKey);
+    return missedNonPartitionedReqSchedulingOpportunity.add(
+        schedulerKey, 1) + 1;
   }
 
-  public synchronized void
+  public void
       resetMissedNonPartitionedRequestSchedulingOpportunity(
       SchedulerRequestKey schedulerKey) {
     missedNonPartitionedReqSchedulingOpportunity.setCount(schedulerKey, 0);
   }
 
   
-  public synchronized void addSchedulingOpportunity(
+  public void addSchedulingOpportunity(
       SchedulerRequestKey schedulerKey) {
-    int count = schedulingOpportunities.count(schedulerKey);
-    if (count < Integer.MAX_VALUE) {
-      schedulingOpportunities.setCount(schedulerKey, count + 1);
+    try {
+      schedulingOpportunities.add(schedulerKey, 1);
+    } catch (IllegalArgumentException e) {
+      // This happens when count = MAX_INT, ignore the exception
     }
   }
   
-  public synchronized void subtractSchedulingOpportunity(
+  public void subtractSchedulingOpportunity(
       SchedulerRequestKey schedulerKey) {
-    int count = schedulingOpportunities.count(schedulerKey) - 1;
-    this.schedulingOpportunities.setCount(schedulerKey, Math.max(count,  0));
+    this.schedulingOpportunities.removeExactly(schedulerKey, 1);
   }
 
   /**
@@ -684,7 +781,7 @@ public class SchedulerApplicationAttempt implements SchedulableEntity {
    * @param schedulerKey Scheduler Key
    * @return number of scheduling opportunities
    */
-  public synchronized int getSchedulingOpportunities(
+  public int getSchedulingOpportunities(
       SchedulerRequestKey schedulerKey) {
     return schedulingOpportunities.count(schedulerKey);
   }
@@ -696,16 +793,22 @@ public class SchedulerApplicationAttempt implements SchedulableEntity {
    *
    * @param schedulerKey The priority of the container scheduled.
    */
-  public synchronized void resetSchedulingOpportunities(
+  public void resetSchedulingOpportunities(
       SchedulerRequestKey schedulerKey) {
     resetSchedulingOpportunities(schedulerKey, System.currentTimeMillis());
   }
 
   // used for continuous scheduling
-  public synchronized void resetSchedulingOpportunities(
+  public void resetSchedulingOpportunities(
       SchedulerRequestKey schedulerKey, long currentTimeMs) {
-    lastScheduledContainer.put(schedulerKey, currentTimeMs);
-    schedulingOpportunities.setCount(schedulerKey, 0);
+    try {
+      writeLock.lock();
+      lastScheduledContainer.put(schedulerKey, currentTimeMs);
+      schedulingOpportunities.setCount(schedulerKey, 0);
+    } finally {
+      writeLock.unlock();
+    }
+
   }
 
   @VisibleForTesting
@@ -713,7 +816,7 @@ public class SchedulerApplicationAttempt implements SchedulableEntity {
     schedulingOpportunities.setCount(schedulerKey, count);
   }
 
-  synchronized AggregateAppResourceUsage getRunningAggregateAppResourceUsage() {
+  private AggregateAppResourceUsage getRunningAggregateAppResourceUsage() {
     long currentTimeMillis = System.currentTimeMillis();
     // Don't walk the whole container list if the resources were computed
     // recently.
@@ -737,101 +840,120 @@ public class SchedulerApplicationAttempt implements SchedulableEntity {
     return new AggregateAppResourceUsage(lastMemorySeconds, lastVcoreSeconds);
   }
 
-  public synchronized ApplicationResourceUsageReport getResourceUsageReport() {
-    AggregateAppResourceUsage runningResourceUsage =
-        getRunningAggregateAppResourceUsage();
-    Resource usedResourceClone =
-        Resources.clone(attemptResourceUsage.getAllUsed());
-    Resource reservedResourceClone =
-        Resources.clone(attemptResourceUsage.getReserved());
-    Resource cluster = rmContext.getScheduler().getClusterResource();
-    ResourceCalculator calc = rmContext.getScheduler().getResourceCalculator();
-    float queueUsagePerc = 0.0f;
-    float clusterUsagePerc = 0.0f;
-    if (!calc.isInvalidDivisor(cluster)) {
-      queueUsagePerc =
-          calc.divide(cluster, usedResourceClone, Resources.multiply(cluster,
-              queue.getQueueInfo(false, false).getCapacity())) * 100;
-      clusterUsagePerc = calc.divide(cluster, usedResourceClone, cluster) * 100;
+  public ApplicationResourceUsageReport getResourceUsageReport() {
+    try {
+      writeLock.lock();
+      AggregateAppResourceUsage runningResourceUsage =
+          getRunningAggregateAppResourceUsage();
+      Resource usedResourceClone = Resources.clone(
+          attemptResourceUsage.getAllUsed());
+      Resource reservedResourceClone = Resources.clone(
+          attemptResourceUsage.getReserved());
+      Resource cluster = rmContext.getScheduler().getClusterResource();
+      ResourceCalculator calc =
+          rmContext.getScheduler().getResourceCalculator();
+      float queueUsagePerc = 0.0f;
+      float clusterUsagePerc = 0.0f;
+      if (!calc.isInvalidDivisor(cluster)) {
+        queueUsagePerc = calc.divide(cluster, usedResourceClone, Resources
+            .multiply(cluster, queue.getQueueInfo(false, false).getCapacity()))
+            * 100;
+        clusterUsagePerc = calc.divide(cluster, usedResourceClone, cluster)
+            * 100;
+      }
+      return ApplicationResourceUsageReport.newInstance(liveContainers.size(),
+          reservedContainers.size(), usedResourceClone, reservedResourceClone,
+          Resources.add(usedResourceClone, reservedResourceClone),
+          runningResourceUsage.getMemorySeconds(),
+          runningResourceUsage.getVcoreSeconds(), queueUsagePerc,
+          clusterUsagePerc);
+    } finally {
+      writeLock.unlock();
     }
-    return ApplicationResourceUsageReport.newInstance(liveContainers.size(),
-        reservedContainers.size(), usedResourceClone, reservedResourceClone,
-        Resources.add(usedResourceClone, reservedResourceClone),
-        runningResourceUsage.getMemorySeconds(),
-        runningResourceUsage.getVcoreSeconds(), queueUsagePerc,
-        clusterUsagePerc);
   }
 
-  public synchronized Map<ContainerId, RMContainer> getLiveContainersMap() {
+  @VisibleForTesting
+  public Map<ContainerId, RMContainer> getLiveContainersMap() {
     return this.liveContainers;
   }
 
-  public synchronized Resource getResourceLimit() {
-    return this.resourceLimit;
-  }
-
-  public synchronized Map<SchedulerRequestKey, Long>
+  public Map<SchedulerRequestKey, Long>
       getLastScheduledContainer() {
     return this.lastScheduledContainer;
   }
 
-  public synchronized void transferStateFromPreviousAttempt(
+  public void transferStateFromPreviousAttempt(
       SchedulerApplicationAttempt appAttempt) {
-    this.liveContainers = appAttempt.getLiveContainersMap();
-    // this.reReservations = appAttempt.reReservations;
-    this.attemptResourceUsage.copyAllUsed(appAttempt.attemptResourceUsage);
-    this.resourceLimit = appAttempt.getResourceLimit();
-    // this.currentReservation = appAttempt.currentReservation;
-    // this.newlyAllocatedContainers = appAttempt.newlyAllocatedContainers;
-    // this.schedulingOpportunities = appAttempt.schedulingOpportunities;
-    this.lastScheduledContainer = appAttempt.getLastScheduledContainer();
-    this.appSchedulingInfo
-      .transferStateFromPreviousAppSchedulingInfo(appAttempt.appSchedulingInfo);
+    try {
+      writeLock.lock();
+      this.liveContainers = appAttempt.getLiveContainersMap();
+      // this.reReservations = appAttempt.reReservations;
+      this.attemptResourceUsage.copyAllUsed(appAttempt.attemptResourceUsage);
+      this.setHeadroom(appAttempt.resourceLimit);
+      // this.currentReservation = appAttempt.currentReservation;
+      // this.newlyAllocatedContainers = appAttempt.newlyAllocatedContainers;
+      // this.schedulingOpportunities = appAttempt.schedulingOpportunities;
+      this.lastScheduledContainer = appAttempt.getLastScheduledContainer();
+      this.appSchedulingInfo.transferStateFromPreviousAppSchedulingInfo(
+          appAttempt.appSchedulingInfo);
+    } finally {
+      writeLock.unlock();
+    }
   }
   
-  public synchronized void move(Queue newQueue) {
-    QueueMetrics oldMetrics = queue.getMetrics();
-    QueueMetrics newMetrics = newQueue.getMetrics();
-    String newQueueName = newQueue.getQueueName();
-    String user = getUser();
-    for (RMContainer liveContainer : liveContainers.values()) {
-      Resource resource = liveContainer.getContainer().getResource();
-      ((RMContainerImpl)liveContainer).setQueueName(newQueueName);
-      oldMetrics.releaseResources(user, 1, resource);
-      newMetrics.allocateResources(user, 1, resource, false);
-    }
-    for (Map<NodeId, RMContainer> map : reservedContainers.values()) {
-      for (RMContainer reservedContainer : map.values()) {
-        ((RMContainerImpl)reservedContainer).setQueueName(newQueueName);
-        Resource resource = reservedContainer.getReservedResource();
-        oldMetrics.unreserveResource(user, resource);
-        newMetrics.reserveResource(user, resource);
+  public void move(Queue newQueue) {
+    try {
+      writeLock.lock();
+      QueueMetrics oldMetrics = queue.getMetrics();
+      QueueMetrics newMetrics = newQueue.getMetrics();
+      String newQueueName = newQueue.getQueueName();
+      String user = getUser();
+      for (RMContainer liveContainer : liveContainers.values()) {
+        Resource resource = liveContainer.getContainer().getResource();
+        ((RMContainerImpl) liveContainer).setQueueName(newQueueName);
+        oldMetrics.releaseResources(user, 1, resource);
+        newMetrics.allocateResources(user, 1, resource, false);
+      }
+      for (Map<NodeId, RMContainer> map : reservedContainers.values()) {
+        for (RMContainer reservedContainer : map.values()) {
+          ((RMContainerImpl) reservedContainer).setQueueName(newQueueName);
+          Resource resource = reservedContainer.getReservedResource();
+          oldMetrics.unreserveResource(user, resource);
+          newMetrics.reserveResource(user, resource);
+        }
       }
-    }
 
-    appSchedulingInfo.move(newQueue);
-    this.queue = newQueue;
+      appSchedulingInfo.move(newQueue);
+      this.queue = newQueue;
+    } finally {
+      writeLock.unlock();
+    }
   }
 
-  public synchronized void recoverContainer(SchedulerNode node,
+  public void recoverContainer(SchedulerNode node,
       RMContainer rmContainer) {
-    // recover app scheduling info
-    appSchedulingInfo.recoverContainer(rmContainer);
+    try {
+      writeLock.lock();
+      // recover app scheduling info
+      appSchedulingInfo.recoverContainer(rmContainer);
 
-    if (rmContainer.getState().equals(RMContainerState.COMPLETED)) {
-      return;
+      if (rmContainer.getState().equals(RMContainerState.COMPLETED)) {
+        return;
+      }
+      LOG.info("SchedulerAttempt " + getApplicationAttemptId()
+          + " is recovering container " + rmContainer.getContainerId());
+      liveContainers.put(rmContainer.getContainerId(), rmContainer);
+      attemptResourceUsage.incUsed(node.getPartition(),
+          rmContainer.getContainer().getResource());
+
+      // resourceLimit: updated when LeafQueue#recoverContainer#allocateResource
+      // is called.
+      // newlyAllocatedContainers.add(rmContainer);
+      // schedulingOpportunities
+      // lastScheduledContainer
+    } finally {
+      writeLock.unlock();
     }
-    LOG.info("SchedulerAttempt " + getApplicationAttemptId()
-      + " is recovering container " + rmContainer.getContainerId());
-    liveContainers.put(rmContainer.getContainerId(), rmContainer);
-    attemptResourceUsage.incUsed(node.getPartition(), rmContainer
-        .getContainer().getResource());
-    
-    // resourceLimit: updated when LeafQueue#recoverContainer#allocateResource
-    // is called.
-    // newlyAllocatedContainers.add(rmContainer);
-    // schedulingOpportunities
-    // lastScheduledContainer
   }
 
   public void incNumAllocatedContainers(NodeType containerType,
@@ -915,49 +1037,64 @@ public class SchedulerApplicationAttempt implements SchedulableEntity {
     return attemptResourceUsage;
   }
   
-  public synchronized boolean removeIncreaseRequest(NodeId nodeId,
+  public boolean removeIncreaseRequest(NodeId nodeId,
       SchedulerRequestKey schedulerKey, ContainerId containerId) {
-    return appSchedulingInfo.removeIncreaseRequest(nodeId, schedulerKey,
-        containerId);
+    try {
+      writeLock.lock();
+      return appSchedulingInfo.removeIncreaseRequest(nodeId, schedulerKey,
+          containerId);
+    } finally {
+      writeLock.unlock();
+    }
   }
   
-  public synchronized boolean updateIncreaseRequests(
+  public boolean updateIncreaseRequests(
       List<SchedContainerChangeRequest> increaseRequests) {
-    return appSchedulingInfo.updateIncreaseRequests(increaseRequests);
+    try {
+      writeLock.lock();
+      return appSchedulingInfo.updateIncreaseRequests(increaseRequests);
+    } finally {
+      writeLock.unlock();
+    }
   }
   
-  private synchronized void changeContainerResource(
+  private void changeContainerResource(
       SchedContainerChangeRequest changeRequest, boolean increase) {
-    if (increase) {
-      appSchedulingInfo.increaseContainer(changeRequest);
-    } else {
-      appSchedulingInfo.decreaseContainer(changeRequest);
-    }
+    try {
+      writeLock.lock();
+      if (increase) {
+        appSchedulingInfo.increaseContainer(changeRequest);
+      } else{
+        appSchedulingInfo.decreaseContainer(changeRequest);
+      }
 
-    RMContainer changedRMContainer = changeRequest.getRMContainer(); 
-    changedRMContainer.handle(
-        new RMContainerChangeResourceEvent(changeRequest.getContainerId(),
-            changeRequest.getTargetCapacity(), increase));
-
-    // remove pending and not pulled by AM newly-increased/decreased-containers
-    // and add the new one
-    if (increase) {
-      newlyDecreasedContainers.remove(changeRequest.getContainerId());
-      newlyIncreasedContainers.put(changeRequest.getContainerId(),
-          changedRMContainer);
-    } else {
-      newlyIncreasedContainers.remove(changeRequest.getContainerId());
-      newlyDecreasedContainers.put(changeRequest.getContainerId(),
-          changedRMContainer);
+      RMContainer changedRMContainer = changeRequest.getRMContainer();
+      changedRMContainer.handle(
+          new RMContainerChangeResourceEvent(changeRequest.getContainerId(),
+              changeRequest.getTargetCapacity(), increase));
+
+      // remove pending and not pulled by AM newly-increased or
+      // decreased-containers and add the new one
+      if (increase) {
+        newlyDecreasedContainers.remove(changeRequest.getContainerId());
+        newlyIncreasedContainers.put(changeRequest.getContainerId(),
+            changedRMContainer);
+      } else{
+        newlyIncreasedContainers.remove(changeRequest.getContainerId());
+        newlyDecreasedContainers.put(changeRequest.getContainerId(),
+            changedRMContainer);
+      }
+    } finally {
+      writeLock.unlock();
     }
   }
   
-  public synchronized void decreaseContainer(
+  public void decreaseContainer(
       SchedContainerChangeRequest decreaseRequest) {
     changeContainerResource(decreaseRequest, false);
   }
   
-  public synchronized void increaseContainer(
+  public void increaseContainer(
       SchedContainerChangeRequest increaseRequest) {
     changeContainerResource(increaseRequest, true);
   }
@@ -1025,7 +1162,10 @@ public class SchedulerApplicationAttempt implements SchedulableEntity {
     this.isAttemptRecovering = isRecovering;
   }
 
-  public static enum AMState {
+  /**
+   * Different state for Application Master, user can see this state from web UI
+   */
+  public enum AMState {
     UNMANAGED("User launched the Application Master, since it's unmanaged. "),
     INACTIVATED("Application is added to the scheduler and is not yet activated. "),
     ACTIVATED("Application is Activated, waiting for resources to be assigned for AM. "),

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b8a30f2f/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/RegularContainerAllocator.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/RegularContainerAllocator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/RegularContainerAllocator.java
index 8d4042c..1a3f71f 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/RegularContainerAllocator.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/RegularContainerAllocator.java
@@ -251,7 +251,7 @@ public class RegularContainerAllocator extends AbstractContainerAllocator {
     return result;
   }
   
-  public synchronized float getLocalityWaitFactor(
+  public float getLocalityWaitFactor(
       SchedulerRequestKey schedulerKey, int clusterNodes) {
     // Estimate: Required unique resources (i.e. hosts + racks)
     int requiredResources = 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b8a30f2f/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java
index 9c84a23..f40ecd7 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java
@@ -63,7 +63,6 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSAMCont
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSAssignment;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityHeadroomProvider;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerContext;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.LeafQueue;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.QueueCapacities;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.SchedulingMode;
@@ -99,7 +98,6 @@ public class FiCaSchedulerApp extends SchedulerApplicationAttempt {
    * to hold the message if its app doesn't not get container from a node
    */
   private String appSkipNodeDiagnostics;
-  private CapacitySchedulerContext capacitySchedulerContext;
 
   public FiCaSchedulerApp(ApplicationAttemptId applicationAttemptId, 
       String user, Queue queue, ActiveUsersManager activeUsersManager,
@@ -153,118 +151,128 @@ public class FiCaSchedulerApp extends SchedulerApplicationAttempt {
 
     containerAllocator = new ContainerAllocator(this, rc, rmContext,
         activitiesManager);
-
-    if (scheduler instanceof CapacityScheduler) {
-      capacitySchedulerContext = (CapacitySchedulerContext) scheduler;
-    }
   }
 
-  public synchronized boolean containerCompleted(RMContainer rmContainer,
+  public boolean containerCompleted(RMContainer rmContainer,
       ContainerStatus containerStatus, RMContainerEventType event,
       String partition) {
-    ContainerId containerId = rmContainer.getContainerId();
+    try {
+      writeLock.lock();
+      ContainerId containerId = rmContainer.getContainerId();
 
-    // Remove from the list of containers
-    if (null == liveContainers.remove(containerId)) {
-      return false;
-    }
+      // Remove from the list of containers
+      if (null == liveContainers.remove(containerId)) {
+        return false;
+      }
 
-    // Remove from the list of newly allocated containers if found
-    newlyAllocatedContainers.remove(rmContainer);
+      // Remove from the list of newly allocated containers if found
+      newlyAllocatedContainers.remove(rmContainer);
 
-    // Inform the container
-    rmContainer.handle(
-        new RMContainerFinishedEvent(containerId, containerStatus, event));
+      // Inform the container
+      rmContainer.handle(
+          new RMContainerFinishedEvent(containerId, containerStatus, event));
 
-    containersToPreempt.remove(containerId);
+      containersToPreempt.remove(containerId);
 
-    Resource containerResource = rmContainer.getContainer().getResource();
-    RMAuditLogger.logSuccess(getUser(),
-        AuditConstants.RELEASE_CONTAINER, "SchedulerApp",
-        getApplicationId(), containerId, containerResource);
-    
-    // Update usage metrics 
-    queue.getMetrics().releaseResources(getUser(), 1, containerResource);
-    attemptResourceUsage.decUsed(partition, containerResource);
+      Resource containerResource = rmContainer.getContainer().getResource();
+      RMAuditLogger.logSuccess(getUser(), AuditConstants.RELEASE_CONTAINER,
+          "SchedulerApp", getApplicationId(), containerId, containerResource);
+
+      // Update usage metrics
+      queue.getMetrics().releaseResources(getUser(), 1, containerResource);
+      attemptResourceUsage.decUsed(partition, containerResource);
 
-    // Clear resource utilization metrics cache.
-    lastMemoryAggregateAllocationUpdateTime = -1;
+      // Clear resource utilization metrics cache.
+      lastMemoryAggregateAllocationUpdateTime = -1;
 
-    return true;
+      return true;
+    } finally {
+      writeLock.unlock();
+    }
   }
 
-  public synchronized RMContainer allocate(NodeType type, FiCaSchedulerNode node,
+  public RMContainer allocate(NodeType type, FiCaSchedulerNode node,
       SchedulerRequestKey schedulerKey, ResourceRequest request,
       Container container) {
+    try {
+      writeLock.lock();
 
-    if (isStopped) {
-      return null;
-    }
-    
-    // Required sanity check - AM can call 'allocate' to update resource
-    // request without locking the scheduler, hence we need to check
-    if (getTotalRequiredResources(schedulerKey) <= 0) {
-      return null;
-    }
+      if (isStopped) {
+        return null;
+      }
+
+      // Required sanity check - AM can call 'allocate' to update resource
+      // request without locking the scheduler, hence we need to check
+      if (getTotalRequiredResources(schedulerKey) <= 0) {
+        return null;
+      }
 
-    // Create RMContainer
-    RMContainer rmContainer =
-        new RMContainerImpl(container, this.getApplicationAttemptId(),
-            node.getNodeID(), appSchedulingInfo.getUser(), this.rmContext,
-            request.getNodeLabelExpression());
-    ((RMContainerImpl)rmContainer).setQueueName(this.getQueueName());
+      // Create RMContainer
+      RMContainer rmContainer = new RMContainerImpl(container,
+          this.getApplicationAttemptId(), node.getNodeID(),
+          appSchedulingInfo.getUser(), this.rmContext,
+          request.getNodeLabelExpression());
+      ((RMContainerImpl) rmContainer).setQueueName(this.getQueueName());
 
-    updateAMContainerDiagnostics(AMState.ASSIGNED, null);
+      updateAMContainerDiagnostics(AMState.ASSIGNED, null);
 
-    // Add it to allContainers list.
-    newlyAllocatedContainers.add(rmContainer);
+      // Add it to allContainers list.
+      newlyAllocatedContainers.add(rmContainer);
 
-    ContainerId containerId = container.getId();
-    liveContainers.put(containerId, rmContainer);
+      ContainerId containerId = container.getId();
+      liveContainers.put(containerId, rmContainer);
 
-    // Update consumption and track allocations
-    List<ResourceRequest> resourceRequestList = appSchedulingInfo.allocate(
-        type, node, schedulerKey, request, container);
+      // Update consumption and track allocations
+      List<ResourceRequest> resourceRequestList = appSchedulingInfo.allocate(
+          type, node, schedulerKey, request, container);
 
-    attemptResourceUsage.incUsed(node.getPartition(), container.getResource());
+      attemptResourceUsage.incUsed(node.getPartition(),
+          container.getResource());
 
-    // Update resource requests related to "request" and store in RMContainer
-    ((RMContainerImpl)rmContainer).setResourceRequests(resourceRequestList);
+      // Update resource requests related to "request" and store in RMContainer
+      ((RMContainerImpl) rmContainer).setResourceRequests(resourceRequestList);
 
-    // Inform the container
-    rmContainer.handle(
-        new RMContainerEvent(containerId, RMContainerEventType.START));
+      // Inform the container
+      rmContainer.handle(
+          new RMContainerEvent(containerId, RMContainerEventType.START));
 
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("allocate: applicationAttemptId=" 
-          + containerId.getApplicationAttemptId()
-          + " container=" + containerId + " host="
-          + container.getNodeId().getHost() + " type=" + type);
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("allocate: applicationAttemptId=" + containerId
+            .getApplicationAttemptId() + " container=" + containerId + " host="
+            + container.getNodeId().getHost() + " type=" + type);
+      }
+      RMAuditLogger.logSuccess(getUser(), AuditConstants.ALLOC_CONTAINER,
+          "SchedulerApp", getApplicationId(), containerId,
+          container.getResource());
+
+      return rmContainer;
+    } finally {
+      writeLock.unlock();
     }
-    RMAuditLogger.logSuccess(getUser(),
-        AuditConstants.ALLOC_CONTAINER, "SchedulerApp",
-        getApplicationId(), containerId, container.getResource());
-    
-    return rmContainer;
   }
 
-  public synchronized boolean unreserve(SchedulerRequestKey schedulerKey,
+  public boolean unreserve(SchedulerRequestKey schedulerKey,
       FiCaSchedulerNode node, RMContainer rmContainer) {
-    // Cancel increase request (if it has reserved increase request 
-    rmContainer.cancelIncreaseReservation();
-    
-    // Done with the reservation?
-    if (internalUnreserve(node, schedulerKey)) {
-      node.unreserveResource(this);
-
-      // Update reserved metrics
-      queue.getMetrics().unreserveResource(getUser(),
-          rmContainer.getReservedResource());
-      queue.decReservedResource(node.getPartition(),
-          rmContainer.getReservedResource());
-      return true;
+    try {
+      writeLock.lock();
+      // Cancel increase request (if it has reserved increase request
+      rmContainer.cancelIncreaseReservation();
+
+      // Done with the reservation?
+      if (internalUnreserve(node, schedulerKey)) {
+        node.unreserveResource(this);
+
+        // Update reserved metrics
+        queue.getMetrics().unreserveResource(getUser(),
+            rmContainer.getReservedResource());
+        queue.decReservedResource(node.getPartition(),
+            rmContainer.getReservedResource());
+        return true;
+      }
+      return false;
+    } finally {
+      writeLock.unlock();
     }
-    return false;
   }
 
   private boolean internalUnreserve(FiCaSchedulerNode node,
@@ -303,33 +311,15 @@ public class FiCaSchedulerApp extends SchedulerApplicationAttempt {
     return false;
   }
 
-  public synchronized float getLocalityWaitFactor(
-      SchedulerRequestKey schedulerKey, int clusterNodes) {
-    // Estimate: Required unique resources (i.e. hosts + racks)
-    int requiredResources = 
-        Math.max(this.getResourceRequests(schedulerKey).size() - 1, 0);
-    
-    // waitFactor can't be more than '1' 
-    // i.e. no point skipping more than clustersize opportunities
-    return Math.min(((float)requiredResources / clusterNodes), 1.0f);
-  }
-
-  public synchronized Resource getTotalPendingRequests() {
-    Resource ret = Resource.newInstance(0, 0);
-    for (ResourceRequest rr : appSchedulingInfo.getAllResourceRequests()) {
-      // to avoid double counting we count only "ANY" resource requests
-      if (ResourceRequest.isAnyLocation(rr.getResourceName())){
-        Resources.addTo(ret,
-            Resources.multiply(rr.getCapability(), rr.getNumContainers()));
+  public void markContainerForPreemption(ContainerId cont) {
+    try {
+      writeLock.lock();
+      // ignore already completed containers
+      if (liveContainers.containsKey(cont)) {
+        containersToPreempt.add(cont);
       }
-    }
-    return ret;
-  }
-
-  public synchronized void markContainerForPreemption(ContainerId cont) {
-    // ignore already completed containers
-    if (liveContainers.containsKey(cont)) {
-      containersToPreempt.add(cont);
+    } finally {
+      writeLock.unlock();
     }
   }
 
@@ -343,94 +333,115 @@ public class FiCaSchedulerApp extends SchedulerApplicationAttempt {
    * @param minimumAllocation
    * @return an allocation
    */
-  public synchronized Allocation getAllocation(ResourceCalculator rc,
+  public Allocation getAllocation(ResourceCalculator resourceCalculator,
       Resource clusterResource, Resource minimumAllocation) {
-
-    Set<ContainerId> currentContPreemption = Collections.unmodifiableSet(
-        new HashSet<ContainerId>(containersToPreempt));
-    containersToPreempt.clear();
-    Resource tot = Resource.newInstance(0, 0);
-    for(ContainerId c : currentContPreemption){
-      Resources.addTo(tot,
-          liveContainers.get(c).getContainer().getResource());
+    try {
+      writeLock.lock();
+      Set<ContainerId> currentContPreemption = Collections.unmodifiableSet(
+          new HashSet<ContainerId>(containersToPreempt));
+      containersToPreempt.clear();
+      Resource tot = Resource.newInstance(0, 0);
+      for (ContainerId c : currentContPreemption) {
+        Resources.addTo(tot, liveContainers.get(c).getContainer()
+            .getResource());
+      }
+      int numCont = (int) Math.ceil(
+          Resources.divide(rc, clusterResource, tot, minimumAllocation));
+      ResourceRequest rr = ResourceRequest.newInstance(Priority.UNDEFINED,
+          ResourceRequest.ANY, minimumAllocation, numCont);
+      List<Container> newlyAllocatedContainers = pullNewlyAllocatedContainers();
+      List<Container> newlyIncreasedContainers = pullNewlyIncreasedContainers();
+      List<Container> newlyDecreasedContainers = pullNewlyDecreasedContainers();
+      List<NMToken> updatedNMTokens = pullUpdatedNMTokens();
+      Resource headroom = getHeadroom();
+      setApplicationHeadroomForMetrics(headroom);
+      return new Allocation(newlyAllocatedContainers, headroom, null,
+          currentContPreemption, Collections.singletonList(rr), updatedNMTokens,
+          newlyIncreasedContainers, newlyDecreasedContainers);
+    } finally {
+      writeLock.unlock();
     }
-    int numCont = (int) Math.ceil(
-        Resources.divide(rc, clusterResource, tot, minimumAllocation));
-    ResourceRequest rr = ResourceRequest.newInstance(
-        Priority.UNDEFINED, ResourceRequest.ANY,
-        minimumAllocation, numCont);
-    List<Container> newlyAllocatedContainers = pullNewlyAllocatedContainers();
-    List<Container> newlyIncreasedContainers = pullNewlyIncreasedContainers();
-    List<Container> newlyDecreasedContainers = pullNewlyDecreasedContainers();
-    List<NMToken> updatedNMTokens = pullUpdatedNMTokens();
-    Resource headroom = getHeadroom();
-    setApplicationHeadroomForMetrics(headroom);
-    return new Allocation(newlyAllocatedContainers, headroom, null,
-        currentContPreemption, Collections.singletonList(rr), updatedNMTokens,
-        newlyIncreasedContainers, newlyDecreasedContainers);
   }
-  
-  synchronized public NodeId getNodeIdToUnreserve(
+
+  @VisibleForTesting
+  public NodeId getNodeIdToUnreserve(
       SchedulerRequestKey schedulerKey, Resource resourceNeedUnreserve,
       ResourceCalculator rc, Resource clusterResource) {
+    try {
+      writeLock.lock();
+      // first go around make this algorithm simple and just grab first
+      // reservation that has enough resources
+      Map<NodeId, RMContainer> reservedContainers = this.reservedContainers.get(
+          schedulerKey);
+
+      if ((reservedContainers != null) && (!reservedContainers.isEmpty())) {
+        for (Map.Entry<NodeId, RMContainer> entry : reservedContainers
+            .entrySet()) {
+          NodeId nodeId = entry.getKey();
+          RMContainer reservedContainer = entry.getValue();
+          if (reservedContainer.hasIncreaseReservation()) {
+            // Currently, only regular container allocation supports continuous
+            // reservation looking, we don't support canceling increase request
+            // reservation when allocating regular container.
+            continue;
+          }
 
-    // first go around make this algorithm simple and just grab first
-    // reservation that has enough resources
-    Map<NodeId, RMContainer> reservedContainers = this.reservedContainers
-        .get(schedulerKey);
-
-    if ((reservedContainers != null) && (!reservedContainers.isEmpty())) {
-      for (Map.Entry<NodeId, RMContainer> entry : reservedContainers.entrySet()) {
-        NodeId nodeId = entry.getKey();
-        RMContainer reservedContainer = entry.getValue();
-        if (reservedContainer.hasIncreaseReservation()) {
-          // Currently, only regular container allocation supports continuous
-          // reservation looking, we don't support canceling increase request
-          // reservation when allocating regular container.
-          continue;
-        }
-        
-        Resource reservedResource = reservedContainer.getReservedResource();
-        
-        // make sure we unreserve one with at least the same amount of
-        // resources, otherwise could affect capacity limits
-        if (Resources.fitsIn(rc, clusterResource, resourceNeedUnreserve,
-            reservedResource)) {
-          if (LOG.isDebugEnabled()) {
-            LOG.debug("unreserving node with reservation size: "
-                + reservedResource
-                + " in order to allocate container with size: " + resourceNeedUnreserve);
+          Resource reservedResource = reservedContainer.getReservedResource();
+
+          // make sure we unreserve one with at least the same amount of
+          // resources, otherwise could affect capacity limits
+          if (Resources.fitsIn(rc, clusterResource, resourceNeedUnreserve,
+              reservedResource)) {
+            if (LOG.isDebugEnabled()) {
+              LOG.debug(
+                  "unreserving node with reservation size: " + reservedResource
+                      + " in order to allocate container with size: "
+                      + resourceNeedUnreserve);
+            }
+            return nodeId;
           }
-          return nodeId;
         }
       }
+      return null;
+    } finally {
+      writeLock.unlock();
     }
-    return null;
   }
   
-  public synchronized void setHeadroomProvider(
+  public void setHeadroomProvider(
     CapacityHeadroomProvider headroomProvider) {
-    this.headroomProvider = headroomProvider;
-  }
-
-  public synchronized CapacityHeadroomProvider getHeadroomProvider() {
-    return headroomProvider;
+    try {
+      writeLock.lock();
+      this.headroomProvider = headroomProvider;
+    } finally {
+      writeLock.unlock();
+    }
   }
   
   @Override
-  public synchronized Resource getHeadroom() {
-    if (headroomProvider != null) {
-      return headroomProvider.getHeadroom();
+  public Resource getHeadroom() {
+    try {
+      readLock.lock();
+      if (headroomProvider != null) {
+        return headroomProvider.getHeadroom();
+      }
+      return super.getHeadroom();
+    } finally {
+      readLock.unlock();
     }
-    return super.getHeadroom();
+
   }
   
   @Override
-  public synchronized void transferStateFromPreviousAttempt(
+  public void transferStateFromPreviousAttempt(
       SchedulerApplicationAttempt appAttempt) {
-    super.transferStateFromPreviousAttempt(appAttempt);
-    this.headroomProvider = 
-      ((FiCaSchedulerApp) appAttempt).getHeadroomProvider();
+    try {
+      writeLock.lock();
+      super.transferStateFromPreviousAttempt(appAttempt);
+      this.headroomProvider = ((FiCaSchedulerApp) appAttempt).headroomProvider;
+    } finally {
+      writeLock.unlock();
+    }
   }
   
   public boolean reserveIncreasedContainer(SchedulerRequestKey schedulerKey,
@@ -444,11 +455,11 @@ public class FiCaSchedulerApp extends SchedulerApplicationAttempt {
 
       // Update the node
       node.reserveResource(this, schedulerKey, rmContainer);
-      
+
       // Succeeded
       return true;
     }
-    
+
     return false;
   }
 
@@ -515,9 +526,12 @@ public class FiCaSchedulerApp extends SchedulerApplicationAttempt {
       showRequests();
     }
 
-    synchronized (this) {
+    try {
+      writeLock.lock();
       return containerAllocator.assignContainers(clusterResource, node,
           schedulingMode, currentResourceLimits, reservedContainer);
+    } finally {
+      writeLock.unlock();
     }
   }
 
@@ -625,23 +639,33 @@ public class FiCaSchedulerApp extends SchedulerApplicationAttempt {
    * Capacity Scheduler.
    */
   @Override
-  public synchronized ApplicationResourceUsageReport getResourceUsageReport() {
-    ApplicationResourceUsageReport report = super.getResourceUsageReport();
-    Resource cluster = rmContext.getScheduler().getClusterResource();
-    Resource totalPartitionRes =
-        rmContext.getNodeLabelManager()
-          .getResourceByLabel(getAppAMNodePartitionName(), cluster);
-    ResourceCalculator calc = rmContext.getScheduler().getResourceCalculator();
-    if (!calc.isInvalidDivisor(totalPartitionRes)) {
-      float queueAbsMaxCapPerPartition =
-          ((AbstractCSQueue)getQueue()).getQueueCapacities()
-            .getAbsoluteCapacity(getAppAMNodePartitionName());
-      float queueUsagePerc =
-          calc.divide(totalPartitionRes, report.getUsedResources(),
-              Resources.multiply(totalPartitionRes,
-                  queueAbsMaxCapPerPartition)) * 100;
-      report.setQueueUsagePercentage(queueUsagePerc);
+  public ApplicationResourceUsageReport getResourceUsageReport() {
+    try {
+      // Use write lock here because
+      // SchedulerApplicationAttempt#getResourceUsageReport updated fields
+      // TODO: improve this
+      writeLock.lock();
+      ApplicationResourceUsageReport report = super.getResourceUsageReport();
+      Resource cluster = rmContext.getScheduler().getClusterResource();
+      Resource totalPartitionRes =
+          rmContext.getNodeLabelManager().getResourceByLabel(
+              getAppAMNodePartitionName(), cluster);
+      ResourceCalculator calc =
+          rmContext.getScheduler().getResourceCalculator();
+      if (!calc.isInvalidDivisor(totalPartitionRes)) {
+        float queueAbsMaxCapPerPartition =
+            ((AbstractCSQueue) getQueue()).getQueueCapacities()
+                .getAbsoluteCapacity(getAppAMNodePartitionName());
+        float queueUsagePerc = calc.divide(totalPartitionRes,
+            report.getUsedResources(),
+            Resources.multiply(totalPartitionRes, queueAbsMaxCapPerPartition))
+            * 100;
+        report.setQueueUsagePercentage(queueUsagePerc);
+      }
+      return report;
+    } finally {
+      writeLock.unlock();
     }
-    return report;
+
   }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org


[7/9] hadoop git commit: HDFS-10875. Optimize du -x to cache intermediate result. Contributed by Xiao Chen.

Posted by dr...@apache.org.
HDFS-10875. Optimize du -x to cache intermediate result. Contributed by Xiao Chen.


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/e52d6e7a
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/e52d6e7a
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/e52d6e7a

Branch: refs/heads/HADOOP-12756
Commit: e52d6e7a46ceef74dd8d8a3d49c49420e3271365
Parents: 98bdb51
Author: Xiao Chen <xi...@apache.org>
Authored: Mon Sep 19 21:44:42 2016 -0700
Committer: Xiao Chen <xi...@apache.org>
Committed: Mon Sep 19 21:44:42 2016 -0700

----------------------------------------------------------------------
 .../apache/hadoop/hdfs/server/namenode/INodeDirectory.java  | 9 +++++----
 1 file changed, 5 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/e52d6e7a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeDirectory.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeDirectory.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeDirectory.java
index 9a8f9b2..24c8815 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeDirectory.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeDirectory.java
@@ -630,14 +630,15 @@ public class INodeDirectory extends INodeWithAdditionalFields
       ContentSummaryComputationContext summary) {
     final DirectoryWithSnapshotFeature sf = getDirectoryWithSnapshotFeature();
     if (sf != null && snapshotId == Snapshot.CURRENT_STATE_ID) {
+      final ContentCounts counts = new ContentCounts.Builder().build();
       // if the getContentSummary call is against a non-snapshot path, the
       // computation should include all the deleted files/directories
       sf.computeContentSummary4Snapshot(summary.getBlockStoragePolicySuite(),
-          summary.getCounts());
-      // Also compute ContentSummary for snapshotCounts (So we can extract it
+          counts);
+      summary.getCounts().addContents(counts);
+      // Also add ContentSummary to snapshotCounts (So we can extract it
       // later from the ContentSummary of all).
-      sf.computeContentSummary4Snapshot(summary.getBlockStoragePolicySuite(),
-          summary.getSnapshotCounts());
+      summary.getSnapshotCounts().addContents(counts);
     }
     final DirectoryWithQuotaFeature q = getDirectoryWithQuotaFeature();
     if (q != null && snapshotId == Snapshot.CURRENT_STATE_ID) {


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org


[2/9] hadoop git commit: YARN-3141. Improve locks in SchedulerApplicationAttempt/FSAppAttempt/FiCaSchedulerApp. Contributed by Wangda Tan

Posted by dr...@apache.org.
http://git-wip-us.apache.org/repos/asf/hadoop/blob/b8a30f2f/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java
index 9e5a807..3555faa 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java
@@ -123,65 +123,72 @@ public class FSAppAttempt extends SchedulerApplicationAttempt
     return queue.getMetrics();
   }
 
-  synchronized public void containerCompleted(RMContainer rmContainer,
+  public void containerCompleted(RMContainer rmContainer,
       ContainerStatus containerStatus, RMContainerEventType event) {
-    
-    Container container = rmContainer.getContainer();
-    ContainerId containerId = container.getId();
-    
-    // Remove from the list of newly allocated containers if found
-    newlyAllocatedContainers.remove(rmContainer);
-    
-    // Inform the container
-    rmContainer.handle(
-        new RMContainerFinishedEvent(
-            containerId,
-            containerStatus,
-            event)
-    );
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("Completed container: " + rmContainer.getContainerId() +
-              " in state: " + rmContainer.getState() + " event:" + event);
-    }
+    try {
+      writeLock.lock();
+      Container container = rmContainer.getContainer();
+      ContainerId containerId = container.getId();
+
+      // Remove from the list of newly allocated containers if found
+      newlyAllocatedContainers.remove(rmContainer);
+
+      // Inform the container
+      rmContainer.handle(
+          new RMContainerFinishedEvent(containerId, containerStatus, event));
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Completed container: " + rmContainer.getContainerId()
+            + " in state: " + rmContainer.getState() + " event:" + event);
+      }
+
+      // Remove from the list of containers
+      liveContainers.remove(rmContainer.getContainerId());
 
-    // Remove from the list of containers
-    liveContainers.remove(rmContainer.getContainerId());
+      Resource containerResource = rmContainer.getContainer().getResource();
+      RMAuditLogger.logSuccess(getUser(), AuditConstants.RELEASE_CONTAINER,
+          "SchedulerApp", getApplicationId(), containerId, containerResource);
 
-    Resource containerResource = rmContainer.getContainer().getResource();
-    RMAuditLogger.logSuccess(getUser(), 
-        AuditConstants.RELEASE_CONTAINER, "SchedulerApp", 
-        getApplicationId(), containerId, containerResource);
-    
-    // Update usage metrics 
-    queue.getMetrics().releaseResources(getUser(), 1, containerResource);
-    this.attemptResourceUsage.decUsed(containerResource);
+      // Update usage metrics
+      queue.getMetrics().releaseResources(getUser(), 1, containerResource);
+      this.attemptResourceUsage.decUsed(containerResource);
 
-    // remove from preemption map if it is completed
-    preemptionMap.remove(rmContainer);
+      // remove from preemption map if it is completed
+      preemptionMap.remove(rmContainer);
 
-    // Clear resource utilization metrics cache.
-    lastMemoryAggregateAllocationUpdateTime = -1;
+      // Clear resource utilization metrics cache.
+      lastMemoryAggregateAllocationUpdateTime = -1;
+    } finally {
+      writeLock.unlock();
+    }
   }
 
-  private synchronized void unreserveInternal(
+  private void unreserveInternal(
       SchedulerRequestKey schedulerKey, FSSchedulerNode node) {
-    Map<NodeId, RMContainer> reservedContainers = 
-        this.reservedContainers.get(schedulerKey);
-    RMContainer reservedContainer = reservedContainers.remove(node.getNodeID());
-    if (reservedContainers.isEmpty()) {
-      this.reservedContainers.remove(schedulerKey);
-    }
-    
-    // Reset the re-reservation count
-    resetReReservations(schedulerKey);
+    try {
+      writeLock.lock();
+      Map<NodeId, RMContainer> reservedContainers = this.reservedContainers.get(
+          schedulerKey);
+      RMContainer reservedContainer = reservedContainers.remove(
+          node.getNodeID());
+      if (reservedContainers.isEmpty()) {
+        this.reservedContainers.remove(schedulerKey);
+      }
+
+      // Reset the re-reservation count
+      resetReReservations(schedulerKey);
 
-    Resource resource = reservedContainer.getContainer().getResource();
-    this.attemptResourceUsage.decReserved(resource);
+      Resource resource = reservedContainer.getContainer().getResource();
+      this.attemptResourceUsage.decReserved(resource);
 
-    LOG.info("Application " + getApplicationId() + " unreserved " + " on node "
-        + node + ", currently has " + reservedContainers.size()
-        + " at priority " + schedulerKey.getPriority() + "; currentReservation "
-        + this.attemptResourceUsage.getReserved());
+      LOG.info(
+          "Application " + getApplicationId() + " unreserved " + " on node "
+              + node + ", currently has " + reservedContainers.size()
+              + " at priority " + schedulerKey.getPriority()
+              + "; currentReservation " + this.attemptResourceUsage
+              .getReserved());
+    } finally {
+      writeLock.unlock();
+    }
   }
 
   private void subtractResourcesOnBlacklistedNodes(
@@ -239,17 +246,6 @@ public class FSAppAttempt extends SchedulerApplicationAttempt
     return headroom;
   }
 
-  public synchronized float getLocalityWaitFactor(
-      SchedulerRequestKey schedulerKey, int clusterNodes) {
-    // Estimate: Required unique resources (i.e. hosts + racks)
-    int requiredResources = 
-        Math.max(this.getResourceRequests(schedulerKey).size() - 1, 0);
-    
-    // waitFactor can't be more than '1' 
-    // i.e. no point skipping more than clustersize opportunities
-    return Math.min(((float)requiredResources / clusterNodes), 1.0f);
-  }
-
   /**
    * Return the level at which we are allowed to schedule containers, given the
    * current size of the cluster and thresholds indicating how many nodes to
@@ -261,44 +257,56 @@ public class FSAppAttempt extends SchedulerApplicationAttempt
    * @param rackLocalityThreshold rackLocalityThreshold
    * @return NodeType
    */
-  public synchronized NodeType getAllowedLocalityLevel(
+  NodeType getAllowedLocalityLevel(
       SchedulerRequestKey schedulerKey, int numNodes,
       double nodeLocalityThreshold, double rackLocalityThreshold) {
     // upper limit on threshold
-    if (nodeLocalityThreshold > 1.0) { nodeLocalityThreshold = 1.0; }
-    if (rackLocalityThreshold > 1.0) { rackLocalityThreshold = 1.0; }
+    if (nodeLocalityThreshold > 1.0) {
+      nodeLocalityThreshold = 1.0;
+    }
+    if (rackLocalityThreshold > 1.0) {
+      rackLocalityThreshold = 1.0;
+    }
 
     // If delay scheduling is not being used, can schedule anywhere
     if (nodeLocalityThreshold < 0.0 || rackLocalityThreshold < 0.0) {
       return NodeType.OFF_SWITCH;
     }
 
-    // Default level is NODE_LOCAL
-    if (!allowedLocalityLevel.containsKey(schedulerKey)) {
-      allowedLocalityLevel.put(schedulerKey, NodeType.NODE_LOCAL);
-      return NodeType.NODE_LOCAL;
-    }
-
-    NodeType allowed = allowedLocalityLevel.get(schedulerKey);
+    try {
+      writeLock.lock();
 
-    // If level is already most liberal, we're done
-    if (allowed.equals(NodeType.OFF_SWITCH)) return NodeType.OFF_SWITCH;
+      // Default level is NODE_LOCAL
+      if (!allowedLocalityLevel.containsKey(schedulerKey)) {
+        allowedLocalityLevel.put(schedulerKey, NodeType.NODE_LOCAL);
+        return NodeType.NODE_LOCAL;
+      }
 
-    double threshold = allowed.equals(NodeType.NODE_LOCAL) ? nodeLocalityThreshold :
-      rackLocalityThreshold;
+      NodeType allowed = allowedLocalityLevel.get(schedulerKey);
 
-    // Relax locality constraints once we've surpassed threshold.
-    if (getSchedulingOpportunities(schedulerKey) > (numNodes * threshold)) {
-      if (allowed.equals(NodeType.NODE_LOCAL)) {
-        allowedLocalityLevel.put(schedulerKey, NodeType.RACK_LOCAL);
-        resetSchedulingOpportunities(schedulerKey);
+      // If level is already most liberal, we're done
+      if (allowed.equals(NodeType.OFF_SWITCH)) {
+        return NodeType.OFF_SWITCH;
       }
-      else if (allowed.equals(NodeType.RACK_LOCAL)) {
-        allowedLocalityLevel.put(schedulerKey, NodeType.OFF_SWITCH);
-        resetSchedulingOpportunities(schedulerKey);
+
+      double threshold = allowed.equals(NodeType.NODE_LOCAL) ?
+          nodeLocalityThreshold :
+          rackLocalityThreshold;
+
+      // Relax locality constraints once we've surpassed threshold.
+      if (getSchedulingOpportunities(schedulerKey) > (numNodes * threshold)) {
+        if (allowed.equals(NodeType.NODE_LOCAL)) {
+          allowedLocalityLevel.put(schedulerKey, NodeType.RACK_LOCAL);
+          resetSchedulingOpportunities(schedulerKey);
+        } else if (allowed.equals(NodeType.RACK_LOCAL)) {
+          allowedLocalityLevel.put(schedulerKey, NodeType.OFF_SWITCH);
+          resetSchedulingOpportunities(schedulerKey);
+        }
       }
+      return allowedLocalityLevel.get(schedulerKey);
+    } finally {
+      writeLock.unlock();
     }
-    return allowedLocalityLevel.get(schedulerKey);
   }
 
   /**
@@ -311,119 +319,131 @@ public class FSAppAttempt extends SchedulerApplicationAttempt
    * @param currentTimeMs currentTimeMs
    * @return NodeType
    */
-  public synchronized NodeType getAllowedLocalityLevelByTime(
+  NodeType getAllowedLocalityLevelByTime(
       SchedulerRequestKey schedulerKey, long nodeLocalityDelayMs,
       long rackLocalityDelayMs, long currentTimeMs) {
-
     // if not being used, can schedule anywhere
     if (nodeLocalityDelayMs < 0 || rackLocalityDelayMs < 0) {
       return NodeType.OFF_SWITCH;
     }
 
-    // default level is NODE_LOCAL
-    if (!allowedLocalityLevel.containsKey(schedulerKey)) {
-      // add the initial time of priority to prevent comparing with FsApp
-      // startTime and allowedLocalityLevel degrade
-      lastScheduledContainer.put(schedulerKey, currentTimeMs);
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("Init the lastScheduledContainer time, priority: "
-            + schedulerKey.getPriority() + ", time: " + currentTimeMs);
+    try {
+      writeLock.lock();
+
+      // default level is NODE_LOCAL
+      if (!allowedLocalityLevel.containsKey(schedulerKey)) {
+        // add the initial time of priority to prevent comparing with FsApp
+        // startTime and allowedLocalityLevel degrade
+        lastScheduledContainer.put(schedulerKey, currentTimeMs);
+        if (LOG.isDebugEnabled()) {
+          LOG.debug(
+              "Init the lastScheduledContainer time, priority: " + schedulerKey
+                  .getPriority() + ", time: " + currentTimeMs);
+        }
+        allowedLocalityLevel.put(schedulerKey, NodeType.NODE_LOCAL);
+        return NodeType.NODE_LOCAL;
       }
-      allowedLocalityLevel.put(schedulerKey, NodeType.NODE_LOCAL);
-      return NodeType.NODE_LOCAL;
-    }
 
-    NodeType allowed = allowedLocalityLevel.get(schedulerKey);
+      NodeType allowed = allowedLocalityLevel.get(schedulerKey);
 
-    // if level is already most liberal, we're done
-    if (allowed.equals(NodeType.OFF_SWITCH)) {
-      return NodeType.OFF_SWITCH;
-    }
-
-    // check waiting time
-    long waitTime = currentTimeMs;
-    if (lastScheduledContainer.containsKey(schedulerKey)) {
-      waitTime -= lastScheduledContainer.get(schedulerKey);
-    } else {
-      waitTime -= getStartTime();
-    }
+      // if level is already most liberal, we're done
+      if (allowed.equals(NodeType.OFF_SWITCH)) {
+        return NodeType.OFF_SWITCH;
+      }
 
-    long thresholdTime = allowed.equals(NodeType.NODE_LOCAL) ?
-            nodeLocalityDelayMs : rackLocalityDelayMs;
+      // check waiting time
+      long waitTime = currentTimeMs;
+      if (lastScheduledContainer.containsKey(schedulerKey)) {
+        waitTime -= lastScheduledContainer.get(schedulerKey);
+      } else{
+        waitTime -= getStartTime();
+      }
 
-    if (waitTime > thresholdTime) {
-      if (allowed.equals(NodeType.NODE_LOCAL)) {
-        allowedLocalityLevel.put(schedulerKey, NodeType.RACK_LOCAL);
-        resetSchedulingOpportunities(schedulerKey, currentTimeMs);
-      } else if (allowed.equals(NodeType.RACK_LOCAL)) {
-        allowedLocalityLevel.put(schedulerKey, NodeType.OFF_SWITCH);
-        resetSchedulingOpportunities(schedulerKey, currentTimeMs);
+      long thresholdTime = allowed.equals(NodeType.NODE_LOCAL) ?
+          nodeLocalityDelayMs :
+          rackLocalityDelayMs;
+
+      if (waitTime > thresholdTime) {
+        if (allowed.equals(NodeType.NODE_LOCAL)) {
+          allowedLocalityLevel.put(schedulerKey, NodeType.RACK_LOCAL);
+          resetSchedulingOpportunities(schedulerKey, currentTimeMs);
+        } else if (allowed.equals(NodeType.RACK_LOCAL)) {
+          allowedLocalityLevel.put(schedulerKey, NodeType.OFF_SWITCH);
+          resetSchedulingOpportunities(schedulerKey, currentTimeMs);
+        }
       }
+      return allowedLocalityLevel.get(schedulerKey);
+    } finally {
+      writeLock.unlock();
     }
-    return allowedLocalityLevel.get(schedulerKey);
   }
 
-  synchronized public RMContainer allocate(NodeType type, FSSchedulerNode node,
+  public RMContainer allocate(NodeType type, FSSchedulerNode node,
       SchedulerRequestKey schedulerKey, ResourceRequest request,
       Container reservedContainer) {
-    // Update allowed locality level
-    NodeType allowed = allowedLocalityLevel.get(schedulerKey);
-    if (allowed != null) {
-      if (allowed.equals(NodeType.OFF_SWITCH) &&
-          (type.equals(NodeType.NODE_LOCAL) ||
-              type.equals(NodeType.RACK_LOCAL))) {
-        this.resetAllowedLocalityLevel(schedulerKey, type);
+    RMContainer rmContainer;
+    Container container;
+
+    try {
+      writeLock.lock();
+      // Update allowed locality level
+      NodeType allowed = allowedLocalityLevel.get(schedulerKey);
+      if (allowed != null) {
+        if (allowed.equals(NodeType.OFF_SWITCH) && (type.equals(
+            NodeType.NODE_LOCAL) || type.equals(NodeType.RACK_LOCAL))) {
+          this.resetAllowedLocalityLevel(schedulerKey, type);
+        } else if (allowed.equals(NodeType.RACK_LOCAL) && type.equals(
+            NodeType.NODE_LOCAL)) {
+          this.resetAllowedLocalityLevel(schedulerKey, type);
+        }
       }
-      else if (allowed.equals(NodeType.RACK_LOCAL) &&
-          type.equals(NodeType.NODE_LOCAL)) {
-        this.resetAllowedLocalityLevel(schedulerKey, type);
+
+      // Required sanity check - AM can call 'allocate' to update resource
+      // request without locking the scheduler, hence we need to check
+      if (getTotalRequiredResources(schedulerKey) <= 0) {
+        return null;
       }
-    }
 
-    // Required sanity check - AM can call 'allocate' to update resource 
-    // request without locking the scheduler, hence we need to check
-    if (getTotalRequiredResources(schedulerKey) <= 0) {
-      return null;
-    }
+      container = reservedContainer;
+      if (container == null) {
+        container = createContainer(node, request.getCapability(),
+            schedulerKey);
+      }
 
-    Container container = reservedContainer;
-    if (container == null) {
-      container =
-          createContainer(node, request.getCapability(), schedulerKey);
-    }
-    
-    // Create RMContainer
-    RMContainer rmContainer = new RMContainerImpl(container,
-        getApplicationAttemptId(), node.getNodeID(),
-        appSchedulingInfo.getUser(), rmContext);
-    ((RMContainerImpl)rmContainer).setQueueName(this.getQueueName());
+      // Create RMContainer
+      rmContainer = new RMContainerImpl(container,
+          getApplicationAttemptId(), node.getNodeID(),
+          appSchedulingInfo.getUser(), rmContext);
+      ((RMContainerImpl) rmContainer).setQueueName(this.getQueueName());
 
-    // Add it to allContainers list.
-    newlyAllocatedContainers.add(rmContainer);
-    liveContainers.put(container.getId(), rmContainer);    
+      // Add it to allContainers list.
+      newlyAllocatedContainers.add(rmContainer);
+      liveContainers.put(container.getId(), rmContainer);
 
-    // Update consumption and track allocations
-    List<ResourceRequest> resourceRequestList = appSchedulingInfo.allocate(
-        type, node, schedulerKey, request, container);
-    this.attemptResourceUsage.incUsed(container.getResource());
+      // Update consumption and track allocations
+      List<ResourceRequest> resourceRequestList = appSchedulingInfo.allocate(
+          type, node, schedulerKey, request, container);
+      this.attemptResourceUsage.incUsed(container.getResource());
 
-    // Update resource requests related to "request" and store in RMContainer
-    ((RMContainerImpl) rmContainer).setResourceRequests(resourceRequestList);
+      // Update resource requests related to "request" and store in RMContainer
+      ((RMContainerImpl) rmContainer).setResourceRequests(resourceRequestList);
 
-    // Inform the container
-    rmContainer.handle(
-        new RMContainerEvent(container.getId(), RMContainerEventType.START));
+      // Inform the container
+      rmContainer.handle(
+          new RMContainerEvent(container.getId(), RMContainerEventType.START));
+
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("allocate: applicationAttemptId=" + container.getId()
+            .getApplicationAttemptId() + " container=" + container.getId()
+            + " host=" + container.getNodeId().getHost() + " type=" + type);
+      }
+      RMAuditLogger.logSuccess(getUser(), AuditConstants.ALLOC_CONTAINER,
+          "SchedulerApp", getApplicationId(), container.getId(),
+          container.getResource());
+    } finally {
+      writeLock.unlock();
+    }
 
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("allocate: applicationAttemptId=" 
-          + container.getId().getApplicationAttemptId() 
-          + " container=" + container.getId() + " host="
-          + container.getNodeId().getHost() + " type=" + type);
-    }
-    RMAuditLogger.logSuccess(getUser(), 
-        AuditConstants.ALLOC_CONTAINER, "SchedulerApp", 
-        getApplicationId(), container.getId(), container.getResource());
-    
     return rmContainer;
   }
 
@@ -434,19 +454,30 @@ public class FSAppAttempt extends SchedulerApplicationAttempt
    * @param schedulerKey Scheduler Key
    * @param level NodeType
    */
-  public synchronized void resetAllowedLocalityLevel(
+  public void resetAllowedLocalityLevel(
       SchedulerRequestKey schedulerKey, NodeType level) {
-    NodeType old = allowedLocalityLevel.get(schedulerKey);
-    LOG.info("Raising locality level from " + old + " to " + level + " at " +
-        " priority " + schedulerKey.getPriority());
-    allowedLocalityLevel.put(schedulerKey, level);
+    NodeType old;
+    try {
+      writeLock.lock();
+      old = allowedLocalityLevel.put(schedulerKey, level);
+    } finally {
+      writeLock.unlock();
+    }
+
+    LOG.info("Raising locality level from " + old + " to " + level + " at "
+        + " priority " + schedulerKey.getPriority());
   }
 
   // related methods
   public void addPreemption(RMContainer container, long time) {
     assert preemptionMap.get(container) == null;
-    preemptionMap.put(container, time);
-    Resources.addTo(preemptedResources, container.getAllocatedResource());
+    try {
+      writeLock.lock();
+      preemptionMap.put(container, time);
+      Resources.addTo(preemptedResources, container.getAllocatedResource());
+    } finally {
+      writeLock.unlock();
+    }
   }
 
   public Long getContainerPreemptionTime(RMContainer container) {
@@ -584,21 +615,35 @@ public class FSAppAttempt extends SchedulerApplicationAttempt
         getUser(), rmContainer.getContainer().getResource());
   }
 
-  private synchronized void setReservation(SchedulerNode node) {
-    String rackName = node.getRackName() == null ? "NULL" : node.getRackName();
-    Set<String> rackReservations = reservations.get(rackName);
-    if (rackReservations == null) {
-      rackReservations = new HashSet<>();
-      reservations.put(rackName, rackReservations);
+  private void setReservation(SchedulerNode node) {
+    String rackName =
+        node.getRackName() == null ? "NULL" : node.getRackName();
+
+    try {
+      writeLock.lock();
+      Set<String> rackReservations = reservations.get(rackName);
+      if (rackReservations == null) {
+        rackReservations = new HashSet<>();
+        reservations.put(rackName, rackReservations);
+      }
+      rackReservations.add(node.getNodeName());
+    } finally {
+      writeLock.unlock();
     }
-    rackReservations.add(node.getNodeName());
   }
 
-  private synchronized void clearReservation(SchedulerNode node) {
-    String rackName = node.getRackName() == null ? "NULL" : node.getRackName();
-    Set<String> rackReservations = reservations.get(rackName);
-    if (rackReservations != null) {
-      rackReservations.remove(node.getNodeName());
+  private void clearReservation(SchedulerNode node) {
+    String rackName =
+        node.getRackName() == null ? "NULL" : node.getRackName();
+
+    try {
+      writeLock.lock();
+      Set<String> rackReservations = reservations.get(rackName);
+      if (rackReservations != null) {
+        rackReservations.remove(node.getNodeName());
+      }
+    } finally {
+      writeLock.unlock();
     }
   }
 
@@ -737,7 +782,8 @@ public class FSAppAttempt extends SchedulerApplicationAttempt
     // For each priority, see if we can schedule a node local, rack local
     // or off-switch request. Rack of off-switch requests may be delayed
     // (not scheduled) in order to promote better locality.
-    synchronized (this) {
+    try {
+      writeLock.lock();
       for (SchedulerRequestKey schedulerKey : keysToTry) {
         // Skip it for reserved container, since
         // we already check it in isValidReservation.
@@ -772,8 +818,8 @@ public class FSAppAttempt extends SchedulerApplicationAttempt
 
         if (rackLocalRequest != null && rackLocalRequest.getNumContainers() != 0
             && localRequest != null && localRequest.getNumContainers() != 0) {
-          return assignContainer(node, localRequest,
-              NodeType.NODE_LOCAL, reserved, schedulerKey);
+          return assignContainer(node, localRequest, NodeType.NODE_LOCAL,
+              reserved, schedulerKey);
         }
 
         if (rackLocalRequest != null && !rackLocalRequest.getRelaxLocality()) {
@@ -781,29 +827,31 @@ public class FSAppAttempt extends SchedulerApplicationAttempt
         }
 
         if (rackLocalRequest != null && rackLocalRequest.getNumContainers() != 0
-            && (allowedLocality.equals(NodeType.RACK_LOCAL) ||
-            allowedLocality.equals(NodeType.OFF_SWITCH))) {
-          return assignContainer(node, rackLocalRequest,
-              NodeType.RACK_LOCAL, reserved, schedulerKey);
+            && (allowedLocality.equals(NodeType.RACK_LOCAL) || allowedLocality
+            .equals(NodeType.OFF_SWITCH))) {
+          return assignContainer(node, rackLocalRequest, NodeType.RACK_LOCAL,
+              reserved, schedulerKey);
         }
 
-        ResourceRequest offSwitchRequest =
-            getResourceRequest(schedulerKey, ResourceRequest.ANY);
+        ResourceRequest offSwitchRequest = getResourceRequest(schedulerKey,
+            ResourceRequest.ANY);
         if (offSwitchRequest != null && !offSwitchRequest.getRelaxLocality()) {
           continue;
         }
 
-        if (offSwitchRequest != null &&
-            offSwitchRequest.getNumContainers() != 0) {
-          if (!hasNodeOrRackLocalRequests(schedulerKey) ||
-              allowedLocality.equals(NodeType.OFF_SWITCH)) {
-            return assignContainer(
-                node, offSwitchRequest, NodeType.OFF_SWITCH, reserved,
-                schedulerKey);
+        if (offSwitchRequest != null
+            && offSwitchRequest.getNumContainers() != 0) {
+          if (!hasNodeOrRackLocalRequests(schedulerKey) || allowedLocality
+              .equals(NodeType.OFF_SWITCH)) {
+            return assignContainer(node, offSwitchRequest, NodeType.OFF_SWITCH,
+                reserved, schedulerKey);
           }
         }
       }
+    } finally {
+      writeLock.unlock();
     }
+
     return Resources.none();
   }
 
@@ -963,14 +1011,17 @@ public class FSAppAttempt extends SchedulerApplicationAttempt
     Resources.addTo(demand, getCurrentConsumption());
 
     // Add up outstanding resource requests
-    synchronized (this) {
+    try {
+      writeLock.lock();
       for (SchedulerRequestKey k : getSchedulerKeys()) {
         ResourceRequest r = getResourceRequest(k, ResourceRequest.ANY);
         if (r != null) {
-          Resources.multiplyAndAddTo(demand,
-              r.getCapability(), r.getNumContainers());
+          Resources.multiplyAndAddTo(demand, r.getCapability(),
+              r.getNumContainers());
         }
       }
+    } finally {
+      writeLock.unlock();
     }
   }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org


[6/9] hadoop git commit: HADOOP-13169. Randomize file list in SimpleCopyListing. Contributed by Rajesh Balamohan.

Posted by dr...@apache.org.
HADOOP-13169. Randomize file list in SimpleCopyListing. Contributed by Rajesh Balamohan.


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/98bdb513
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/98bdb513
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/98bdb513

Branch: refs/heads/HADOOP-12756
Commit: 98bdb5139769eb55893971b43b9c23da9513a784
Parents: 7558dbb
Author: Chris Nauroth <cn...@apache.org>
Authored: Mon Sep 19 15:16:47 2016 -0700
Committer: Chris Nauroth <cn...@apache.org>
Committed: Mon Sep 19 15:16:47 2016 -0700

----------------------------------------------------------------------
 .../apache/hadoop/tools/DistCpConstants.java    |   4 +
 .../apache/hadoop/tools/SimpleCopyListing.java  | 114 +++++++++++++++++--
 .../apache/hadoop/tools/TestCopyListing.java    |  83 +++++++++++++-
 3 files changed, 189 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/98bdb513/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpConstants.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpConstants.java b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpConstants.java
index 95d26df..96f364c 100644
--- a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpConstants.java
+++ b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpConstants.java
@@ -58,6 +58,10 @@ public class DistCpConstants {
   public static final String CONF_LABEL_APPEND = "distcp.copy.append";
   public static final String CONF_LABEL_DIFF = "distcp.copy.diff";
   public static final String CONF_LABEL_BANDWIDTH_MB = "distcp.map.bandwidth.mb";
+  public static final String CONF_LABEL_SIMPLE_LISTING_FILESTATUS_SIZE =
+      "distcp.simplelisting.file.status.size";
+  public static final String CONF_LABEL_SIMPLE_LISTING_RANDOMIZE_FILES =
+      "distcp.simplelisting.randomize.files";
   public static final String CONF_LABEL_FILTERS_FILE =
       "distcp.filters.file";
   public static final String CONF_LABEL_MAX_CHUNKS_TOLERABLE =

http://git-wip-us.apache.org/repos/asf/hadoop/blob/98bdb513/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/SimpleCopyListing.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/SimpleCopyListing.java b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/SimpleCopyListing.java
index 3f52203..bc30aa1 100644
--- a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/SimpleCopyListing.java
+++ b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/SimpleCopyListing.java
@@ -18,6 +18,7 @@
 
 package org.apache.hadoop.tools;
 
+import com.google.common.collect.Lists;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.fs.Path;
@@ -42,7 +43,10 @@ import com.google.common.annotations.VisibleForTesting;
 import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.HashSet;
+import java.util.List;
+import java.util.Random;
 
 import static org.apache.hadoop.tools.DistCpConstants
         .HDFS_RESERVED_RAW_DIRECTORY_NAME;
@@ -56,13 +60,19 @@ import static org.apache.hadoop.tools.DistCpConstants
 public class SimpleCopyListing extends CopyListing {
   private static final Log LOG = LogFactory.getLog(SimpleCopyListing.class);
 
+  public static final int DEFAULT_FILE_STATUS_SIZE = 1000;
+  public static final boolean DEFAULT_RANDOMIZE_FILE_LISTING = true;
+
   private long totalPaths = 0;
   private long totalDirs = 0;
   private long totalBytesToCopy = 0;
   private int numListstatusThreads = 1;
+  private final int fileStatusLimit;
+  private final boolean randomizeFileListing;
   private final int maxRetries = 3;
   private CopyFilter copyFilter;
   private DistCpSync distCpSync;
+  private final Random rnd = new Random();
 
   /**
    * Protected constructor, to initialize configuration.
@@ -76,6 +86,17 @@ public class SimpleCopyListing extends CopyListing {
     numListstatusThreads = getConf().getInt(
         DistCpConstants.CONF_LABEL_LISTSTATUS_THREADS,
         DistCpConstants.DEFAULT_LISTSTATUS_THREADS);
+    fileStatusLimit = Math.max(1, getConf()
+        .getInt(DistCpConstants.CONF_LABEL_SIMPLE_LISTING_FILESTATUS_SIZE,
+        DEFAULT_FILE_STATUS_SIZE));
+    randomizeFileListing = getConf().getBoolean(
+        DistCpConstants.CONF_LABEL_SIMPLE_LISTING_RANDOMIZE_FILES,
+        DEFAULT_RANDOMIZE_FILE_LISTING);
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("numListstatusThreads=" + numListstatusThreads
+          + ", fileStatusLimit=" + fileStatusLimit
+          + ", randomizeFileListing=" + randomizeFileListing);
+    }
     copyFilter = CopyFilter.getCopyFilter(getConf());
     copyFilter.initialize();
   }
@@ -83,9 +104,13 @@ public class SimpleCopyListing extends CopyListing {
   @VisibleForTesting
   protected SimpleCopyListing(Configuration configuration,
                               Credentials credentials,
-                              int numListstatusThreads) {
+                              int numListstatusThreads,
+                              int fileStatusLimit,
+                              boolean randomizeFileListing) {
     super(configuration, credentials);
     this.numListstatusThreads = numListstatusThreads;
+    this.fileStatusLimit = Math.max(1, fileStatusLimit);
+    this.randomizeFileListing = randomizeFileListing;
   }
 
   protected SimpleCopyListing(Configuration configuration,
@@ -236,6 +261,7 @@ public class SimpleCopyListing extends CopyListing {
     FileSystem sourceFS = sourceRoot.getFileSystem(getConf());
 
     try {
+      List<FileStatusInfo> fileStatuses = Lists.newArrayList();
       for (DiffInfo diff : diffList) {
         // add snapshot paths prefix
         diff.target = new Path(options.getSourcePaths().get(0), diff.target);
@@ -259,10 +285,13 @@ public class SimpleCopyListing extends CopyListing {
             sourceDirs.add(sourceStatus);
 
             traverseDirectory(fileListWriter, sourceFS, sourceDirs,
-                sourceRoot, options, excludeList);
+                sourceRoot, options, excludeList, fileStatuses);
           }
         }
       }
+      if (randomizeFileListing) {
+        writeToFileListing(fileStatuses, fileListWriter);
+      }
       fileListWriter.close();
       fileListWriter = null;
     } finally {
@@ -296,6 +325,7 @@ public class SimpleCopyListing extends CopyListing {
     }
 
     try {
+      List<FileStatusInfo> statusList = Lists.newArrayList();
       for (Path path: options.getSourcePaths()) {
         FileSystem sourceFS = path.getFileSystem(getConf());
         final boolean preserveAcls = options.shouldPreserve(FileAttribute.ACL);
@@ -326,8 +356,14 @@ public class SimpleCopyListing extends CopyListing {
                   preserveAcls && sourceStatus.isDirectory(),
                   preserveXAttrs && sourceStatus.isDirectory(),
                   preserveRawXAttrs && sourceStatus.isDirectory());
-            writeToFileListing(fileListWriter, sourceCopyListingStatus,
-                sourcePathRoot);
+            if (randomizeFileListing) {
+              addToFileListing(statusList,
+                  new FileStatusInfo(sourceCopyListingStatus, sourcePathRoot),
+                  fileListWriter);
+            } else {
+              writeToFileListing(fileListWriter, sourceCopyListingStatus,
+                  sourcePathRoot);
+            }
 
             if (sourceStatus.isDirectory()) {
               if (LOG.isDebugEnabled()) {
@@ -337,9 +373,12 @@ public class SimpleCopyListing extends CopyListing {
             }
           }
           traverseDirectory(fileListWriter, sourceFS, sourceDirs,
-                            sourcePathRoot, options, null);
+              sourcePathRoot, options, null, statusList);
         }
       }
+      if (randomizeFileListing) {
+        writeToFileListing(statusList, fileListWriter);
+      }
       fileListWriter.close();
       printStats();
       LOG.info("Build file listing completed.");
@@ -349,6 +388,52 @@ public class SimpleCopyListing extends CopyListing {
     }
   }
 
+  private void addToFileListing(List<FileStatusInfo> fileStatusInfoList,
+      FileStatusInfo statusInfo, SequenceFile.Writer fileListWriter)
+      throws IOException {
+    fileStatusInfoList.add(statusInfo);
+    if (fileStatusInfoList.size() > fileStatusLimit) {
+      writeToFileListing(fileStatusInfoList, fileListWriter);
+    }
+  }
+
+  @VisibleForTesting
+  void setSeedForRandomListing(long seed) {
+    this.rnd.setSeed(seed);
+  }
+
+  private void writeToFileListing(List<FileStatusInfo> fileStatusInfoList,
+      SequenceFile.Writer fileListWriter) throws IOException {
+    /**
+     * In cloud storage systems, it is possible to get region hotspot.
+     * Shuffling paths can avoid such cases and also ensure that
+     * some mappers do not get lots of similar paths.
+     */
+    Collections.shuffle(fileStatusInfoList, rnd);
+    for (FileStatusInfo fileStatusInfo : fileStatusInfoList) {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Adding " + fileStatusInfo.fileStatus.getPath());
+      }
+      writeToFileListing(fileListWriter, fileStatusInfo.fileStatus,
+          fileStatusInfo.sourceRootPath);
+    }
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Number of paths written to fileListing="
+          + fileStatusInfoList.size());
+    }
+    fileStatusInfoList.clear();
+  }
+
+  private static class FileStatusInfo {
+    private CopyListingFileStatus fileStatus;
+    private Path sourceRootPath;
+
+    FileStatusInfo(CopyListingFileStatus fileStatus, Path sourceRootPath) {
+      this.fileStatus = fileStatus;
+      this.sourceRootPath = sourceRootPath;
+    }
+  }
+
   private Path computeSourceRootPath(FileStatus sourceStatus,
                                      DistCpOptions options) throws IOException {
 
@@ -516,15 +601,18 @@ public class SimpleCopyListing extends CopyListing {
                                  ArrayList<FileStatus> sourceDirs,
                                  Path sourcePathRoot,
                                  DistCpOptions options,
-                                 HashSet<String> excludeList)
+                                 HashSet<String> excludeList,
+                                 List<FileStatusInfo> fileStatuses)
                                  throws IOException {
     final boolean preserveAcls = options.shouldPreserve(FileAttribute.ACL);
     final boolean preserveXAttrs = options.shouldPreserve(FileAttribute.XATTR);
     final boolean preserveRawXattrs = options.shouldPreserveRawXattrs();
 
     assert numListstatusThreads > 0;
-    LOG.debug("Starting thread pool of " + numListstatusThreads +
-              " listStatus workers.");
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Starting thread pool of " + numListstatusThreads +
+          " listStatus workers.");
+    }
     ProducerConsumer<FileStatus, FileStatus[]> workers =
         new ProducerConsumer<FileStatus, FileStatus[]>(numListstatusThreads);
     for (int i = 0; i < numListstatusThreads; i++) {
@@ -551,8 +639,14 @@ public class SimpleCopyListing extends CopyListing {
                 preserveAcls && child.isDirectory(),
                 preserveXAttrs && child.isDirectory(),
                 preserveRawXattrs && child.isDirectory());
-            writeToFileListing(fileListWriter, childCopyListingStatus,
-                 sourcePathRoot);
+            if (randomizeFileListing) {
+              addToFileListing(fileStatuses,
+                  new FileStatusInfo(childCopyListingStatus, sourcePathRoot),
+                  fileListWriter);
+            } else {
+              writeToFileListing(fileListWriter, childCopyListingStatus,
+                  sourcePathRoot);
+            }
           }
           if (retry < maxRetries) {
             if (child.isDirectory()) {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/98bdb513/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/TestCopyListing.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/TestCopyListing.java b/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/TestCopyListing.java
index 896763d..ea63e23 100644
--- a/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/TestCopyListing.java
+++ b/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/TestCopyListing.java
@@ -25,7 +25,6 @@ import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter;
 import org.apache.hadoop.tools.util.TestDistCpUtils;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.security.Credentials;
@@ -46,7 +45,9 @@ import java.io.OutputStream;
 import java.util.Arrays;
 import java.util.ArrayList;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.List;
+import java.util.Random;
 
 @RunWith(value = Parameterized.class)
 public class TestCopyListing extends SimpleCopyListing {
@@ -77,7 +78,7 @@ public class TestCopyListing extends SimpleCopyListing {
   }
 
   public TestCopyListing(int numListstatusThreads) {
-    super(config, CREDENTIALS, numListstatusThreads);
+    super(config, CREDENTIALS, numListstatusThreads, 0, false);
   }
 
   protected TestCopyListing(Configuration configuration) {
@@ -221,6 +222,84 @@ public class TestCopyListing extends SimpleCopyListing {
     }
   }
 
+  @Test(timeout=60000)
+  public void testWithRandomFileListing() throws IOException {
+    FileSystem fs = null;
+    try {
+      fs = FileSystem.get(getConf());
+      List<Path> srcPaths = new ArrayList<>();
+      List<Path> srcFiles = new ArrayList<>();
+      Path target = new Path("/tmp/out/1");
+      final int pathCount = 25;
+      for (int i = 0; i < pathCount; i++) {
+        Path p = new Path("/tmp", String.valueOf(i));
+        srcPaths.add(p);
+        fs.mkdirs(p);
+
+        Path fileName = new Path(p, i + ".txt");
+        srcFiles.add(fileName);
+        try (OutputStream out = fs.create(fileName)) {
+          out.write(i);
+        }
+      }
+
+      Path listingFile = new Path("/tmp/file");
+      DistCpOptions options = new DistCpOptions(srcPaths, target);
+      options.setSyncFolder(true);
+
+      // Check without randomizing files
+      getConf().setBoolean(
+          DistCpConstants.CONF_LABEL_SIMPLE_LISTING_RANDOMIZE_FILES, false);
+      SimpleCopyListing listing = new SimpleCopyListing(getConf(), CREDENTIALS);
+      listing.buildListing(listingFile, options);
+
+      Assert.assertEquals(listing.getNumberOfPaths(), pathCount);
+      validateFinalListing(listingFile, srcFiles);
+      fs.delete(listingFile, true);
+
+      // Check with randomized file listing
+      getConf().setBoolean(
+          DistCpConstants.CONF_LABEL_SIMPLE_LISTING_RANDOMIZE_FILES, true);
+      listing = new SimpleCopyListing(getConf(), CREDENTIALS);
+
+      // Set the seed for randomness, so that it can be verified later
+      long seed = System.nanoTime();
+      listing.setSeedForRandomListing(seed);
+      listing.buildListing(listingFile, options);
+      Assert.assertEquals(listing.getNumberOfPaths(), pathCount);
+
+      // validate randomness
+      Collections.shuffle(srcFiles, new Random(seed));
+      validateFinalListing(listingFile, srcFiles);
+    } finally {
+      TestDistCpUtils.delete(fs, "/tmp");
+    }
+  }
+
+  private void validateFinalListing(Path pathToListFile, List<Path> srcFiles)
+      throws IOException {
+    FileSystem fs = pathToListFile.getFileSystem(config);
+
+    try (SequenceFile.Reader reader = new SequenceFile.Reader(
+        config, SequenceFile.Reader.file(pathToListFile))) {
+      CopyListingFileStatus currentVal = new CopyListingFileStatus();
+
+      Text currentKey = new Text();
+      int idx = 0;
+      while (reader.next(currentKey)) {
+        reader.getCurrentValue(currentVal);
+        Assert.assertEquals("srcFiles.size=" + srcFiles.size()
+                + ", idx=" + idx, fs.makeQualified(srcFiles.get(idx)),
+            currentVal.getPath());
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("val=" + fs.makeQualified(srcFiles.get(idx)));
+        }
+        idx++;
+      }
+    }
+  }
+
+
   @Test(timeout=10000)
   public void testBuildListingForSingleFile() {
     FileSystem fs = null;


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org


[8/9] hadoop git commit: HADOOP-13624. Rename TestAliyunOSSContractDispCp. Contributed by Genmao Yu

Posted by dr...@apache.org.
HADOOP-13624. Rename TestAliyunOSSContractDispCp. Contributed by Genmao Yu


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/22af6f8d
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/22af6f8d
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/22af6f8d

Branch: refs/heads/HADOOP-12756
Commit: 22af6f8db3a44cd51514b4851b99adcfad42751d
Parents: 08b3760
Author: Kai Zheng <ka...@intel.com>
Authored: Wed Sep 21 14:02:44 2016 +0800
Committer: Kai Zheng <ka...@intel.com>
Committed: Wed Sep 21 14:02:44 2016 +0800

----------------------------------------------------------------------
 .../fs/aliyun/oss/AliyunOSSFileSystemStore.java |  6 ++-
 .../oss/TestAliyunOSSFileSystemStore.java       |  6 ++-
 .../fs/aliyun/oss/TestAliyunOSSInputStream.java |  5 ++-
 .../contract/TestAliyunOSSContractDispCp.java   | 44 --------------------
 .../contract/TestAliyunOSSContractDistCp.java   | 44 ++++++++++++++++++++
 5 files changed, 58 insertions(+), 47 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/22af6f8d/hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/AliyunOSSFileSystemStore.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/AliyunOSSFileSystemStore.java b/hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/AliyunOSSFileSystemStore.java
index be87fa9..9792a78 100644
--- a/hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/AliyunOSSFileSystemStore.java
+++ b/hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/AliyunOSSFileSystemStore.java
@@ -49,7 +49,11 @@ import org.apache.hadoop.fs.FileSystem;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.*;
+import java.io.ByteArrayInputStream;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.io.InputStream;
 import java.net.URI;
 import java.util.ArrayList;
 import java.util.List;

http://git-wip-us.apache.org/repos/asf/hadoop/blob/22af6f8d/hadoop-tools/hadoop-aliyun/src/test/java/org/apache/hadoop/fs/aliyun/oss/TestAliyunOSSFileSystemStore.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aliyun/src/test/java/org/apache/hadoop/fs/aliyun/oss/TestAliyunOSSFileSystemStore.java b/hadoop-tools/hadoop-aliyun/src/test/java/org/apache/hadoop/fs/aliyun/oss/TestAliyunOSSFileSystemStore.java
index dee4ccf..7f4bac2 100644
--- a/hadoop-tools/hadoop-aliyun/src/test/java/org/apache/hadoop/fs/aliyun/oss/TestAliyunOSSFileSystemStore.java
+++ b/hadoop-tools/hadoop-aliyun/src/test/java/org/apache/hadoop/fs/aliyun/oss/TestAliyunOSSFileSystemStore.java
@@ -25,7 +25,11 @@ import org.junit.Before;
 import org.junit.BeforeClass;
 import org.junit.Test;
 
-import java.io.*;
+import java.io.BufferedInputStream;
+import java.io.BufferedOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
 import java.net.URI;
 import java.security.DigestInputStream;
 import java.security.DigestOutputStream;

http://git-wip-us.apache.org/repos/asf/hadoop/blob/22af6f8d/hadoop-tools/hadoop-aliyun/src/test/java/org/apache/hadoop/fs/aliyun/oss/TestAliyunOSSInputStream.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aliyun/src/test/java/org/apache/hadoop/fs/aliyun/oss/TestAliyunOSSInputStream.java b/hadoop-tools/hadoop-aliyun/src/test/java/org/apache/hadoop/fs/aliyun/oss/TestAliyunOSSInputStream.java
index 892eda0..37af28f 100644
--- a/hadoop-tools/hadoop-aliyun/src/test/java/org/apache/hadoop/fs/aliyun/oss/TestAliyunOSSInputStream.java
+++ b/hadoop-tools/hadoop-aliyun/src/test/java/org/apache/hadoop/fs/aliyun/oss/TestAliyunOSSInputStream.java
@@ -24,7 +24,10 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.contract.ContractTestUtils;
 import org.apache.hadoop.io.IOUtils;
-import org.junit.*;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
 import org.junit.rules.Timeout;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;

http://git-wip-us.apache.org/repos/asf/hadoop/blob/22af6f8d/hadoop-tools/hadoop-aliyun/src/test/java/org/apache/hadoop/fs/aliyun/oss/contract/TestAliyunOSSContractDispCp.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aliyun/src/test/java/org/apache/hadoop/fs/aliyun/oss/contract/TestAliyunOSSContractDispCp.java b/hadoop-tools/hadoop-aliyun/src/test/java/org/apache/hadoop/fs/aliyun/oss/contract/TestAliyunOSSContractDispCp.java
deleted file mode 100644
index 4b482fc..0000000
--- a/hadoop-tools/hadoop-aliyun/src/test/java/org/apache/hadoop/fs/aliyun/oss/contract/TestAliyunOSSContractDispCp.java
+++ /dev/null
@@ -1,44 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.fs.aliyun.oss.contract;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.tools.contract.AbstractContractDistCpTest;
-
-import static org.apache.hadoop.fs.aliyun.oss.Constants.*;
-
-/**
- * Contract test suite covering Aliyun OSS integration with DistCp.
- */
-public class TestAliyunOSSContractDispCp extends AbstractContractDistCpTest {
-
-  private static final long MULTIPART_SETTING = 8 * 1024 * 1024; // 8 MB
-
-  @Override
-  protected Configuration createConfiguration() {
-    Configuration newConf = super.createConfiguration();
-    newConf.setLong(MIN_MULTIPART_UPLOAD_THRESHOLD_KEY, MULTIPART_SETTING);
-    newConf.setLong(MULTIPART_UPLOAD_SIZE_KEY, MULTIPART_SETTING);
-    return newConf;
-  }
-
-  @Override
-  protected AliyunOSSContract createContract(Configuration conf) {
-    return new AliyunOSSContract(conf);
-  }
-}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/22af6f8d/hadoop-tools/hadoop-aliyun/src/test/java/org/apache/hadoop/fs/aliyun/oss/contract/TestAliyunOSSContractDistCp.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aliyun/src/test/java/org/apache/hadoop/fs/aliyun/oss/contract/TestAliyunOSSContractDistCp.java b/hadoop-tools/hadoop-aliyun/src/test/java/org/apache/hadoop/fs/aliyun/oss/contract/TestAliyunOSSContractDistCp.java
new file mode 100644
index 0000000..18d09d5
--- /dev/null
+++ b/hadoop-tools/hadoop-aliyun/src/test/java/org/apache/hadoop/fs/aliyun/oss/contract/TestAliyunOSSContractDistCp.java
@@ -0,0 +1,44 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.fs.aliyun.oss.contract;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.tools.contract.AbstractContractDistCpTest;
+
+import static org.apache.hadoop.fs.aliyun.oss.Constants.*;
+
+/**
+ * Contract test suite covering Aliyun OSS integration with DistCp.
+ */
+public class TestAliyunOSSContractDistCp extends AbstractContractDistCpTest {
+
+  private static final long MULTIPART_SETTING = 8 * 1024 * 1024; // 8 MB
+
+  @Override
+  protected Configuration createConfiguration() {
+    Configuration newConf = super.createConfiguration();
+    newConf.setLong(MIN_MULTIPART_UPLOAD_THRESHOLD_KEY, MULTIPART_SETTING);
+    newConf.setLong(MULTIPART_UPLOAD_SIZE_KEY, MULTIPART_SETTING);
+    return newConf;
+  }
+
+  @Override
+  protected AliyunOSSContract createContract(Configuration conf) {
+    return new AliyunOSSContract(conf);
+  }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org


[4/9] hadoop git commit: HDFS-10868. Remove stray references to DFS_HDFS_BLOCKS_METADATA_ENABLED.

Posted by dr...@apache.org.
HDFS-10868. Remove stray references to DFS_HDFS_BLOCKS_METADATA_ENABLED.


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/c54f6ef3
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/c54f6ef3
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/c54f6ef3

Branch: refs/heads/HADOOP-12756
Commit: c54f6ef30fbd5fbb9663e182b76bafb55ef567ad
Parents: b8a30f2
Author: Andrew Wang <wa...@apache.org>
Authored: Mon Sep 19 11:17:03 2016 -0700
Committer: Andrew Wang <wa...@apache.org>
Committed: Mon Sep 19 11:17:03 2016 -0700

----------------------------------------------------------------------
 .../java/org/apache/hadoop/hdfs/client/HdfsClientConfigKeys.java | 3 ---
 .../src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java      | 4 ----
 .../test/java/org/apache/hadoop/tools/TestHdfsConfigFields.java  | 2 --
 3 files changed, 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/c54f6ef3/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/HdfsClientConfigKeys.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/HdfsClientConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/HdfsClientConfigKeys.java
index 642d4c8..4c754d9 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/HdfsClientConfigKeys.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/HdfsClientConfigKeys.java
@@ -131,9 +131,6 @@ public interface HdfsClientConfigKeys {
           "dfs.client.key.provider.cache.expiry";
   long    DFS_CLIENT_KEY_PROVIDER_CACHE_EXPIRY_DEFAULT =
               TimeUnit.DAYS.toMillis(10); // 10 days
-  String  DFS_HDFS_BLOCKS_METADATA_ENABLED =
-      "dfs.datanode.hdfs-blocks-metadata.enabled";
-  boolean DFS_HDFS_BLOCKS_METADATA_ENABLED_DEFAULT = false;
 
   String  DFS_DATANODE_KERBEROS_PRINCIPAL_KEY =
       "dfs.datanode.kerberos.principal";

http://git-wip-us.apache.org/repos/asf/hadoop/blob/c54f6ef3/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
index 3532d25..df45e2a 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
@@ -58,10 +58,6 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
       .DFS_CHECKSUM_TYPE_KEY;
   public static final String  DFS_CHECKSUM_TYPE_DEFAULT =
       HdfsClientConfigKeys.DFS_CHECKSUM_TYPE_DEFAULT;
-  public static final String  DFS_HDFS_BLOCKS_METADATA_ENABLED =
-      HdfsClientConfigKeys.DFS_HDFS_BLOCKS_METADATA_ENABLED;
-  public static final boolean DFS_HDFS_BLOCKS_METADATA_ENABLED_DEFAULT =
-      HdfsClientConfigKeys.DFS_HDFS_BLOCKS_METADATA_ENABLED_DEFAULT;
   public static final String DFS_WEBHDFS_ACL_PERMISSION_PATTERN_DEFAULT =
       HdfsClientConfigKeys.DFS_WEBHDFS_ACL_PERMISSION_PATTERN_DEFAULT;
   public static final String  DFS_WEBHDFS_NETTY_LOW_WATERMARK =

http://git-wip-us.apache.org/repos/asf/hadoop/blob/c54f6ef3/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/tools/TestHdfsConfigFields.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/tools/TestHdfsConfigFields.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/tools/TestHdfsConfigFields.java
index 46420f1..bf29428 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/tools/TestHdfsConfigFields.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/tools/TestHdfsConfigFields.java
@@ -76,8 +76,6 @@ public class TestHdfsConfigFields extends TestConfigurationFieldsBase {
     configurationPropsToSkipCompare
         .add("dfs.corruptfilesreturned.max");
     configurationPropsToSkipCompare
-        .add("dfs.datanode.hdfs-blocks-metadata.enabled");
-    configurationPropsToSkipCompare
         .add("dfs.metrics.session-id");
     configurationPropsToSkipCompare
         .add("dfs.datanode.synconclose");


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org


[9/9] hadoop git commit: Merge branch 'trunk' into HADOOP-12756

Posted by dr...@apache.org.
Merge branch 'trunk' into HADOOP-12756


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/a49b3be3
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/a49b3be3
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/a49b3be3

Branch: refs/heads/HADOOP-12756
Commit: a49b3be38ed97a27f215afb996c6db516f5857d7
Parents: 22af6f8 e52d6e7
Author: Kai Zheng <ka...@intel.com>
Authored: Wed Sep 21 14:03:21 2016 +0800
Committer: Kai Zheng <ka...@intel.com>
Committed: Wed Sep 21 14:03:21 2016 +0800

----------------------------------------------------------------------
 .../hdfs/client/HdfsClientConfigKeys.java       |   3 -
 .../org/apache/hadoop/hdfs/DFSConfigKeys.java   |   4 -
 .../hdfs/server/namenode/INodeDirectory.java    |   9 +-
 .../hadoop/tools/TestHdfsConfigFields.java      |   2 -
 .../apache/hadoop/tools/DistCpConstants.java    |   4 +
 .../apache/hadoop/tools/SimpleCopyListing.java  | 114 ++-
 .../apache/hadoop/tools/TestCopyListing.java    |  83 ++-
 .../scheduler/AppSchedulingInfo.java            |  96 +--
 .../scheduler/SchedulerApplicationAttempt.java  | 744 +++++++++++--------
 .../allocator/RegularContainerAllocator.java    |   2 +-
 .../scheduler/common/fica/FiCaSchedulerApp.java | 418 ++++++-----
 .../scheduler/fair/FSAppAttempt.java            | 465 ++++++------
 .../scheduler/TestAppSchedulingInfo.java        |  65 ++
 .../src/site/markdown/TimelineServiceV2.md      |   6 +
 14 files changed, 1240 insertions(+), 775 deletions(-)
----------------------------------------------------------------------



---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org