You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hama.apache.org by ed...@apache.org on 2016/01/18 02:04:19 UTC

hama git commit: Added round robin allocation strategy and its test case

Repository: hama
Updated Branches:
  refs/heads/master c709c0af2 -> 5aed6e39f


Added round robin allocation strategy and its test case

+ Changed the strategy used in JobInProgress class
+ Added RoundRobin Allocation strategy
+ Added test case to verify it

Fixed bug to handle the case when no task is allocated on any Groom

+ Added comments on code
+ Fixed code in round robin allocator
+ Fixed an issue in test

Changing the default class to best effort

Removing spaces

Removed getGroomToSchedule function


Project: http://git-wip-us.apache.org/repos/asf/hama/repo
Commit: http://git-wip-us.apache.org/repos/asf/hama/commit/5aed6e39
Tree: http://git-wip-us.apache.org/repos/asf/hama/tree/5aed6e39
Diff: http://git-wip-us.apache.org/repos/asf/hama/diff/5aed6e39

Branch: refs/heads/master
Commit: 5aed6e39f4ec2f8996b418bfd375763f0d06a68c
Parents: c709c0a
Author: Behroz Sikander <be...@gmail.com>
Authored: Thu Jan 14 17:56:15 2016 +0100
Committer: Edward J. Yoon <ed...@apache.org>
Committed: Mon Jan 18 10:02:40 2016 +0900

----------------------------------------------------------------------
 .../taskallocation/RoundRobinTaskAllocator.java | 146 +++++++++++++++++++
 .../hama/bsp/TestTaskAllocationRoundRobin.java  | 128 ++++++++++++++++
 2 files changed, 274 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hama/blob/5aed6e39/core/src/main/java/org/apache/hama/bsp/taskallocation/RoundRobinTaskAllocator.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/hama/bsp/taskallocation/RoundRobinTaskAllocator.java b/core/src/main/java/org/apache/hama/bsp/taskallocation/RoundRobinTaskAllocator.java
new file mode 100644
index 0000000..0bf060b
--- /dev/null
+++ b/core/src/main/java/org/apache/hama/bsp/taskallocation/RoundRobinTaskAllocator.java
@@ -0,0 +1,146 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.bsp.taskallocation;
+
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hama.bsp.GroomServerStatus;
+import org.apache.hama.bsp.TaskInProgress;
+import org.apache.hama.bsp.BSPJobClient.RawSplit;
+
+/**
+ * <code>RoundRobinTaskAllocator</code> is a round robin based task allocator that equally
+ * divides the tasks among all the Grooms. It balances the load of cluster. For example
+ * if a cluster has 10 Grooms and 20 tasks are to be scheduled then each Groom which
+ * get 2 tasks.
+ */
+public class RoundRobinTaskAllocator implements TaskAllocationStrategy {
+
+  Log LOG = LogFactory.getLog(RoundRobinTaskAllocator.class);
+
+  @Override
+  public void initialize(Configuration conf) {
+  }
+
+  /**
+   * This function loops through the whole list of Grooms with their task count
+   * and returns the first Groom which contains the minimum number of tasks.
+   * @param groomStatuses The map of groom-name to
+   *          <code>GroomServerStatus</code> object for all known grooms.
+   * @param taskCountInGroomMap Map of count of tasks in groom (To be deprecated
+   *          soon)
+   * @return returns the groom name which should be allocated the next task or 
+   *          null no suitable groom was found.
+   */
+  private String findGroomWithMinimumTasks(
+      Map<String, GroomServerStatus> groomStatuses,
+      Map<GroomServerStatus, Integer> taskCountInGroomMap) {
+    
+    Entry<GroomServerStatus, Integer> firstGroomWithMinimumTasks = null;
+    
+    // At the start taskCountInGroomMap is empty so we have to put 0 tasks on grooms
+    if (taskCountInGroomMap.size() < groomStatuses.size()) {
+      for (String s : groomStatuses.keySet()) {
+        GroomServerStatus groom = groomStatuses.get(s);
+        if (groom == null)
+          continue;
+        Integer taskInGroom = taskCountInGroomMap.get(groom);
+        
+        // Find the groom that is yet to get its first tasks and assign 0 value to it.
+        // Having zero will make sure that it gets selected.
+        if (taskInGroom == null) {
+          taskCountInGroomMap.put(groom, 0);
+          break;
+        }
+      }
+    }
+    
+    for (Entry<GroomServerStatus, Integer> currentGroom : taskCountInGroomMap.entrySet()) {
+      if (firstGroomWithMinimumTasks == null || firstGroomWithMinimumTasks.getValue() > currentGroom.getValue()) {
+        if(currentGroom.getValue() < currentGroom.getKey().getMaxTasks()) { // Assign the task to groom which still has space for more tasks 
+          firstGroomWithMinimumTasks = currentGroom;
+        } // If there is no space then continue and find the next best groom
+      }
+    }
+    
+    return (firstGroomWithMinimumTasks == null) ? null
+        : firstGroomWithMinimumTasks.getKey().getGroomHostName();
+  }
+  
+  /**
+   * Select grooms that has the block of data locally stored on the groom
+   * server.
+   */
+  @Override
+  public String[] selectGrooms(Map<String, GroomServerStatus> groomStatuses,
+      Map<GroomServerStatus, Integer> taskCountInGroomMap,
+      BSPResource[] resources, TaskInProgress taskInProgress) {
+    if (!taskInProgress.canStartTask()) {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Cannot start task based on id");
+      }
+      return new String[0];
+    }
+
+    RawSplit rawSplit = taskInProgress.getFileSplit();
+    if (rawSplit != null) {
+      return rawSplit.getLocations();
+    }
+    return null;
+  }
+
+  @Override
+  public GroomServerStatus getGroomToAllocate(
+      Map<String, GroomServerStatus> groomStatuses, String[] selectedGrooms,
+      Map<GroomServerStatus, Integer> taskCountInGroomMap,
+      BSPResource[] resources, TaskInProgress taskInProgress) {
+    if (!taskInProgress.canStartTask()) {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Exceeded allowed attempts.");
+      }
+      return null;
+    }    
+
+    String groomName = null;
+
+    groomName = findGroomWithMinimumTasks(groomStatuses, taskCountInGroomMap);
+
+    if (groomName != null) {
+      return groomStatuses.get(groomName);
+    }
+    
+    return null;
+  }
+
+  /**
+   * This operation is not supported.
+   */
+  @Override
+  public Set<GroomServerStatus> getGroomsToAllocate(
+      Map<String, GroomServerStatus> groomStatuses, String[] selectedGrooms,
+      Map<GroomServerStatus, Integer> taskCountInGroomMap,
+      BSPResource[] resources, TaskInProgress taskInProgress) {
+    throw new UnsupportedOperationException(
+        "This API is not supported for the called API function call.");
+  }
+}

