Posted to common-commits@hadoop.apache.org by ju...@apache.org on 2017/08/31 23:01:39 UTC

[1/3] hadoop git commit: YARN-5731. Preemption calculation is not accurate when reserved containers are present in queue. (wangda)

Repository: hadoop
Updated Branches:
  refs/heads/branch-2.8.2 fa632929f -> 8e0e71075


YARN-5731. Preemption calculation is not accurate when reserved containers are present in queue. (wangda)

Change-Id: Ie8c5145f449582253dcf8c0f6f388e3660ab1c6b
(cherry picked from commit f2f09e2bb09d3ffbce7a0af32fc225d47dcab5d1)


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/4d5f2c40
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/4d5f2c40
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/4d5f2c40

Branch: refs/heads/branch-2.8.2
Commit: 4d5f2c402b674784ea11fd8f75c1bbf2a9ff6997
Parents: fa63292
Author: Wangda Tan <wa...@apache.org>
Authored: Tue Aug 1 15:24:19 2017 -0700
Committer: Junping Du <ju...@apache.org>
Committed: Thu Aug 31 16:00:21 2017 -0700

----------------------------------------------------------------------
 .../capacity/FifoCandidatesSelector.java        |  6 +-
 .../ProportionalCapacityPreemptionPolicy.java   |  7 +-
 .../CapacitySchedulerConfiguration.java         | 27 ++++++
 .../CapacitySchedulerPreemptionTestBase.java    |  7 +-
 ...TestCapacitySchedulerSurgicalPreemption.java | 98 ++++++++++++++++++++
 5 files changed, 139 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/4d5f2c40/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/FifoCandidatesSelector.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/FifoCandidatesSelector.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/FifoCandidatesSelector.java
index 33e4afc..6e0d7d7 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/FifoCandidatesSelector.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/FifoCandidatesSelector.java
@@ -44,12 +44,12 @@ public class FifoCandidatesSelector
       LogFactory.getLog(FifoCandidatesSelector.class);
   private PreemptableResourceCalculator preemptableAmountCalculator;
 
-  FifoCandidatesSelector(
-      CapacitySchedulerPreemptionContext preemptionContext) {
+  FifoCandidatesSelector(CapacitySchedulerPreemptionContext preemptionContext,
+      boolean includeReservedResource) {
     super(preemptionContext);
 
     preemptableAmountCalculator = new PreemptableResourceCalculator(
-        preemptionContext, false);
+        preemptionContext, includeReservedResource);
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/hadoop/blob/4d5f2c40/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicy.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicy.java
index 8f59078..ee6a27d 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicy.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicy.java
@@ -222,8 +222,13 @@ public class ProportionalCapacityPreemptionPolicy
           .add(new ReservedContainerCandidatesSelector(this));
     }
 
+    boolean additionalPreemptionBasedOnReservedResource = csConfig.getBoolean(
+        CapacitySchedulerConfiguration.ADDITIONAL_RESOURCE_BALANCE_BASED_ON_RESERVED_CONTAINERS,
+        CapacitySchedulerConfiguration.DEFAULT_ADDITIONAL_RESOURCE_BALANCE_BASED_ON_RESERVED_CONTAINERS);
+
     // initialize candidates preemption selection policies
-    candidatesSelectionPolicies.add(new FifoCandidatesSelector(this));
+    candidatesSelectionPolicies.add(new FifoCandidatesSelector(this,
+        additionalPreemptionBasedOnReservedResource));
 
     // Do we need to specially consider intra queue
     boolean isIntraQueuePreemptionEnabled = csConfig.getBoolean(

http://git-wip-us.apache.org/repos/asf/hadoop/blob/4d5f2c40/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java
index 38b538c..dfc1d24 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java
@@ -1103,6 +1103,33 @@ public class CapacitySchedulerConfiguration extends ReservationSchedulerConfigur
       0.2f;
 
   /**
+   * By default, reserved resources are excluded while balancing the
+   * capacities of queues.
+   *
+   * Why? In YARN-4390, we added preemption-based-on-reserved-container
+   * support to reduce unnecessary preemption for large containers: reserved
+   * resources are not included while calculating the ideal allocation in
+   * FifoCandidatesSelector.
+   *
+   * The YARN-4390 changes significantly reduce the number of containers
+   * preempted when the cluster has heterogeneous container requests. (See
+   * the test report:
+   * https://issues.apache.org/jira/secure/attachment/12796197/YARN-4390-test-results.pdf)
+   *
+   * However, in some corner cases, especially on a fragmented cluster, it
+   * can prevent preemption from kicking in at all. See YARN-5731.
+   *
+   * To address that, this behavior is now configurable. Note that it is an
+   * experimental option.
+   */
+  public static final String
+      ADDITIONAL_RESOURCE_BALANCE_BASED_ON_RESERVED_CONTAINERS =
+      PREEMPTION_CONFIG_PREFIX
+          + "additional_res_balance_based_on_reserved_containers";
+  public static final boolean
+      DEFAULT_ADDITIONAL_RESOURCE_BALANCE_BASED_ON_RESERVED_CONTAINERS = false;
+
+  /**
    * When calculating which containers to be preempted, we will try to preempt
    * containers for reserved containers first. By default is false.
    */

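The new key is formed from PREEMPTION_CONFIG_PREFIX plus
"additional_res_balance_based_on_reserved_containers". Assuming the prefix
resolves to "yarn.resourcemanager.monitor.capacity.preemption." as it does
for the other preemption keys in this class, a sketch of enabling the
experimental option in capacity-scheduler.xml:

  <property>
    <name>yarn.resourcemanager.monitor.capacity.preemption.additional_res_balance_based_on_reserved_containers</name>
    <value>true</value>
  </property>

With the default of false, nothing changes: reserved resources stay excluded
from the ideal-allocation calculation, as in YARN-4390.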
http://git-wip-us.apache.org/repos/asf/hadoop/blob/4d5f2c40/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerPreemptionTestBase.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerPreemptionTestBase.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerPreemptionTestBase.java
index bd9f615..d2bf7f2 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerPreemptionTestBase.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerPreemptionTestBase.java
@@ -129,9 +129,10 @@ public class CapacitySchedulerPreemptionTestBase {
   public void waitNumberOfLiveContainersOnNodeFromApp(FiCaSchedulerNode node,
       ApplicationAttemptId appId, int expected) throws InterruptedException {
     int waitNum = 0;
+    int total = 0;
 
     while (waitNum < 500) {
-      int total = 0;
+      total = 0;
       for (RMContainer c : node.getCopiedListOfRunningContainers()) {
         if (c.getApplicationAttemptId().equals(appId)) {
           total++;
@@ -144,6 +145,8 @@ public class CapacitySchedulerPreemptionTestBase {
       waitNum++;
     }
 
-    Assert.fail();
+    Assert.fail(
+        "Check #live-container-on-node-from-app, actual=" + total + " expected="
+            + expected);
   }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/4d5f2c40/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerSurgicalPreemption.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerSurgicalPreemption.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerSurgicalPreemption.java
index 5067412..908e0e8 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerSurgicalPreemption.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerSurgicalPreemption.java
@@ -22,6 +22,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.Priority;
 import org.apache.hadoop.yarn.api.records.ResourceRequest;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.server.resourcemanager.MockAM;
 import org.apache.hadoop.yarn.server.resourcemanager.MockNM;
 import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
@@ -38,6 +39,7 @@ import org.junit.Test;
 
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.List;
 
 public class TestCapacitySchedulerSurgicalPreemption
     extends CapacitySchedulerPreemptionTestBase {
@@ -243,4 +245,100 @@ public class TestCapacitySchedulerSurgicalPreemption
 
     rm1.close();
   }
+
+  @Test(timeout = 60000)
+  public void testPreemptionForFragmentatedCluster() throws Exception {
+    // Set additional_res_balance_based_on_reserved_containers to true to
+    // get additional preemptions.
+    conf.setBoolean(
+        CapacitySchedulerConfiguration.ADDITIONAL_RESOURCE_BALANCE_BASED_ON_RESERVED_CONTAINERS,
+        true);
+
+    /**
+     * Two queues, a and b, each with 50% capacity.
+     * 5 nodes in the cluster, 30G each.
+     *
+     * Submit the first app: AM = 3G, plus 4 * 21G containers.
+     * Submit the second app: AM = 3G, plus 4 * 21G containers.
+     *
+     * One container should get preempted from the first app.
+     */
+    CapacitySchedulerConfiguration conf = new CapacitySchedulerConfiguration(
+        this.conf);
+    conf.setLong(YarnConfiguration.RM_SCHEDULER_MAXIMUM_ALLOCATION_MB,
+        1024 * 21);
+    conf.setQueues("root", new String[] { "a", "b" });
+    conf.setCapacity("root.a", 50);
+    conf.setUserLimitFactor("root.a", 100);
+    conf.setCapacity("root.b", 50);
+    conf.setUserLimitFactor("root.b", 100);
+    MockRM rm1 = new MockRM(conf);
+    rm1.getRMContext().setNodeLabelManager(mgr);
+    rm1.start();
+
+    List<MockNM> nms = new ArrayList<>();
+    for (int i = 0; i < 5; i++) {
+      nms.add(rm1.registerNode("h" + i + ":1234", 30 * GB));
+    }
+
+    CapacityScheduler cs = (CapacityScheduler) rm1.getResourceScheduler();
+
+    // Launch an app in queue a; the AM container should be launched on the
+    // first node (h0)
+    RMApp app1 = rm1.submitApp(3 * GB, "app", "user", null, "a");
+    MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nms.get(0));
+
+    am1.allocate("*", 21 * GB, 4, new ArrayList<ContainerId>());
+
+    // Do allocation for all nodes
+    for (int i = 0; i < 10; i++) {
+      MockNM mockNM = nms.get(i % nms.size());
+      RMNode rmNode = cs.getRMContext().getRMNodes().get(mockNM.getNodeId());
+      cs.handle(new NodeUpdateSchedulerEvent(rmNode));
+    }
+
+    // App1 should have 5 containers now
+    FiCaSchedulerApp schedulerApp1 = cs.getApplicationAttempt(
+        am1.getApplicationAttemptId());
+    Assert.assertEquals(5, schedulerApp1.getLiveContainers().size());
+
+    // Launch an app in queue b; the AM container should be launched on the
+    // third node (h2)
+    RMApp app2 = rm1.submitApp(3 * GB, "app", "user", null, "b");
+    MockAM am2 = MockRM.launchAndRegisterAM(app2, rm1, nms.get(2));
+
+    am2.allocate("*", 21 * GB, 4, new ArrayList<ContainerId>());
+
+    // Do allocation for all nodes
+    for (int i = 0; i < 10; i++) {
+      MockNM mockNM = nms.get(i % nms.size());
+      RMNode rmNode = cs.getRMContext().getRMNodes().get(mockNM.getNodeId());
+      cs.handle(new NodeUpdateSchedulerEvent(rmNode));
+    }
+
+    // App2 should have 2 containers now
+    FiCaSchedulerApp schedulerApp2 = cs.getApplicationAttempt(
+        am2.getApplicationAttemptId());
+    Assert.assertEquals(2, schedulerApp2.getLiveContainers().size());
+
+    waitNumberOfReservedContainersFromApp(schedulerApp2, 1);
+
+    // Call editSchedule twice so that a container gets preempted, then keep
+    // doing node heartbeats until the freed resources are allocated to app2
+    SchedulingEditPolicy editPolicy = getSchedulingEditPolicy(rm1);
+    editPolicy.editSchedule();
+    editPolicy.editSchedule();
+
+    int tick = 0;
+    while (schedulerApp2.getLiveContainers().size() != 4 && tick < 10) {
+      // Do allocation for all nodes
+      for (int i = 0; i < 10; i++) {
+        MockNM mockNM = nms.get(i % nms.size());
+        RMNode rmNode = cs.getRMContext().getRMNodes().get(mockNM.getNodeId());
+        cs.handle(new NodeUpdateSchedulerEvent(rmNode));
+      }
+      tick++;
+      Thread.sleep(100);
+    }
+    Assert.assertEquals(3, schedulerApp2.getLiveContainers().size());
+
+    rm1.close();
+  }
 }




[2/3] hadoop git commit: MAPREDUCE-6937. Backport MAPREDUCE-6870 to branch-2 while preserving compatibility. (Peter Bacsko via Haibo Chen)

Posted by ju...@apache.org.
MAPREDUCE-6937. Backport MAPREDUCE-6870 to branch-2 while preserving compatibility. (Peter Bacsko via Haibo Chen)

(cherry picked from commit 2fce0fa8d7c777a5374ee79e1a8e968b7319522a)


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/c53d351f
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/c53d351f
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/c53d351f

Branch: refs/heads/branch-2.8.2
Commit: c53d351f3443c37a7142cb87f09b725fac97aadb
Parents: 4d5f2c4
Author: Haibo Chen <ha...@apache.org>
Authored: Thu Aug 31 09:21:09 2017 -0700
Committer: Junping Du <ju...@apache.org>
Committed: Thu Aug 31 16:00:41 2017 -0700

----------------------------------------------------------------------
 .../mapreduce/v2/app/job/impl/JobImpl.java      |  35 ++++-
 .../mapreduce/v2/app/job/impl/TestJobImpl.java  | 139 +++++++++++++++----
 .../apache/hadoop/mapreduce/MRJobConfig.java    |   6 +-
 .../src/main/resources/mapred-default.xml       |   8 ++
 4 files changed, 160 insertions(+), 28 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/c53d351f/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java
index 7071e14..2b2fbd0 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java
@@ -643,6 +643,8 @@ public class JobImpl implements org.apache.hadoop.mapreduce.v2.app.job.Job,
   private float reduceProgress;
   private float cleanupProgress;
   private boolean isUber = false;
+  private boolean finishJobWhenReducersDone;
+  private boolean completingJob = false;
 
   private Credentials jobCredentials;
   private Token<JobTokenIdentifier> jobToken;
@@ -716,6 +718,9 @@ public class JobImpl implements org.apache.hadoop.mapreduce.v2.app.job.Job,
     this.maxFetchFailuresNotifications = conf.getInt(
         MRJobConfig.MAX_FETCH_FAILURES_NOTIFICATIONS,
         MRJobConfig.DEFAULT_MAX_FETCH_FAILURES_NOTIFICATIONS);
+    this.finishJobWhenReducersDone = conf.getBoolean(
+        MRJobConfig.FINISH_JOB_WHEN_REDUCERS_DONE,
+        MRJobConfig.DEFAULT_FINISH_JOB_WHEN_REDUCERS_DONE);
   }
 
   protected StateMachine<JobStateInternal, JobEventType, JobEvent> getStateMachine() {
@@ -2018,7 +2023,9 @@ public class JobImpl implements org.apache.hadoop.mapreduce.v2.app.job.Job,
                 TimeUnit.MILLISECONDS);
         return JobStateInternal.FAIL_WAIT;
       }
-      
+
+      checkReadyForCompletionWhenAllReducersDone(job);
+
       return job.checkReadyForCommit();
     }
 
@@ -2049,6 +2056,32 @@ public class JobImpl implements org.apache.hadoop.mapreduce.v2.app.job.Job,
       }
       job.metrics.killedTask(task);
     }
+
+    /**
+     * Improvement: if all reducers have finished, check whether any
+     * restarted mappers are still running. This can happen when a node
+     * becomes UNHEALTHY and its mappers are rescheduled.
+     * See MAPREDUCE-6870 for details.
+     */
+    private void checkReadyForCompletionWhenAllReducersDone(JobImpl job) {
+      if (job.finishJobWhenReducersDone) {
+        int totalReduces = job.getTotalReduces();
+        int completedReduces = job.getCompletedReduces();
+
+        if (totalReduces > 0 && totalReduces == completedReduces
+            && !job.completingJob) {
+
+          for (TaskId mapTaskId : job.mapTasks) {
+            MapTaskImpl task = (MapTaskImpl) job.tasks.get(mapTaskId);
+            if (!task.isFinished()) {
+              LOG.info("Killing map task " + task.getID());
+              job.eventHandler.handle(
+                  new TaskEvent(task.getID(), TaskEventType.T_KILL));
+            }
+          }
+
+          job.completingJob = true;
+        }
+      }
+    }
   }
 
   // Transition class for handling jobs with no tasks

http://git-wip-us.apache.org/repos/asf/hadoop/blob/c53d351f/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestJobImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestJobImpl.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestJobImpl.java
index 6deae2a..5369284 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestJobImpl.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestJobImpl.java
@@ -564,33 +564,13 @@ public class TestJobImpl {
     dispatcher.register(TaskAttemptEventType.class, taskAttemptEventHandler);
 
     // replace the tasks with spied versions to return the right attempts
-    Map<TaskId,Task> spiedTasks = new HashMap<TaskId,Task>();
-    List<NodeReport> nodeReports = new ArrayList<NodeReport>();
-    Map<NodeReport,TaskId> nodeReportsToTaskIds =
-        new HashMap<NodeReport,TaskId>();
-    for (Map.Entry<TaskId,Task> e: job.tasks.entrySet()) {
-      TaskId taskId = e.getKey();
-      Task task = e.getValue();
-      if (taskId.getTaskType() == TaskType.MAP) {
-        // add an attempt to the task to simulate nodes
-        NodeId nodeId = mock(NodeId.class);
-        TaskAttempt attempt = mock(TaskAttempt.class);
-        when(attempt.getNodeId()).thenReturn(nodeId);
-        TaskAttemptId attemptId = MRBuilderUtils.newTaskAttemptId(taskId, 0);
-        when(attempt.getID()).thenReturn(attemptId);
-        // create a spied task
-        Task spied = spy(task);
-        doReturn(attempt).when(spied).getAttempt(any(TaskAttemptId.class));
-        spiedTasks.put(taskId, spied);
+    Map<TaskId, Task> spiedTasks = new HashMap<>();
+    List<NodeReport> nodeReports = new ArrayList<>();
+    Map<NodeReport, TaskId> nodeReportsToTaskIds = new HashMap<>();
+
+    createSpiedMapTasks(nodeReportsToTaskIds, spiedTasks, job,
+        NodeState.UNHEALTHY, nodeReports);
 
-        // create a NodeReport based on the node id
-        NodeReport report = mock(NodeReport.class);
-        when(report.getNodeState()).thenReturn(NodeState.UNHEALTHY);
-        when(report.getNodeId()).thenReturn(nodeId);
-        nodeReports.add(report);
-        nodeReportsToTaskIds.put(report, taskId);
-      }
-    }
     // replace the tasks with the spied tasks
     job.tasks.putAll(spiedTasks);
 
@@ -641,6 +621,82 @@ public class TestJobImpl {
     commitHandler.stop();
   }
 
+  @Test
+  public void testJobCompletedWhenAllReducersAreFinished()
+      throws Exception {
+    testJobCompletionWhenReducersAreFinished(true);
+  }
+
+  @Test
+  public void testJobNotCompletedWhenAllReducersAreFinished()
+      throws Exception {
+    testJobCompletionWhenReducersAreFinished(false);
+  }
+
+  private void testJobCompletionWhenReducersAreFinished(boolean killMappers)
+      throws InterruptedException, BrokenBarrierException {
+    Configuration conf = new Configuration();
+    conf.setBoolean(MRJobConfig.FINISH_JOB_WHEN_REDUCERS_DONE, killMappers);
+    conf.set(MRJobConfig.MR_AM_STAGING_DIR, stagingDir);
+    conf.setInt(MRJobConfig.NUM_REDUCES, 1);
+    DrainDispatcher dispatcher = new DrainDispatcher();
+    dispatcher.init(conf);
+    final List<TaskEvent> killedEvents =
+        Collections.synchronizedList(new ArrayList<TaskEvent>());
+    dispatcher.register(TaskEventType.class, new EventHandler<TaskEvent>() {
+      @Override
+      public void handle(TaskEvent event) {
+        if (event.getType() == TaskEventType.T_KILL) {
+          killedEvents.add(event);
+        }
+      }
+    });
+    dispatcher.start();
+    CyclicBarrier syncBarrier = new CyclicBarrier(2);
+    OutputCommitter committer = new TestingOutputCommitter(syncBarrier, true);
+    CommitterEventHandler commitHandler =
+        createCommitterEventHandler(dispatcher, committer);
+    commitHandler.init(conf);
+    commitHandler.start();
+
+    final JobImpl job = createRunningStubbedJob(conf, dispatcher, 2, null);
+
+    // replace the tasks with spied versions to return the right attempts
+    Map<TaskId, Task> spiedTasks = new HashMap<>();
+    List<NodeReport> nodeReports = new ArrayList<>();
+    Map<NodeReport, TaskId> nodeReportsToTaskIds = new HashMap<>();
+
+    createSpiedMapTasks(nodeReportsToTaskIds, spiedTasks, job,
+        NodeState.RUNNING, nodeReports);
+
+    // replace the tasks with the spied tasks
+    job.tasks.putAll(spiedTasks);
+
+    // finish reducer
+    for (TaskId taskId: job.tasks.keySet()) {
+      if (taskId.getTaskType() == TaskType.REDUCE) {
+        job.handle(new JobTaskEvent(taskId, TaskState.SUCCEEDED));
+      }
+    }
+
+    dispatcher.await();
+
+    /*
+     * StubbedJob cannot finish in this test - we'd have to generate the
+     * necessary events in this test manually, but that wouldn't add too
+     * much value. Instead, we validate the T_KILL events.
+     */
+    if (killMappers) {
+      Assert.assertEquals("Number of killed events", 2, killedEvents.size());
+      Assert.assertEquals("AttemptID", "task_1234567890000_0001_m_000000",
+          killedEvents.get(0).getTaskID().toString());
+      Assert.assertEquals("AttemptID", "task_1234567890000_0001_m_000001",
+          killedEvents.get(1).getTaskID().toString());
+    } else {
+      Assert.assertEquals("Number of killed events", 0, killedEvents.size());
+    }
+  }
+
   public static void main(String[] args) throws Exception {
     TestJobImpl t = new TestJobImpl();
     t.testJobNoTasks();
@@ -1021,6 +1077,37 @@ public class TestJobImpl {
     Assert.assertEquals(state, job.getInternalState());
   }
 
+  private void createSpiedMapTasks(Map<NodeReport, TaskId>
+      nodeReportsToTaskIds, Map<TaskId, Task> spiedTasks, JobImpl job,
+      NodeState nodeState, List<NodeReport> nodeReports) {
+    for (Map.Entry<TaskId, Task> e: job.tasks.entrySet()) {
+      TaskId taskId = e.getKey();
+      Task task = e.getValue();
+      if (taskId.getTaskType() == TaskType.MAP) {
+        // add an attempt to the task to simulate nodes
+        NodeId nodeId = mock(NodeId.class);
+        TaskAttempt attempt = mock(TaskAttempt.class);
+        when(attempt.getNodeId()).thenReturn(nodeId);
+        TaskAttemptId attemptId = MRBuilderUtils.newTaskAttemptId(taskId, 0);
+        when(attempt.getID()).thenReturn(attemptId);
+        // create a spied task
+        Task spied = spy(task);
+        Map<TaskAttemptId, TaskAttempt> attemptMap = new HashMap<>();
+        attemptMap.put(attemptId, attempt);
+        when(spied.getAttempts()).thenReturn(attemptMap);
+        doReturn(attempt).when(spied).getAttempt(any(TaskAttemptId.class));
+        spiedTasks.put(taskId, spied);
+
+        // create a NodeReport based on the node id
+        NodeReport report = mock(NodeReport.class);
+        when(report.getNodeState()).thenReturn(nodeState);
+        when(report.getNodeId()).thenReturn(nodeId);
+        nodeReports.add(report);
+        nodeReportsToTaskIds.put(report, taskId);
+      }
+    }
+  }
+
   private static class JobSubmittedEventHandler implements
       EventHandler<JobHistoryEvent> {
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/c53d351f/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java
index ae371c4..d15d6db 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java
@@ -412,7 +412,7 @@ public interface MRJobConfig {
   public static final String JOB_ACL_MODIFY_JOB = "mapreduce.job.acl-modify-job";
 
   public static final String DEFAULT_JOB_ACL_MODIFY_JOB = " ";
-  
+
   public static final String JOB_RUNNING_MAP_LIMIT =
       "mapreduce.job.running.map.limit";
   public static final int DEFAULT_JOB_RUNNING_MAP_LIMIT = 0;
@@ -945,4 +945,8 @@ public interface MRJobConfig {
    * A comma-separated list of properties whose value will be redacted.
    */
   String MR_JOB_REDACTED_PROPERTIES = "mapreduce.job.redacted-properties";
+
+  String FINISH_JOB_WHEN_REDUCERS_DONE =
+      "mapreduce.job.finish-when-all-reducers-done";
+  boolean DEFAULT_FINISH_JOB_WHEN_REDUCERS_DONE = false;
 }

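A minimal sketch of opting a single job in to the new behavior from client
code (the class name is hypothetical; the same key is documented with its
default in the mapred-default.xml hunk below):

  import org.apache.hadoop.conf.Configuration;
  import org.apache.hadoop.mapreduce.Job;
  import org.apache.hadoop.mapreduce.MRJobConfig;

  public class FinishWhenReducersDoneExample {
    public static void main(String[] args) throws Exception {
      Configuration conf = new Configuration();
      // Let this job complete once all reducers succeed, killing any
      // still-running rescheduled mappers; defaults to false for
      // compatibility with existing branch-2 behavior.
      conf.setBoolean(MRJobConfig.FINISH_JOB_WHEN_REDUCERS_DONE, true);
      Job job = Job.getInstance(conf, "finish-when-reducers-done-demo");
      // ... configure mapper/reducer classes and I/O paths, then:
      // job.waitForCompletion(true);
    }
  }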
http://git-wip-us.apache.org/repos/asf/hadoop/blob/c53d351f/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml
index e5cd660..a0a969f 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml
@@ -1123,6 +1123,14 @@
 </property>
 
 <property>
+  <name>mapreduce.job.finish-when-all-reducers-done</name>
+  <value>false</value>
+  <description>Specifies whether the job should complete once all reducers
+     have finished, regardless of whether there are still running mappers.
+  </description>
+</property>
+
+<property>
   <name>mapreduce.job.token.tracking.ids.enabled</name>
   <value>false</value>
   <description>Whether to write tracking ids of tokens to




[3/3] hadoop git commit: HDFS-10326. Disable setting tcp socket send/receive buffers for write pipelines. Contributed by Daryn Sharp.

Posted by ju...@apache.org.
HDFS-10326. Disable setting tcp socket send/receive buffers for write pipelines. Contributed by Daryn Sharp.

(cherry picked from commit 6801b898d53cf603dea134fc8b492e16f65049fd)


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/8e0e7107
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/8e0e7107
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/8e0e7107

Branch: refs/heads/branch-2.8.2
Commit: 8e0e7107547f902028ff686079b517c5edb2c08e
Parents: c53d351
Author: Haohui Mai <wh...@apache.org>
Authored: Tue Aug 8 14:58:11 2017 -0700
Committer: Junping Du <ju...@apache.org>
Committed: Thu Aug 31 16:01:14 2017 -0700

----------------------------------------------------------------------
 .../hadoop/hdfs/protocol/HdfsConstants.java     |  4 ++--
 .../src/main/resources/hdfs-default.xml         |  9 ++++++---
 .../hadoop/hdfs/TestDFSClientSocketSize.java    | 20 ++++++++++++--------
 3 files changed, 20 insertions(+), 13 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/8e0e7107/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/HdfsConstants.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/HdfsConstants.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/HdfsConstants.java
index a3af28a..7e52240 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/HdfsConstants.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/HdfsConstants.java
@@ -48,8 +48,8 @@ public class HdfsConstants {
   public static final byte COLD_STORAGE_POLICY_ID = 2;
   public static final String COLD_STORAGE_POLICY_NAME = "COLD";
 
-  // TODO should be conf injected?
-  public static final int DEFAULT_DATA_SOCKET_SIZE = 128 * 1024;
+  public static final int DEFAULT_DATA_SOCKET_SIZE = 0;
+
   /**
    * A special path component contained in the path for a snapshot file/dir
    */

http://git-wip-us.apache.org/repos/asf/hadoop/blob/8e0e7107/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
index a7d4ef5..97d8717 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
@@ -2437,13 +2437,14 @@
 
 <property>
   <name>dfs.client.socket.send.buffer.size</name>
-  <value>131072</value>
+  <value>0</value>
   <description>
     Socket send buffer size for a write pipeline in DFSClient side.
     This may affect TCP connection throughput.
    If it is set to zero or a negative value,
    no buffer size will be set explicitly,
    thus enabling TCP auto-tuning on some systems.
+    The default value is 0.
   </description>
 </property>
 
@@ -2848,23 +2849,25 @@
 
 <property>
   <name>dfs.datanode.transfer.socket.send.buffer.size</name>
-  <value>131072</value>
+  <value>0</value>
   <description>
     Socket send buffer size for DataXceiver (mirroring packets to downstream
     in pipeline). This may affect TCP connection throughput.
    If it is set to zero or a negative value, no buffer size will be set
    explicitly, thus enabling TCP auto-tuning on some systems.
+    The default value is 0.
   </description>
 </property>
 
 <property>
   <name>dfs.datanode.transfer.socket.recv.buffer.size</name>
-  <value>131072</value>
+  <value>0</value>
   <description>
     Socket receive buffer size for DataXceiver (receiving packets from client
     during block writing). This may affect TCP connection throughput.
    If it is set to zero or a negative value, no buffer size will be set
    explicitly, thus enabling TCP auto-tuning on some systems.
+    The default value is 0.
   </description>
 </property>
 

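Clusters that relied on the old fixed 128 KB buffers can restore the previous
behavior by overriding the new zero defaults in hdfs-site.xml; a sketch using
the former default of 131072:

  <property>
    <name>dfs.client.socket.send.buffer.size</name>
    <value>131072</value>
  </property>

  <property>
    <name>dfs.datanode.transfer.socket.send.buffer.size</name>
    <value>131072</value>
  </property>

  <property>
    <name>dfs.datanode.transfer.socket.recv.buffer.size</name>
    <value>131072</value>
  </property>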
http://git-wip-us.apache.org/repos/asf/hadoop/blob/8e0e7107/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSClientSocketSize.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSClientSocketSize.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSClientSocketSize.java
index a2a7afc..4b286f2 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSClientSocketSize.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSClientSocketSize.java
@@ -30,7 +30,6 @@ import org.slf4j.LoggerFactory;
 import java.io.IOException;
 import java.net.Socket;
 
-import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_SOCKET_SEND_BUFFER_SIZE_DEFAULT;
 import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_SOCKET_SEND_BUFFER_SIZE_KEY;
 import static org.junit.Assert.assertTrue;
 
@@ -42,15 +41,16 @@ public class TestDFSClientSocketSize {
   }
 
   /**
-   * The setting of socket send buffer size in
-   * {@link java.net.Socket#setSendBufferSize(int)} is only a hint.  Actual
-   * value may differ.  We just sanity check that it is somewhere close.
+   * Test that the default send buffer size is 0, in which case the socket
+   * will use a TCP auto-tuned value.
    */
   @Test
   public void testDefaultSendBufferSize() throws IOException {
-    assertTrue("Send buffer size should be somewhere near default.",
-        getSendBufferSize(new Configuration()) >=
-            DFS_CLIENT_SOCKET_SEND_BUFFER_SIZE_DEFAULT / 2);
+    final int sendBufferSize = getSendBufferSize(new Configuration());
+    LOG.info("If not specified, the auto tuned send buffer size is: {}",
+        sendBufferSize);
+    assertTrue("Send buffer size should be non-negative value which is " +
+        "determined by system (kernel).", sendBufferSize > 0);
   }
 
   /**
@@ -73,6 +73,10 @@ public class TestDFSClientSocketSize {
         sendBufferSize1 > sendBufferSize2);
   }
 
+  /**
+   * Test that if the send buffer size is 0, the socket will use a TCP
+   * auto-tuned value.
+   */
   @Test
   public void testAutoTuningSendBufferSize() throws IOException {
     final Configuration conf = new Configuration();
@@ -80,7 +84,7 @@ public class TestDFSClientSocketSize {
     final int sendBufferSize = getSendBufferSize(conf);
     LOG.info("The auto tuned send buffer size is: {}", sendBufferSize);
     assertTrue("Send buffer size should be non-negative value which is " +
-          "determined by system (kernel).", sendBufferSize > 0);
+        "determined by system (kernel).", sendBufferSize > 0);
   }
 
   private int getSendBufferSize(Configuration conf) throws IOException {

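For reference, a minimal standalone sketch (independent of HDFS; the class
name is hypothetical) of the behavior the updated test relies on: when no
send buffer size is set explicitly, the kernel supplies a positive,
auto-tuned value:

  import java.io.IOException;
  import java.net.Socket;

  public class SendBufferProbe {
    public static void main(String[] args) throws IOException {
      try (Socket s = new Socket()) {
        // No setSendBufferSize() call is made, so the OS reports its own
        // (positive) default -- the value testDefaultSendBufferSize asserts.
        System.out.println("Kernel default send buffer: "
            + s.getSendBufferSize());
      }
    }
  }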

---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org