You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hama.apache.org by ed...@apache.org on 2016/01/18 02:04:19 UTC
hama git commit: Added round robin allocation strategy and its test
case
Repository: hama
Updated Branches:
refs/heads/master c709c0af2 -> 5aed6e39f
Added round robin allocation strategy and its test case
+ Changed the strategy used in JobInProgress class
+ Added RoundRobin Allocation strategy
+ Added test case to verify it
Fixed bug to handle the case when no task is allocated on any Groom
+ Added comments on code
+ Fixed code in round robin allocator
+ Fixed an issue in test
Changing the default class to best effort
Removing spaces
Removed getGroomToSchedule function
Project: http://git-wip-us.apache.org/repos/asf/hama/repo
Commit: http://git-wip-us.apache.org/repos/asf/hama/commit/5aed6e39
Tree: http://git-wip-us.apache.org/repos/asf/hama/tree/5aed6e39
Diff: http://git-wip-us.apache.org/repos/asf/hama/diff/5aed6e39
Branch: refs/heads/master
Commit: 5aed6e39f4ec2f8996b418bfd375763f0d06a68c
Parents: c709c0a
Author: Behroz Sikander <be...@gmail.com>
Authored: Thu Jan 14 17:56:15 2016 +0100
Committer: Edward J. Yoon <ed...@apache.org>
Committed: Mon Jan 18 10:02:40 2016 +0900
----------------------------------------------------------------------
.../taskallocation/RoundRobinTaskAllocator.java | 146 +++++++++++++++++++
.../hama/bsp/TestTaskAllocationRoundRobin.java | 128 ++++++++++++++++
2 files changed, 274 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hama/blob/5aed6e39/core/src/main/java/org/apache/hama/bsp/taskallocation/RoundRobinTaskAllocator.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/hama/bsp/taskallocation/RoundRobinTaskAllocator.java b/core/src/main/java/org/apache/hama/bsp/taskallocation/RoundRobinTaskAllocator.java
new file mode 100644
index 0000000..0bf060b
--- /dev/null
+++ b/core/src/main/java/org/apache/hama/bsp/taskallocation/RoundRobinTaskAllocator.java
@@ -0,0 +1,146 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.bsp.taskallocation;
+
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hama.bsp.GroomServerStatus;
+import org.apache.hama.bsp.TaskInProgress;
+import org.apache.hama.bsp.BSPJobClient.RawSplit;
+
+/**
+ * <code>RoundRobinTaskAllocator</code> is a round robin based task allocator that equally
+ * divides the tasks among all the Grooms. It balances the load of cluster. For example
+ * if a cluster has 10 Grooms and 20 tasks are to be scheduled then each Groom which
+ * get 2 tasks.
+ */
+public class RoundRobinTaskAllocator implements TaskAllocationStrategy {
+
+ Log LOG = LogFactory.getLog(RoundRobinTaskAllocator.class);
+
+ @Override
+ public void initialize(Configuration conf) {
+ }
+
+ /**
+ * This function loops through the whole list of Grooms with their task count
+ * and returns the first Groom which contains the minimum number of tasks.
+ * @param groomStatuses The map of groom-name to
+ * <code>GroomServerStatus</code> object for all known grooms.
+ * @param taskCountInGroomMap Map of count of tasks in groom (To be deprecated
+ * soon)
+ * @return returns the groom name which should be allocated the next task or
+ * null no suitable groom was found.
+ */
+ private String findGroomWithMinimumTasks(
+ Map<String, GroomServerStatus> groomStatuses,
+ Map<GroomServerStatus, Integer> taskCountInGroomMap) {
+
+ Entry<GroomServerStatus, Integer> firstGroomWithMinimumTasks = null;
+
+ // At the start taskCountInGroomMap is empty so we have to put 0 tasks on grooms
+ if (taskCountInGroomMap.size() < groomStatuses.size()) {
+ for (String s : groomStatuses.keySet()) {
+ GroomServerStatus groom = groomStatuses.get(s);
+ if (groom == null)
+ continue;
+ Integer taskInGroom = taskCountInGroomMap.get(groom);
+
+ // Find the groom that is yet to get its first tasks and assign 0 value to it.
+ // Having zero will make sure that it gets selected.
+ if (taskInGroom == null) {
+ taskCountInGroomMap.put(groom, 0);
+ break;
+ }
+ }
+ }
+
+ for (Entry<GroomServerStatus, Integer> currentGroom : taskCountInGroomMap.entrySet()) {
+ if (firstGroomWithMinimumTasks == null || firstGroomWithMinimumTasks.getValue() > currentGroom.getValue()) {
+ if(currentGroom.getValue() < currentGroom.getKey().getMaxTasks()) { // Assign the task to groom which still has space for more tasks
+ firstGroomWithMinimumTasks = currentGroom;
+ } // If there is no space then continue and find the next best groom
+ }
+ }
+
+ return (firstGroomWithMinimumTasks == null) ? null
+ : firstGroomWithMinimumTasks.getKey().getGroomHostName();
+ }
+
+ /**
+ * Select grooms that has the block of data locally stored on the groom
+ * server.
+ */
+ @Override
+ public String[] selectGrooms(Map<String, GroomServerStatus> groomStatuses,
+ Map<GroomServerStatus, Integer> taskCountInGroomMap,
+ BSPResource[] resources, TaskInProgress taskInProgress) {
+ if (!taskInProgress.canStartTask()) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Cannot start task based on id");
+ }
+ return new String[0];
+ }
+
+ RawSplit rawSplit = taskInProgress.getFileSplit();
+ if (rawSplit != null) {
+ return rawSplit.getLocations();
+ }
+ return null;
+ }
+
+ @Override
+ public GroomServerStatus getGroomToAllocate(
+ Map<String, GroomServerStatus> groomStatuses, String[] selectedGrooms,
+ Map<GroomServerStatus, Integer> taskCountInGroomMap,
+ BSPResource[] resources, TaskInProgress taskInProgress) {
+ if (!taskInProgress.canStartTask()) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Exceeded allowed attempts.");
+ }
+ return null;
+ }
+
+ String groomName = null;
+
+ groomName = findGroomWithMinimumTasks(groomStatuses, taskCountInGroomMap);
+
+ if (groomName != null) {
+ return groomStatuses.get(groomName);
+ }
+
+ return null;
+ }
+
+ /**
+ * This operation is not supported.
+ */
+ @Override
+ public Set<GroomServerStatus> getGroomsToAllocate(
+ Map<String, GroomServerStatus> groomStatuses, String[] selectedGrooms,
+ Map<GroomServerStatus, Integer> taskCountInGroomMap,
+ BSPResource[] resources, TaskInProgress taskInProgress) {
+ throw new UnsupportedOperationException(
+ "This API is not supported for the called API function call.");
+ }
+}
http://git-wip-us.apache.org/repos/asf/hama/blob/5aed6e39/core/src/test/java/org/apache/hama/bsp/TestTaskAllocationRoundRobin.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/hama/bsp/TestTaskAllocationRoundRobin.java b/core/src/test/java/org/apache/hama/bsp/TestTaskAllocationRoundRobin.java
new file mode 100644
index 0000000..e471fe0
--- /dev/null
+++ b/core/src/test/java/org/apache/hama/bsp/TestTaskAllocationRoundRobin.java
@@ -0,0 +1,128 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.bsp;
+
+import java.util.ArrayList;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+
+import junit.framework.TestCase;
+
+import org.junit.Test;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.hama.bsp.BSPJobClient.RawSplit;
+import org.apache.hama.bsp.taskallocation.BSPResource;
+import org.apache.hama.bsp.taskallocation.RoundRobinTaskAllocator;
+import org.apache.hama.bsp.taskallocation.TaskAllocationStrategy;
+
+public class TestTaskAllocationRoundRobin extends TestCase {
+
+ public static final Log LOG = LogFactory
+ .getLog(TestTaskAllocationRoundRobin.class);
+
+ Configuration conf = new Configuration();
+ Map<String, GroomServerStatus> groomStatuses;
+ Map<GroomServerStatus, Integer> taskCountInGroomMap;
+ BSPResource[] resources;
+ TaskInProgress taskInProgress;
+
+ @Override
+ protected void setUp() throws Exception {
+ super.setUp();
+
+ String[] locations = new String[] { "host6", "host4", "host3" };
+ String value = "data";
+ RawSplit split = new RawSplit();
+ split.setLocations(locations);
+ split.setBytes(value.getBytes(), 0, value.getBytes().length);
+ split.setDataLength(value.getBytes().length);
+
+ assertEquals(value.getBytes().length, (int) split.getDataLength());
+
+ taskCountInGroomMap = new LinkedHashMap<GroomServerStatus, Integer>(10);
+ resources = new BSPResource[0];
+ BSPJob job = new BSPJob(new BSPJobID("checkpttest", 1), "/tmp");
+ JobInProgress jobProgress = new JobInProgress(job.getJobID(), conf);
+ taskInProgress = new TaskInProgress(job.getJobID(), "job.xml", split, conf,
+ jobProgress, 1);
+
+ groomStatuses = new LinkedHashMap<String, GroomServerStatus>(10);
+
+ for (int i = 0; i < 10; ++i) {
+ String name = "host" + i;
+
+ GroomServerStatus status = new GroomServerStatus(name,
+ new ArrayList<TaskStatus>(), 0, 3, "", name);
+ groomStatuses.put(name, status);
+ taskCountInGroomMap.put(status, 0);
+ }
+ }
+
+ @Override
+ protected void tearDown() throws Exception {
+ super.tearDown();
+ }
+
+ /**
+ * This test simulates the allocation of 30 tasks in round robin fashion
+ * on 10 Grooms.
+ */
+ @Test
+ public void testRoundRobinAllocation() {
+ TaskAllocationStrategy strategy = ReflectionUtils.newInstance(conf
+ .getClass("", RoundRobinTaskAllocator.class,
+ TaskAllocationStrategy.class), conf);
+
+ for (int i = 0; i < 30; i++) {
+ GroomServerStatus groomStatus = strategy.getGroomToAllocate(
+ groomStatuses, null, taskCountInGroomMap, resources, taskInProgress);
+ if (groomStatus != null) {
+ taskCountInGroomMap.put(groomStatus,
+ taskCountInGroomMap.get(groomStatus) + 1); // Increment the total tasks in it
+
+ assertEquals("", "host" + (i % 10), groomStatus.getGroomHostName());
+ }
+ }
+ }
+
+ @Test
+ public void testRoundRobinDataLocality() throws Exception {
+
+ TaskAllocationStrategy strategy = ReflectionUtils.newInstance(conf
+ .getClass("", RoundRobinTaskAllocator.class,
+ TaskAllocationStrategy.class), conf);
+
+ String[] hosts = strategy.selectGrooms(groomStatuses, taskCountInGroomMap,
+ resources, taskInProgress);
+
+ List<String> list = new ArrayList<String>();
+
+ for (int i = 0; i < hosts.length; ++i) {
+ list.add(hosts[i]);
+ }
+
+ assertTrue(list.contains("host6"));
+ assertTrue(list.contains("host3"));
+ assertTrue(list.contains("host4"));
+ }
+
+}