http://git-wip-us.apache.org/repos/asf/hama/blob/5aed6e39/core/src/test/java/org/apache/hama/bsp/TestTaskAllocationRoundRobin.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/hama/bsp/TestTaskAllocationRoundRobin.java b/core/src/test/java/org/apache/hama/bsp/TestTaskAllocationRoundRobin.java
new file mode 100644
index 0000000..e471fe0
--- /dev/null
+++ b/core/src/test/java/org/apache/hama/bsp/TestTaskAllocationRoundRobin.java
@@ -0,0 +1,128 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.bsp;
+
+import java.util.ArrayList;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+
+import junit.framework.TestCase;
+
+import org.junit.Test;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.hama.bsp.BSPJobClient.RawSplit;
+import org.apache.hama.bsp.taskallocation.BSPResource;
+import org.apache.hama.bsp.taskallocation.RoundRobinTaskAllocator;
+import org.apache.hama.bsp.taskallocation.TaskAllocationStrategy;
+
+public class TestTaskAllocationRoundRobin extends TestCase {
+
+  public static final Log LOG = LogFactory
+      .getLog(TestTaskAllocationRoundRobin.class);
+
+  Configuration conf = new Configuration();
+  Map<String, GroomServerStatus> groomStatuses;
+  Map<GroomServerStatus, Integer> taskCountInGroomMap;
+  BSPResource[] resources;
+  TaskInProgress taskInProgress;
+
+  @Override
+  protected void setUp() throws Exception {
+    super.setUp();
+
+    String[] locations = new String[] { "host6", "host4", "host3" };
+    String value = "data";
+    RawSplit split = new RawSplit();
+    split.setLocations(locations);
+    split.setBytes(value.getBytes(), 0, value.getBytes().length);
+    split.setDataLength(value.getBytes().length);
+
+    assertEquals(value.getBytes().length, (int) split.getDataLength());
+
+    taskCountInGroomMap = new LinkedHashMap<GroomServerStatus, Integer>(10);
+    resources = new BSPResource[0];
+    BSPJob job = new BSPJob(new BSPJobID("checkpttest", 1), "/tmp");
+    JobInProgress jobProgress = new JobInProgress(job.getJobID(), conf);
+    taskInProgress = new TaskInProgress(job.getJobID(), "job.xml", split, conf,
+        jobProgress, 1);
+
+    groomStatuses = new LinkedHashMap<String, GroomServerStatus>(10);
+
+    for (int i = 0; i < 10; ++i) {
+      String name = "host" + i;
+
+      GroomServerStatus status = new GroomServerStatus(name,
+          new ArrayList<TaskStatus>(), 0, 3, "", name);
+      groomStatuses.put(name, status);
+      taskCountInGroomMap.put(status, 0);
+    }
+  }
+
+  @Override
+  protected void tearDown() throws Exception {
+    super.tearDown();
+  }
+
+  /**
+   * This test simulates the allocation of 30 tasks in round robin fashion
+   * on 10 Grooms with 3 task slots each, so every allocation must succeed.
+   */
+  @Test
+  public void testRoundRobinAllocation() {
+    TaskAllocationStrategy strategy = ReflectionUtils.newInstance(conf
+        .getClass("", RoundRobinTaskAllocator.class,
+            TaskAllocationStrategy.class), conf);
+
+    for (int i = 0; i < 30; i++) {
+      GroomServerStatus groomStatus = strategy.getGroomToAllocate(
+          groomStatuses, null, taskCountInGroomMap, resources, taskInProgress);
+      assertNotNull("No groom was allocated for task " + i, groomStatus);
+      assertEquals("Task " + i + " was not allocated in round robin order",
+          "host" + (i % 10), groomStatus.getGroomHostName());
+      // Increment the total tasks in the groom that just received the task
+      taskCountInGroomMap.put(groomStatus,
+          taskCountInGroomMap.get(groomStatus) + 1);
+    }
+  }
+
+  @Test
+  public void testRoundRobinDataLocality() throws Exception {
+    TaskAllocationStrategy strategy = ReflectionUtils.newInstance(conf
+        .getClass("", RoundRobinTaskAllocator.class,
+            TaskAllocationStrategy.class), conf);
+
+    String[] hosts = strategy.selectGrooms(groomStatuses, taskCountInGroomMap,
+        resources, taskInProgress);
+    assertNotNull("No split locations were returned", hosts);
+
+    List<String> list = new ArrayList<String>();
+
+    for (String host : hosts) {
+      list.add(host);
+    }
+
+    assertTrue(list.contains("host6"));
+    assertTrue(list.contains("host3"));
+    assertTrue(list.contains("host4"));
+  }
+
+}