You are viewing a plain text version of this content. The canonical link for it is here.
Posted to mapreduce-commits@hadoop.apache.org by sh...@apache.org on 2009/10/16 09:47:12 UTC

svn commit: r825789 - in /hadoop/mapreduce/branches/branch-0.21: ./ src/examples/org/apache/hadoop/examples/pi/ src/java/org/apache/hadoop/mapred/ src/java/org/apache/hadoop/mapreduce/ src/test/mapred/org/apache/hadoop/mapred/

Author: sharad
Date: Fri Oct 16 07:47:11 2009
New Revision: 825789

URL: http://svn.apache.org/viewvc?rev=825789&view=rev
Log:
MAPREDUCE-1117. Fix ClusterMetrics to return info about slots instead of tasks. Contributed by Amareshwari Sriramadasu.

Added:
    hadoop/mapreduce/branches/branch-0.21/src/test/mapred/org/apache/hadoop/mapred/TestClusterStatus.java
Modified:
    hadoop/mapreduce/branches/branch-0.21/CHANGES.txt
    hadoop/mapreduce/branches/branch-0.21/src/examples/org/apache/hadoop/examples/pi/DistSum.java
    hadoop/mapreduce/branches/branch-0.21/src/java/org/apache/hadoop/mapred/JobTracker.java
    hadoop/mapreduce/branches/branch-0.21/src/java/org/apache/hadoop/mapreduce/ClusterMetrics.java

Modified: hadoop/mapreduce/branches/branch-0.21/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/branch-0.21/CHANGES.txt?rev=825789&r1=825788&r2=825789&view=diff
==============================================================================
--- hadoop/mapreduce/branches/branch-0.21/CHANGES.txt (original)
+++ hadoop/mapreduce/branches/branch-0.21/CHANGES.txt Fri Oct 16 07:47:11 2009
@@ -747,3 +747,6 @@
     MAPREDUCE-769. Make findbugs and javac warnings to zero.
     (Amareshwari Sriramadasu via sharad)
 
+    MAPREDUCE-1117. Fix ClusterMetrics to return info about slots instead of 
+    tasks. (Amareshwari Sriramadasu via sharad)
+

Modified: hadoop/mapreduce/branches/branch-0.21/src/examples/org/apache/hadoop/examples/pi/DistSum.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/branch-0.21/src/examples/org/apache/hadoop/examples/pi/DistSum.java?rev=825789&r1=825788&r2=825789&view=diff
==============================================================================
--- hadoop/mapreduce/branches/branch-0.21/src/examples/org/apache/hadoop/examples/pi/DistSum.java (original)
+++ hadoop/mapreduce/branches/branch-0.21/src/examples/org/apache/hadoop/examples/pi/DistSum.java Fri Oct 16 07:47:11 2009
@@ -38,9 +38,9 @@
 import org.apache.hadoop.io.IntWritable;
 import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.mapred.ClusterStatus;
-import org.apache.hadoop.mapred.JobClient;
 import org.apache.hadoop.mapred.JobTracker;
+import org.apache.hadoop.mapreduce.Cluster;
+import org.apache.hadoop.mapreduce.ClusterMetrics;
 import org.apache.hadoop.mapreduce.InputFormat;
 import org.apache.hadoop.mapreduce.InputSplit;
 import org.apache.hadoop.mapreduce.Job;
@@ -379,14 +379,14 @@
   public static class MixMachine extends Machine {
     private static final MixMachine INSTANCE = new MixMachine();
     
-    private JobClient jobclient;
+    private Cluster cluster;
 
     /** {@inheritDoc} */
     @Override
     public synchronized void init(Job job) throws IOException {
       final Configuration conf = job.getConfiguration();
-      if (jobclient == null)
-        jobclient = new JobClient(JobTracker.getAddress(conf), conf);
+      if (cluster == null)
+        cluster = new Cluster(JobTracker.getAddress(conf), conf);
       chooseMachine(conf).init(job);
     }
 
@@ -398,9 +398,11 @@
       try {
         for(;; Thread.sleep(2000)) {
           //get cluster status
-          final ClusterStatus status = jobclient.getClusterStatus();
-          final int m = status.getMaxMapTasks() - status.getMapTasks();
-          final int r = status.getMaxReduceTasks() - status.getReduceTasks();
+          final ClusterMetrics status = cluster.getClusterStatus();
+          final int m = 
+            status.getMapSlotCapacity() - status.getOccupiedMapSlots();
+          final int r = 
+            status.getReduceSlotCapacity() - status.getOccupiedReduceSlots();
           if (m >= parts || r >= parts) {
             //favor ReduceSide machine
             final Machine value = r >= parts?

Modified: hadoop/mapreduce/branches/branch-0.21/src/java/org/apache/hadoop/mapred/JobTracker.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/branch-0.21/src/java/org/apache/hadoop/mapred/JobTracker.java?rev=825789&r1=825788&r2=825789&view=diff
==============================================================================
--- hadoop/mapreduce/branches/branch-0.21/src/java/org/apache/hadoop/mapred/JobTracker.java (original)
+++ hadoop/mapreduce/branches/branch-0.21/src/java/org/apache/hadoop/mapred/JobTracker.java Fri Oct 16 07:47:11 2009
@@ -20,7 +20,6 @@
 
 import java.io.File;
 import java.io.FileNotFoundException;
-import java.io.FileOutputStream;
 import java.io.IOException;
 import java.io.PrintWriter;
 import java.io.UnsupportedEncodingException;
@@ -1206,6 +1205,8 @@
   //
   int totalMaps = 0;
   int totalReduces = 0;
+  private int occupiedMapSlots = 0;
+  private int occupiedReduceSlots = 0;
   private HashMap<String, TaskTracker> taskTrackers =
     new HashMap<String, TaskTracker>();
   Map<String,Integer>uniqueHostsMap = new ConcurrentHashMap<String, Integer>();
@@ -2400,6 +2401,8 @@
     if (oldStatus != null) {
       totalMaps -= oldStatus.countMapTasks();
       totalReduces -= oldStatus.countReduceTasks();
+      occupiedMapSlots  -= oldStatus.countOccupiedMapSlots();
+      occupiedReduceSlots -= oldStatus.countOccupiedReduceSlots();
       if (!faultyTrackers.isBlacklisted(oldStatus.getHost())) {
         int mapSlots = oldStatus.getMaxMapSlots();
         totalMapTaskCapacity -= mapSlots;
@@ -2422,6 +2425,8 @@
     if (status != null) {
       totalMaps += status.countMapTasks();
       totalReduces += status.countReduceTasks();
+      occupiedMapSlots  += status.countOccupiedMapSlots();
+      occupiedReduceSlots += status.countOccupiedReduceSlots();
       if (!faultyTrackers.isBlacklisted(status.getHost())) {
         int mapSlots = status.getMaxMapSlots();
         totalMapTaskCapacity += mapSlots;
@@ -2927,8 +2932,8 @@
   }
   
   public synchronized ClusterMetrics getClusterMetrics() {
-    return new ClusterMetrics(totalMaps, totalReduces, totalMapTaskCapacity,
-      totalReduceTaskCapacity, taskTrackers.size() - 
+    return new ClusterMetrics(occupiedMapSlots, occupiedReduceSlots,
+      totalMapTaskCapacity, totalReduceTaskCapacity, taskTrackers.size() - 
       getBlacklistedTrackerCount(), 
       getBlacklistedTrackerCount(), getExcludedNodes().size()) ;
   }

Modified: hadoop/mapreduce/branches/branch-0.21/src/java/org/apache/hadoop/mapreduce/ClusterMetrics.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/branch-0.21/src/java/org/apache/hadoop/mapreduce/ClusterMetrics.java?rev=825789&r1=825788&r2=825789&view=diff
==============================================================================
--- hadoop/mapreduce/branches/branch-0.21/src/java/org/apache/hadoop/mapreduce/ClusterMetrics.java (original)
+++ hadoop/mapreduce/branches/branch-0.21/src/java/org/apache/hadoop/mapreduce/ClusterMetrics.java Fri Oct 16 07:47:11 2009
@@ -35,10 +35,10 @@
  *   Number of blacklisted and decommissioned trackers.  
  *   </li>
  *   <li>
- *   Task capacity of the cluster. 
+ *   Slot capacity of the cluster. 
  *   </li>
  *   <li>
- *   The number of currently running map & reduce tasks.
+ *   The number of currently occupied map &amp; reduce slots.
  *   </li>
  * </ol></p>
  * 
@@ -48,24 +48,25 @@
  * @see Cluster
  */
 public class ClusterMetrics implements Writable {
-  int runningMaps;
-  int runningReduces;
-  int mapSlots;
-  int reduceSlots;
-  int numTrackers;
-  int numBlacklistedTrackers;
-  int numDecommissionedTrackers;
+  private int occupiedMapSlots;
+  private int occupiedReduceSlots;
+  private int totalMapSlots;
+  private int totalReduceSlots;
+  private int numTrackers;
+  private int numBlacklistedTrackers;
+  private int numDecommissionedTrackers;
 
   public ClusterMetrics() {
   }
   
-  public ClusterMetrics(int runningMaps, int runningReduces, int mapSlots, 
-    int reduceSlots, int numTrackers, int numBlacklistedTrackers,
-    int numDecommisionedNodes) {
-    this.runningMaps = runningMaps;
-    this.runningReduces = runningReduces;
-    this.mapSlots = mapSlots;
-    this.reduceSlots = reduceSlots;
+  public ClusterMetrics(int occupiedMapSlots, int occupiedReduceSlots,
+      int mapSlots, int reduceSlots, 
+      int numTrackers, int numBlacklistedTrackers,
+      int numDecommisionedNodes) {
+    this.occupiedMapSlots = occupiedMapSlots;
+    this.occupiedReduceSlots = occupiedReduceSlots;
+    this.totalMapSlots = mapSlots;
+    this.totalReduceSlots = reduceSlots;
     this.numTrackers = numTrackers;
     this.numBlacklistedTrackers = numBlacklistedTrackers;
     this.numDecommissionedTrackers = numDecommisionedNodes;
@@ -77,7 +78,7 @@
    * @return occupied map slot count
    */
   public int getOccupiedMapSlots() { 
-    return runningMaps;
+    return occupiedMapSlots;
   }
   
   /**
@@ -86,7 +87,7 @@
    * @return occupied reduce slot count
    */
   public int getOccupiedReduceSlots() { 
-    return runningReduces; 
+    return occupiedReduceSlots; 
   }
   
   /**
@@ -95,7 +96,7 @@
    * @return map slot capacity
    */
   public int getMapSlotCapacity() {
-    return mapSlots;
+    return totalMapSlots;
   }
   
   /**
@@ -104,7 +105,7 @@
    * @return reduce slot capacity
    */
   public int getReduceSlotCapacity() {
-    return reduceSlots;
+    return totalReduceSlots;
   }
   
   /**
@@ -136,10 +137,10 @@
 
   @Override
   public void readFields(DataInput in) throws IOException {
-    runningMaps = in.readInt();
-    runningReduces = in.readInt();
-    mapSlots = in.readInt();
-    reduceSlots = in.readInt();
+    occupiedMapSlots = in.readInt();
+    occupiedReduceSlots = in.readInt();
+    totalMapSlots = in.readInt();
+    totalReduceSlots = in.readInt();
     numTrackers = in.readInt();
     numBlacklistedTrackers = in.readInt();
     numDecommissionedTrackers = in.readInt();
@@ -147,10 +148,10 @@
 
   @Override
   public void write(DataOutput out) throws IOException {
-    out.writeInt(runningMaps);
-    out.writeInt(runningReduces);
-    out.writeInt(mapSlots);
-    out.writeInt(reduceSlots);
+    out.writeInt(occupiedMapSlots);
+    out.writeInt(occupiedReduceSlots);
+    out.writeInt(totalMapSlots);
+    out.writeInt(totalReduceSlots);
     out.writeInt(numTrackers);
     out.writeInt(numBlacklistedTrackers);
     out.writeInt(numDecommissionedTrackers);

Added: hadoop/mapreduce/branches/branch-0.21/src/test/mapred/org/apache/hadoop/mapred/TestClusterStatus.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/branch-0.21/src/test/mapred/org/apache/hadoop/mapred/TestClusterStatus.java?rev=825789&view=auto
==============================================================================
--- hadoop/mapreduce/branches/branch-0.21/src/test/mapred/org/apache/hadoop/mapred/TestClusterStatus.java (added)
+++ hadoop/mapreduce/branches/branch-0.21/src/test/mapred/org/apache/hadoop/mapred/TestClusterStatus.java Fri Oct 16 07:47:11 2009
@@ -0,0 +1,131 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.hadoop.mapreduce.Cluster;
+import org.apache.hadoop.mapreduce.ClusterMetrics;
+import org.apache.hadoop.mapreduce.TaskType;
+
+import junit.extensions.TestSetup;
+import junit.framework.Test;
+import junit.framework.TestCase;
+import junit.framework.TestSuite;
+
+public class TestClusterStatus extends TestCase {
+
+  private static String[] trackers = new String[] { "tracker_tracker1:1000",
+      "tracker_tracker2:1000", "tracker_tracker3:1000" };
+  private static JobTracker jobTracker;
+  private static int mapSlotsPerTracker = 4;
+  private static int reduceSlotsPerTracker = 2;
+  private static MiniMRCluster mr;
+  private static Cluster cluster;
+  // Heartbeat responseId; incremented after each round of heartbeats is sent.
+  private static short responseId = 1;
+  
+  public static Test suite() {
+    TestSetup setup = new TestSetup(new TestSuite(TestClusterStatus.class)) {
+      protected void setUp() throws Exception {
+      
+        mr = new MiniMRCluster(0, "file:///", 1);
+        jobTracker = mr.getJobTrackerRunner().getJobTracker();
+        for (String tracker : trackers) {
+          FakeObjectUtilities.establishFirstContact(jobTracker, tracker);
+        }
+        cluster = new Cluster(mr.createJobConf());
+      }
+
+      protected void tearDown() throws Exception {
+        cluster.close();
+        mr.shutdown();
+      }
+    };
+    return setup;
+  }
+  
+  private TaskTrackerStatus getTTStatus(String trackerName,
+      List<TaskStatus> taskStatuses) {
+    return new TaskTrackerStatus(trackerName, 
+      JobInProgress.convertTrackerNameToHostName(trackerName), 0,
+      taskStatuses, 0, mapSlotsPerTracker, reduceSlotsPerTracker);
+  }
+  
+  public void testClusterMetrics() throws IOException, InterruptedException {
+    assertEquals("tasktracker count doesn't match", trackers.length,
+      cluster.getClusterStatus().getTaskTrackerCount());
+    
+    List<TaskStatus> list = new ArrayList<TaskStatus>();
+
+    // create a map task status, which uses 2 slots. 
+    int mapSlotsPerTask = 2;
+    TaskStatus ts = TaskStatus.createTaskStatus(true, 
+      new TaskAttemptID("jt", 1, TaskType.MAP, 0, 0), 0.0f, mapSlotsPerTask,
+      TaskStatus.State.RUNNING, "", "", trackers[0], 
+      TaskStatus.Phase.MAP, null);
+    list.add(ts);
+    
+    // create a reduce task status, which uses 1 slot.
+    int reduceSlotsPerTask = 1;
+    ts = TaskStatus.createTaskStatus(false, 
+      new TaskAttemptID("jt", 1, TaskType.REDUCE, 0, 0), 0.0f,
+      reduceSlotsPerTask,
+      TaskStatus.State.RUNNING, "", "", trackers[0], 
+      TaskStatus.Phase.REDUCE, null);
+    list.add(ts);
+    
+    // create TaskTrackerStatus and send heartbeats
+    TaskTrackerStatus[] status = new TaskTrackerStatus[trackers.length];
+    status[0] = getTTStatus(trackers[0], list);
+    status[1] = getTTStatus(trackers[1], new ArrayList<TaskStatus>());
+    status[2] = getTTStatus(trackers[2], new ArrayList<TaskStatus>());
+    for (int i = 0; i< trackers.length; i++) {
+      FakeObjectUtilities.sendHeartBeat(jobTracker, status[i], false, 
+        trackers[i], responseId);
+    }
+    responseId++;
+    // assert ClusterMetrics
+    ClusterMetrics metrics = cluster.getClusterStatus();
+    assertEquals("occupied map slots do not match", mapSlotsPerTask,
+      metrics.getOccupiedMapSlots());
+    assertEquals("occupied reduce slots do not match", reduceSlotsPerTask,
+      metrics.getOccupiedReduceSlots());
+    assertEquals("map slot capacities do not match",
+      mapSlotsPerTracker * trackers.length,
+      metrics.getMapSlotCapacity());
+    assertEquals("reduce slot capacities do not match",
+      reduceSlotsPerTracker * trackers.length,
+      metrics.getReduceSlotCapacity());
+    
+    // Also assert the values reported by ClusterStatus.
+    assertEquals("running map tasks do not match", 1,
+      jobTracker.getClusterStatus().getMapTasks());
+    assertEquals("running reduce tasks do not match", 1,
+      jobTracker.getClusterStatus().getReduceTasks());
+    assertEquals("map slot capacities do not match",
+      mapSlotsPerTracker * trackers.length,
+      jobTracker.getClusterStatus().getMaxMapTasks());
+    assertEquals("reduce slot capacities do not match",
+      reduceSlotsPerTracker * trackers.length,
+      jobTracker.getClusterStatus().getMaxReduceTasks());
+    cluster.close();
+  }
+}