You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hama.apache.org by su...@apache.org on 2012/08/05 13:07:49 UTC
svn commit: r1369551 [2/3] - in /hama/branches/HAMA-505-branch: conf/
core/src/main/java/org/apache/hama/ core/src/main/java/org/apache/hama/bsp/
core/src/main/java/org/apache/hama/bsp/ft/
core/src/main/java/org/apache/hama/bsp/message/ core/src/main/j...
Added: hama/branches/HAMA-505-branch/core/src/main/java/org/apache/hama/bsp/ft/AsyncRcvdMsgCheckpointImpl.java
URL: http://svn.apache.org/viewvc/hama/branches/HAMA-505-branch/core/src/main/java/org/apache/hama/bsp/ft/AsyncRcvdMsgCheckpointImpl.java?rev=1369551&view=auto
==============================================================================
--- hama/branches/HAMA-505-branch/core/src/main/java/org/apache/hama/bsp/ft/AsyncRcvdMsgCheckpointImpl.java (added)
+++ hama/branches/HAMA-505-branch/core/src/main/java/org/apache/hama/bsp/ft/AsyncRcvdMsgCheckpointImpl.java Sun Aug 5 11:07:48 2012
@@ -0,0 +1,544 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.bsp.ft;
+
+import java.io.EOFException;
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.BlockLocation;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.ArrayWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.hama.Constants;
+import org.apache.hama.bsp.BSPJob;
+import org.apache.hama.bsp.BSPJobID;
+import org.apache.hama.bsp.BSPMessageBundle;
+import org.apache.hama.bsp.BSPPeer;
+import org.apache.hama.bsp.GroomServerAction;
+import org.apache.hama.bsp.GroomServerStatus;
+import org.apache.hama.bsp.JobInProgress;
+import org.apache.hama.bsp.RecoverTaskAction;
+import org.apache.hama.bsp.Task;
+import org.apache.hama.bsp.TaskAttemptID;
+import org.apache.hama.bsp.TaskID;
+import org.apache.hama.bsp.TaskInProgress;
+import org.apache.hama.bsp.TaskStatus;
+import org.apache.hama.bsp.message.MessageEventListener;
+import org.apache.hama.bsp.message.MessageManager;
+import org.apache.hama.bsp.sync.MasterSyncClient;
+import org.apache.hama.bsp.sync.PeerSyncClient;
+import org.apache.hama.bsp.taskallocation.BSPResource;
+import org.apache.hama.bsp.taskallocation.TaskAllocationStrategy;
+
+/**
+ * <code>AsyncRcvdMsgCheckpointImpl</code> Checkpoint service defines the fault
+ * tolerance strategy by checkpointing of messages sent across peers. On
+ * failure, all the tasks are restarted from the last superstep for which all
+ * the peers successfully checkpointed the messages.
+ *
+ */
+public class AsyncRcvdMsgCheckpointImpl<M extends Writable> implements
+ BSPFaultTolerantService<M> {
+
+ private static final Log LOG = LogFactory
+ .getLog(AsyncRcvdMsgCheckpointImpl.class);
+
+ /**
+ * It is responsible to find the smallest superstep for which the
+ * checkpointing is done and then restart all the peers from that superstep.
+ */
+ private static class CheckpointMasterService implements
+ FaultTolerantMasterService {
+
+ private Configuration conf;
+ private TaskInProgress tasks[];
+ private BSPJobID jobId;
+ private int maxTaskAttempts;
+ private int currentAttemptId;
+ private MasterSyncClient masterSyncClient;
+ private TaskAllocationStrategy allocationStrategy;
+
+ /**
+ * Initializes the fault tolerance service at BSPMasters
+ *
+ * @param jobId The identifier of the job.
+ * @param maxTaskAttempts Number of attempts allowed for recovering from
+ * failure.
+ * @param tasks The list of tasks in the job.
+ * @param conf The job configuration object.
+ * @param masterClient The synchronization client used by BSPMaster.
+ * @param allocationStrategy The task allocation strategy of the job.
+ */
+ public void initialize(BSPJobID jobId, int maxTaskAttempts,
+ TaskInProgress[] tasks, Configuration conf,
+ MasterSyncClient masterClient, TaskAllocationStrategy allocationStrategy) {
+ this.tasks = tasks;
+ this.jobId = jobId;
+ this.conf = conf;
+ this.maxTaskAttempts = maxTaskAttempts;
+ this.currentAttemptId = 0;
+ this.masterSyncClient = masterClient;
+ this.allocationStrategy = allocationStrategy;
+ }
+
+ @Override
+ public boolean isRecoveryPossible(TaskInProgress tip) {
+ return currentAttemptId < maxTaskAttempts;
+ }
+
+ @Override
+ public boolean isAlreadyRecovered(TaskInProgress tip) {
+ return currentAttemptId < tip.getCurrentTaskAttemptId().getId();
+ }
+
+ @Override
+ public void recoverTasks(JobInProgress jip,
+ Map<String, GroomServerStatus> groomStatuses,
+ TaskInProgress[] failedTasksInProgress,
+ TaskInProgress[] allTasksInProgress,
+ Map<GroomServerStatus, Integer> taskCountInGroomMap,
+ Map<GroomServerStatus, List<GroomServerAction>> actionMap)
+ throws IOException {
+
+ Map<TaskID, TaskInProgress> recoverySet = new HashMap<TaskID, TaskInProgress>(
+ 2 * failedTasksInProgress.length);
+ for (int i = 0; i < failedTasksInProgress.length; ++i) {
+ recoverySet.put(failedTasksInProgress[i].getTaskId(),
+ failedTasksInProgress[i]);
+ }
+
+ long lowestSuperstepNumber = Long.MAX_VALUE;
+
+ String[] taskProgress = this.masterSyncClient.getChildKeySet(
+ this.masterSyncClient.constructKey(jobId, "checkpoint"), null);
+
+ if (LOG.isDebugEnabled()) {
+ StringBuffer list = new StringBuffer(25 * taskProgress.length);
+ list.append("got child key set").append(taskProgress.length)
+ .append("/").append(tasks.length).append(" ");
+ for (String entry : taskProgress) {
+ list.append(entry).append(",");
+ }
+ LOG.debug(list);
+ }
+
+ if (taskProgress.length == this.tasks.length) {
+ for (int i = 0; i < taskProgress.length; ++i) {
+ ArrayWritable progressInformation = new ArrayWritable(
+ LongWritable.class);
+ boolean result = this.masterSyncClient.getInformation(
+ this.masterSyncClient.constructKey(jobId, "checkpoint",
+ taskProgress[i]), progressInformation);
+
+ if (!result) {
+ lowestSuperstepNumber = -1L;
+ break;
+ }
+
+ Writable[] progressArr = progressInformation.get();
+ LongWritable superstepProgress = (LongWritable) progressArr[0];
+
+ if (superstepProgress != null) {
+ if (superstepProgress.get() < lowestSuperstepNumber) {
+ lowestSuperstepNumber = superstepProgress.get();
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Got superstep number " + lowestSuperstepNumber
+ + " from " + taskProgress[i]);
+ }
+ }
+ }
+ }
+ clearClientForSuperstep(lowestSuperstepNumber);
+ restartJob(lowestSuperstepNumber, groomStatuses, recoverySet,
+ allTasksInProgress, taskCountInGroomMap, actionMap);
+
+ } else {
+ restartJob(-1, groomStatuses, recoverySet, allTasksInProgress,
+ taskCountInGroomMap, actionMap);
+ }
+
+ ++currentAttemptId;
+ }
+
+ private void clearClientForSuperstep(long superstep) {
+ this.masterSyncClient.remove(
+ masterSyncClient.constructKey(jobId, "sync"), null);
+ }
+
+ private void populateAction(Task task, long superstep,
+ GroomServerStatus groomStatus,
+ Map<GroomServerStatus, List<GroomServerAction>> actionMap) {
+ List<GroomServerAction> list = actionMap.get(groomStatus);
+ if (!actionMap.containsKey(groomStatus)) {
+ list = new ArrayList<GroomServerAction>();
+ actionMap.put(groomStatus, list);
+ }
+ list.add(new RecoverTaskAction(task, superstep));
+
+ }
+
+ private void restartTask(TaskInProgress tip, long superstep,
+ Map<String, GroomServerStatus> groomStatuses,
+ Map<GroomServerStatus, List<GroomServerAction>> actionMap) {
+ GroomServerStatus serverStatus = tip.getGroomServerStatus();
+ Task task = tip.constructTask(serverStatus);
+ populateAction(task, superstep, serverStatus, actionMap);
+
+ }
+
+ private void restartJob(long superstep,
+ Map<String, GroomServerStatus> groomStatuses,
+ Map<TaskID, TaskInProgress> recoveryMap, TaskInProgress[] allTasks,
+ Map<GroomServerStatus, Integer> taskCountInGroomMap,
+ Map<GroomServerStatus, List<GroomServerAction>> actionMap)
+ throws IOException {
+ String path = conf.get("bsp.checkpoint.prefix_path", "/checkpoint/");
+
+ if (superstep >= 0) {
+ FileSystem fileSystem = FileSystem.get(conf);
+ for (int i = 0; i < allTasks.length; ++i) {
+ String[] hosts = null;
+ if (recoveryMap.containsKey(allTasks[i].getTaskId())) {
+
+ // Update task count in map.
+ // TODO: This should be a responsibility of GroomServerStatus
+ Integer count = taskCountInGroomMap.get(allTasks[i]
+ .getGroomServerStatus());
+ if (count != null) {
+ count = count.intValue() - 1;
+ taskCountInGroomMap
+ .put(allTasks[i].getGroomServerStatus(), count);
+ }
+
+ StringBuffer ckptPath = new StringBuffer(path);
+ ckptPath.append(this.jobId.toString());
+ ckptPath.append("/").append(superstep).append("/")
+ .append(allTasks[i].getTaskId().getId());
+ Path checkpointPath = new Path(ckptPath.toString());
+ if (fileSystem.exists(checkpointPath)) {
+ FileStatus fileStatus = fileSystem.getFileStatus(checkpointPath);
+ BlockLocation[] blocks = fileSystem.getFileBlockLocations(
+ fileStatus, 0, fileStatus.getLen());
+ hosts = blocks[0].getHosts();
+ } else {
+ hosts = new String[groomStatuses.keySet().size()];
+ groomStatuses.keySet().toArray(hosts);
+ }
+ GroomServerStatus serverStatus = this.allocationStrategy
+ .getGroomToAllocate(groomStatuses, hosts, taskCountInGroomMap,
+ new BSPResource[0], allTasks[i]);
+ Task task = allTasks[i].constructTask(serverStatus);
+ populateAction(task, superstep, serverStatus, actionMap);
+
+ } else {
+ restartTask(allTasks[i], superstep, groomStatuses, actionMap);
+ }
+ }
+ } else {
+ // Start the task from the beginning.
+ for (int i = 0; i < allTasks.length; ++i) {
+ if (recoveryMap.containsKey(allTasks[i].getTaskId())) {
+ this.allocationStrategy.getGroomToAllocate(groomStatuses,
+ this.allocationStrategy.selectGrooms(groomStatuses,
+ taskCountInGroomMap, new BSPResource[0], allTasks[i]),
+ taskCountInGroomMap, new BSPResource[0], allTasks[i]);
+ } else {
+ restartTask(allTasks[i], superstep, groomStatuses, actionMap);
+ }
+ }
+ }
+ }
+
+ }// end of CheckpointMasterService
+
+ @Override
+ public FaultTolerantPeerService<M> constructPeerFaultTolerance(BSPJob job,
+ @SuppressWarnings("rawtypes")
+ BSPPeer bspPeer, PeerSyncClient syncClient,
+ InetSocketAddress peerAddress, TaskAttemptID taskAttemptId,
+ long superstep, Configuration conf, MessageManager<M> messenger)
+ throws Exception {
+ CheckpointPeerService<M> service = new CheckpointPeerService<M>();
+ service.initialize(job, bspPeer, syncClient, peerAddress, taskAttemptId,
+ superstep, conf, messenger);
+ return service;
+ }
+
+ @Override
+ public FaultTolerantMasterService constructMasterFaultTolerance(
+ BSPJobID jobId, int maxTaskAttempts, TaskInProgress[] tasks,
+ Configuration conf, MasterSyncClient masterClient,
+ TaskAllocationStrategy allocationStrategy) throws Exception {
+ CheckpointMasterService service = new CheckpointMasterService();
+ service.initialize(jobId, maxTaskAttempts, tasks, conf, masterClient,
+ allocationStrategy);
+ return service;
+ }
+
+ /**
+ * Initializes the peer fault tolerance by checkpointing service. For
+ * recovery, on peer initialization, it reads all the checkpointed messages to
+ * recover the state of the peer. During normal working, it checkpoints all
+ * the messages it received in the previous superstep. It also stores the
+ * superstep progress in the global synchronization area.
+ *
+ */
+ public static class CheckpointPeerService<M extends Writable> implements
+ FaultTolerantPeerService<M>, MessageEventListener<M> {
+
+ private BSPJob job;
+ @SuppressWarnings("rawtypes")
+ private BSPPeer peer;
+ private PeerSyncClient syncClient;
+ private long superstep;
+ private Configuration conf;
+ private MessageManager<M> messenger;
+ private FileSystem fs;
+ private int checkPointInterval;
+ volatile private long lastCheckPointStep;
+ volatile private boolean checkpointState;
+ volatile private FSDataOutputStream checkpointStream;
+ volatile private long checkpointMessageCount;
+
+ public void initialize(BSPJob job, @SuppressWarnings("rawtypes")
+ BSPPeer bspPeer, PeerSyncClient syncClient, InetSocketAddress peerAddress,
+ TaskAttemptID taskAttemptId, long superstep, Configuration conf,
+ MessageManager<M> messenger) throws IOException {
+
+ this.job = job;
+ this.peer = bspPeer;
+ this.syncClient = syncClient;
+ this.superstep = superstep;
+ this.conf = conf;
+ this.messenger = messenger;
+ this.fs = FileSystem.get(conf);
+ this.checkPointInterval = conf.getInt(Constants.CHECKPOINT_INTERVAL,
+ Constants.DEFAULT_CHECKPOINT_INTERVAL);
+ this.checkPointInterval = conf.getInt(Constants.CHECKPOINT_INTERVAL,
+ Constants.DEFAULT_CHECKPOINT_INTERVAL);
+
+ this.checkpointState = conf.getBoolean(Constants.CHECKPOINT_ENABLED,
+ false);
+
+ if (superstep > 0) {
+ this.lastCheckPointStep = this.superstep;
+ } else {
+ this.lastCheckPointStep = 1;
+ }
+ this.checkpointMessageCount = 0L;
+ }
+
+ private String checkpointPath(long step) {
+ String backup = conf.get("bsp.checkpoint.prefix_path", "checkpoint/");
+ String ckptPath = backup + job.getJobID().toString() + "/" + (step) + "/"
+ + peer.getPeerIndex();
+ if (LOG.isDebugEnabled())
+ LOG.debug("Received Messages are to be saved to " + ckptPath);
+ return ckptPath;
+ }
+
+ @Override
+ public TaskStatus.State onPeerInitialized(TaskStatus.State state)
+ throws Exception {
+ if (this.superstep >= 0 && state.equals(TaskStatus.State.RECOVERING)) {
+ ArrayWritable progressArr = new ArrayWritable(LongWritable.class);
+ boolean result = this.syncClient.getInformation(
+ this.syncClient.constructKey(job.getJobID(), "checkpoint",
+ String.valueOf(peer.getPeerIndex())), progressArr);
+
+ if (!result) {
+ throw new IOException("No data found to restore peer state.");
+ }
+
+ Writable[] progressInfo = progressArr.get();
+ long superstepProgress = ((LongWritable) progressInfo[0]).get();
+ long numMessages = ((LongWritable) progressInfo[1]).get();
+
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Got sstep =" + superstepProgress + " numMessages = "
+ + numMessages + " this.superstep = " + this.superstep);
+ }
+
+ if (numMessages > 0) {
+ Path path = new Path(checkpointPath(superstepProgress));
+ FSDataInputStream in = this.fs.open(path);
+ BSPMessageBundle<M> bundle = new BSPMessageBundle<M>();
+ try {
+ for (int i = 0; i < numMessages; ++i) {
+ String className = in.readUTF();
+ @SuppressWarnings("unchecked")
+ M message = (M) ReflectionUtils.newInstance(
+ Class.forName(className), conf);
+ message.readFields(in);
+ bundle.addMessage(message);
+ }
+ messenger.loopBackMessages(bundle);
+ } catch (EOFException e) {
+ LOG.error("Error recovering from checkpointing", e);
+ throw new IOException(e);
+ } finally {
+ this.fs.close();
+ }
+ }
+ }
+ this.messenger.registerListener(this);
+ return TaskStatus.State.RUNNING;
+
+ }
+
+ public final boolean isReadyToCheckpoint() {
+
+ checkPointInterval = conf.getInt(Constants.CHECKPOINT_INTERVAL, 1);
+ LOG.info(new StringBuffer(1000).append("Enabled = ")
+ .append(conf.getBoolean(Constants.CHECKPOINT_ENABLED, false))
+ .append(" checkPointInterval = ").append(checkPointInterval)
+ .append(" lastCheckPointStep = ").append(lastCheckPointStep)
+ .append(" getSuperstepCount() = ").append(peer.getSuperstepCount())
+ .toString());
+ if (LOG.isDebugEnabled())
+ LOG.debug(new StringBuffer(1000).append("Enabled = ")
+ .append(conf.getBoolean(Constants.CHECKPOINT_ENABLED, false))
+ .append(" checkPointInterval = ").append(checkPointInterval)
+ .append(" lastCheckPointStep = ").append(lastCheckPointStep)
+ .append(" getSuperstepCount() = ").append(peer.getSuperstepCount())
+ .toString());
+
+ return (conf.getBoolean(Constants.CHECKPOINT_ENABLED, false)
+ && (checkPointInterval != 0) && (((int) ((peer.getSuperstepCount() + 1) - lastCheckPointStep)) >= checkPointInterval));
+
+ }
+
+ @Override
+ public void beforeBarrier() throws Exception {
+ }
+
+ @Override
+ public void duringBarrier() throws Exception {
+ }
+
+ @Override
+ public void afterBarrier() throws Exception {
+
+ synchronized (this) {
+ if (checkpointState) {
+
+ if (checkpointStream != null) {
+ this.checkpointStream.close();
+ this.checkpointStream = null;
+ }
+
+ lastCheckPointStep = peer.getSuperstepCount();
+
+ ArrayWritable writableArray = new ArrayWritable(LongWritable.class);
+ Writable[] writeArr = new Writable[2];
+ writeArr[0] = new LongWritable(lastCheckPointStep);
+ writeArr[1] = new LongWritable(checkpointMessageCount);
+ writableArray.set(writeArr);
+
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Storing lastCheckPointStep = " + lastCheckPointStep
+ + " checkpointMessageCount = " + checkpointMessageCount
+ + " for peer = " + String.valueOf(peer.getPeerIndex()));
+ }
+
+ this.syncClient.storeInformation(this.syncClient.constructKey(
+ this.job.getJobID(), "checkpoint",
+ String.valueOf(peer.getPeerIndex())), writableArray, true, null);
+ }
+ checkpointState = isReadyToCheckpoint();
+ checkpointMessageCount = 0;
+ }
+
+ LOG.info("checkpoingNext = " + checkpointState
+ + " checkpointMessageCount = " + checkpointMessageCount);
+ }
+
+ @Override
+ public void onInitialized() {
+
+ }
+
+ @Override
+ public void onMessageSent(String peerName, M message) {
+ }
+
+ @Override
+ public void onMessageReceived(M message) {
+ String checkpointedPath = null;
+
+ if (message == null) {
+ LOG.error("Message M is found to be null");
+ }
+
+ synchronized (this) {
+ if (checkpointState) {
+ if (this.checkpointStream == null) {
+ checkpointedPath = checkpointPath(peer.getSuperstepCount() + 1);
+ try {
+ LOG.info("Creating path " + checkpointedPath);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Creating path " + checkpointedPath);
+ }
+ checkpointStream = this.fs.create(new Path(checkpointedPath));
+ } catch (IOException ioe) {
+ LOG.error("Fail checkpointing messages to " + checkpointedPath,
+ ioe);
+ throw new RuntimeException("Failed opening HDFS file "
+ + checkpointedPath, ioe);
+ }
+ }
+ try {
+ ++checkpointMessageCount;
+ checkpointStream.writeUTF(message.getClass().getCanonicalName());
+ message.write(checkpointStream);
+ } catch (IOException ioe) {
+ LOG.error("Fail checkpointing messages to " + checkpointedPath, ioe);
+ throw new RuntimeException("Failed writing to HDFS file "
+ + checkpointedPath, ioe);
+ }
+
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("message count = " + checkpointMessageCount);
+ }
+ }
+ }
+
+ }
+
+ @Override
+ public void onClose() {
+
+ }
+
+ }
+
+}
Added: hama/branches/HAMA-505-branch/core/src/main/java/org/apache/hama/bsp/ft/BSPFaultTolerantService.java
URL: http://svn.apache.org/viewvc/hama/branches/HAMA-505-branch/core/src/main/java/org/apache/hama/bsp/ft/BSPFaultTolerantService.java?rev=1369551&view=auto
==============================================================================
--- hama/branches/HAMA-505-branch/core/src/main/java/org/apache/hama/bsp/ft/BSPFaultTolerantService.java (added)
+++ hama/branches/HAMA-505-branch/core/src/main/java/org/apache/hama/bsp/ft/BSPFaultTolerantService.java Sun Aug 5 11:07:48 2012
@@ -0,0 +1,90 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.bsp.ft;
+
+import java.net.InetSocketAddress;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.Writable;
+import org.apache.hama.bsp.BSPJob;
+import org.apache.hama.bsp.BSPJobID;
+import org.apache.hama.bsp.BSPPeer;
+import org.apache.hama.bsp.TaskAttemptID;
+import org.apache.hama.bsp.TaskInProgress;
+import org.apache.hama.bsp.message.MessageManager;
+import org.apache.hama.bsp.sync.MasterSyncClient;
+import org.apache.hama.bsp.sync.PeerSyncClient;
+import org.apache.hama.bsp.taskallocation.TaskAllocationStrategy;
+
+/**
+ * <code>BSPFaultTolerantService</code> defines the fault tolerance service
+ * behavior. The fault tolerance service is a feature of a running job, not of
+ * the system. A class implementing this interface is responsible for creating
+ * two objects:
+ * <ul>
+ * <li><code>FaultTolerantMasterService</code>, used by the job at BSPMaster to
+ * handle the fault-tolerance steps at the master;</li>
+ * <li><code>FaultTolerantPeerService</code>, which implements the
+ * fault-tolerance steps for recovery inside each <code>BSPPeer</code> doing
+ * the computation.</li>
+ * </ul>
+ *
+ * @param <M> The type of messages exchanged between the peers.
+ */
+public interface BSPFaultTolerantService<M extends Writable> {
+
+  /**
+   * The configuration token by which a job can register its fault-tolerance
+   * service.
+   */
+  public static final String FT_SERVICE_CONF = "hama.ft.conf.class";
+
+  /**
+   * Creates the instance of <code>FaultTolerantMasterService</code> that
+   * handles the fault-tolerance steps in the BSPMaster task scheduler.
+   *
+   * @param jobId The identifier of the job.
+   * @param maxTaskAttempts Number of attempts allowed for recovering from
+   *          failure.
+   * @param tasks The list of tasks in the job.
+   * @param conf The job configuration object.
+   * @param masterClient The synchronization client used by BSPMaster.
+   * @param allocationStrategy The task allocation strategy of the job.
+   * @return An instance of a class implementing
+   *         <code>FaultTolerantMasterService</code>.
+   * @throws Exception If the master service cannot be created.
+   */
+  public FaultTolerantMasterService constructMasterFaultTolerance(
+      BSPJobID jobId, int maxTaskAttempts, TaskInProgress[] tasks,
+      Configuration conf, MasterSyncClient masterClient,
+      TaskAllocationStrategy allocationStrategy) throws Exception;
+
+  /**
+   * Creates an instance of <code>FaultTolerantPeerService</code>, which
+   * defines the steps to be taken inside a peer for fault tolerance.
+   *
+   * @param job The job the peer belongs to.
+   * @param bspPeer The peer.
+   * @param syncClient The synchronization client used by the peer.
+   * @param peerAddress The network address of the peer.
+   * @param taskAttemptId The task attempt executed by the peer.
+   * @param superstep The superstep from which the peer is initialized.
+   * @param conf The job configuration object.
+   * @param messenger The messaging system between the peers.
+   * @return An instance of a class implementing
+   *         <code>FaultTolerantPeerService</code>.
+   * @throws Exception If the peer service cannot be created.
+   */
+  public FaultTolerantPeerService<M> constructPeerFaultTolerance(BSPJob job,
+      @SuppressWarnings("rawtypes")
+      BSPPeer bspPeer, PeerSyncClient syncClient,
+      InetSocketAddress peerAddress, TaskAttemptID taskAttemptId,
+      long superstep, Configuration conf, MessageManager<M> messenger)
+      throws Exception;
+
+}
Added: hama/branches/HAMA-505-branch/core/src/main/java/org/apache/hama/bsp/ft/FaultTolerantMasterService.java
URL: http://svn.apache.org/viewvc/hama/branches/HAMA-505-branch/core/src/main/java/org/apache/hama/bsp/ft/FaultTolerantMasterService.java?rev=1369551&view=auto
==============================================================================
--- hama/branches/HAMA-505-branch/core/src/main/java/org/apache/hama/bsp/ft/FaultTolerantMasterService.java (added)
+++ hama/branches/HAMA-505-branch/core/src/main/java/org/apache/hama/bsp/ft/FaultTolerantMasterService.java Sun Aug 5 11:07:48 2012
@@ -0,0 +1,71 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.bsp.ft;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.hama.bsp.GroomServerAction;
+import org.apache.hama.bsp.GroomServerStatus;
+import org.apache.hama.bsp.JobInProgress;
+import org.apache.hama.bsp.TaskInProgress;
+
+/**
+ * <code>FaultTolerantMasterService</code> defines the behavior of the object
+ * responsible for fault-tolerance related work in the BSPMaster task
+ * scheduler. An instance is created per job.
+ */
+public interface FaultTolerantMasterService {
+
+  /**
+   * Returns true if recovery of the task in question is possible.
+   *
+   * @param tip <code>TaskInProgress</code> object that represents the task.
+   * @return true if recovery is possible.
+   */
+  public boolean isRecoveryPossible(TaskInProgress tip);
+
+  /**
+   * Returns true if the task is already slated to be recovered from a
+   * failure.
+   *
+   * @param tip <code>TaskInProgress</code> object that represents the task.
+   * @return true if the task/job is already in the process of recovery.
+   */
+  public boolean isAlreadyRecovered(TaskInProgress tip);
+
+  /**
+   * From the list of failed tasks, provides the task scheduler with a set of
+   * actions and the grooms to which these actions must be sent for fault
+   * recovery.
+   *
+   * @param jip The job in question which has to be recovered.
+   * @param groomStatuses The map of grooms to their statuses.
+   * @param failedTasksInProgress The list of failed tasks.
+   * @param allTasksInProgress The list of all tasks in the job.
+   * @param taskCountInGroomMap The number of tasks assigned to each groom;
+   *          implementations may update it while reassigning tasks.
+   * @param actionMap The map of groom to the list of actions that are to be
+   *          taken on that groom; implementations add their actions to it.
+   * @throws IOException If an I/O error occurs while planning the recovery.
+   */
+  public void recoverTasks(JobInProgress jip,
+      Map<String, GroomServerStatus> groomStatuses,
+      TaskInProgress[] failedTasksInProgress,
+      TaskInProgress[] allTasksInProgress,
+      Map<GroomServerStatus, Integer> taskCountInGroomMap,
+      Map<GroomServerStatus, List<GroomServerAction>> actionMap)
+      throws IOException;
+}
Added: hama/branches/HAMA-505-branch/core/src/main/java/org/apache/hama/bsp/ft/FaultTolerantPeerService.java
URL: http://svn.apache.org/viewvc/hama/branches/HAMA-505-branch/core/src/main/java/org/apache/hama/bsp/ft/FaultTolerantPeerService.java?rev=1369551&view=auto
==============================================================================
--- hama/branches/HAMA-505-branch/core/src/main/java/org/apache/hama/bsp/ft/FaultTolerantPeerService.java (added)
+++ hama/branches/HAMA-505-branch/core/src/main/java/org/apache/hama/bsp/ft/FaultTolerantPeerService.java Sun Aug 5 11:07:48 2012
@@ -0,0 +1,62 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.bsp.ft;
+
+import org.apache.hadoop.io.Writable;
+import org.apache.hama.bsp.TaskStatus;
+
+/**
+ * <code>FaultTolerantPeerService</code> defines the steps required to be
+ * performed by peers for fault tolerance. At different stages of peer
+ * execution, the service can take the measures needed to ensure that the
+ * peer computations can be recovered if any of them fails.
+ *
+ * @param <M> The type of messages exchanged between the peers.
+ */
+public interface FaultTolerantPeerService<M extends Writable> {
+
+  /**
+   * This is called once the peer is initialized.
+   *
+   * @param state The state the task was started in (for example
+   *          <code>RECOVERING</code> when it is being restarted).
+   * @return The state the task should continue in.
+   * @throws Exception
+   */
+  public TaskStatus.State onPeerInitialized(TaskStatus.State state)
+      throws Exception;
+
+  /**
+   * This function is called before all the peers go into global sync.
+   *
+   * @throws Exception
+   */
+  public void beforeBarrier() throws Exception;
+
+  /**
+   * This function is called after the peers enter the barrier but before they
+   * initiate leaving the barrier.
+   *
+   * @throws Exception
+   */
+  public void duringBarrier() throws Exception;
+
+  /**
+   * This function is called every time the peer completes the global
+   * synchronization.
+   *
+   * @throws Exception
+   */
+  public void afterBarrier() throws Exception;
+
+}
Modified: hama/branches/HAMA-505-branch/core/src/main/java/org/apache/hama/bsp/message/AbstractMessageManager.java
URL: http://svn.apache.org/viewvc/hama/branches/HAMA-505-branch/core/src/main/java/org/apache/hama/bsp/message/AbstractMessageManager.java?rev=1369551&r1=1369550&r2=1369551&view=diff
==============================================================================
--- hama/branches/HAMA-505-branch/core/src/main/java/org/apache/hama/bsp/message/AbstractMessageManager.java (original)
+++ hama/branches/HAMA-505-branch/core/src/main/java/org/apache/hama/bsp/message/AbstractMessageManager.java Sun Aug 5 11:07:48 2012
@@ -22,7 +22,9 @@ import java.net.InetSocketAddress;
import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
+import java.util.LinkedList;
import java.util.Map.Entry;
+import java.util.Queue;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -31,6 +33,7 @@ import org.apache.hadoop.conf.Configurat
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.hama.bsp.BSPMessageBundle;
import org.apache.hama.bsp.BSPPeer;
import org.apache.hama.bsp.BSPPeerImpl;
import org.apache.hama.bsp.TaskAttemptID;
@@ -61,6 +64,9 @@ public abstract class AbstractMessageMan
// the task attempt id
protected TaskAttemptID attemptId;
+ // List of listeners for all the sent messages
+ protected Queue<MessageEventListener<M>> messageListenerQueue;
+
/*
* (non-Javadoc)
* @see org.apache.hama.bsp.message.MessageManager#init(org.apache.hama.bsp.
@@ -70,12 +76,14 @@ public abstract class AbstractMessageMan
@Override
public void init(TaskAttemptID attemptId, BSPPeer<?, ?, ?, ?, M> peer,
Configuration conf, InetSocketAddress peerAddress) {
+ this.messageListenerQueue = new LinkedList<MessageEventListener<M>>();
this.attemptId = attemptId;
this.peer = peer;
this.conf = conf;
this.peerAddress = peerAddress;
localQueue = getQueue();
localQueueForNextIteration = getSynchronizedQueue();
+
}
/*
@@ -84,18 +92,22 @@ public abstract class AbstractMessageMan
*/
@Override
public void close() {
- Collection<MessageQueue<M>> values = outgoingQueues.values();
- for (MessageQueue<M> msgQueue : values) {
- msgQueue.close();
- }
- localQueue.close();
- // remove possible disk queues from the path
try {
- FileSystem.get(conf).delete(
- DiskQueue.getQueueDir(conf, attemptId,
- conf.get(DiskQueue.DISK_QUEUE_PATH_KEY)), true);
- } catch (IOException e) {
- LOG.warn("Queue dir couldn't be deleted");
+ Collection<MessageQueue<M>> values = outgoingQueues.values();
+ for (MessageQueue<M> msgQueue : values) {
+ msgQueue.close();
+ }
+ localQueue.close();
+ // remove possible disk queues from the path
+ try {
+ FileSystem.get(conf).delete(
+ DiskQueue.getQueueDir(conf, attemptId,
+ conf.get(DiskQueue.DISK_QUEUE_PATH_KEY)), true);
+ } catch (IOException e) {
+ LOG.warn("Queue dir couldn't be deleted");
+ }
+ } finally {
+ notifyClose();
}
}
@@ -139,6 +151,7 @@ public abstract class AbstractMessageMan
localQueue = localQueueForNextIteration.getMessageQueue();
localQueue.prepareRead();
localQueueForNextIteration = getSynchronizedQueue();
+ notifyInit();
}
/*
@@ -163,6 +176,7 @@ public abstract class AbstractMessageMan
queue.add(msg);
peer.incrementCounter(BSPPeerImpl.PeerCounter.TOTAL_MESSAGES_SENT, 1L);
outgoingQueues.put(targetPeerAddress, queue);
+ notifySentMessage(peerName, msg);
}
/*
@@ -206,4 +220,68 @@ public abstract class AbstractMessageMan
this.conf = conf;
}
+  /**
+   * Informs every registered listener that {@code message} was queued for
+   * delivery to the peer named {@code peerName}.
+   */
+  private void notifySentMessage(String peerName, M message) {
+    for (MessageEventListener<M> listener : this.messageListenerQueue) {
+      listener.onMessageSent(peerName, message);
+    }
+  }
+
+  /**
+   * Informs every registered listener that {@code message} was received by
+   * this peer.
+   */
+  private void notifyReceivedMessage(M message) throws IOException {
+    for (MessageEventListener<M> listener : this.messageListenerQueue) {
+      listener.onMessageReceived(message);
+    }
+  }
+
+  /**
+   * Invokes {@code onInitialized()} on every registered listener.
+   */
+  private void notifyInit() {
+    for (MessageEventListener<M> listener : this.messageListenerQueue) {
+      listener.onInitialized();
+    }
+  }
+
+  /**
+   * Invokes {@code onClose()} on every registered listener.
+   */
+  private void notifyClose() {
+    for (MessageEventListener<M> listener : this.messageListenerQueue) {
+      listener.onClose();
+    }
+  }
+
+
+
+  /**
+   * Adds {@code listener} to the listeners notified of message-manager
+   * events. A {@code null} listener is ignored.
+   */
+  @Override
+  public void registerListener(MessageEventListener<M> listener)
+      throws IOException {
+    if (listener == null) {
+      return;
+    }
+    this.messageListenerQueue.add(listener);
+  }
+
+  /**
+   * Queues every message of {@code bundle} for this peer's next superstep by
+   * handing each one to {@link #loopBackMessage(Writable)}.
+   */
+  @SuppressWarnings("unchecked")
+  @Override
+  public void loopBackMessages(BSPMessageBundle<? extends Writable> bundle)
+      throws IOException {
+    Iterator<? extends Writable> pending = bundle.getMessages().iterator();
+    while (pending.hasNext()) {
+      loopBackMessage((M) pending.next());
+    }
+  }
+
+  /**
+   * Queues {@code message} for this peer's next superstep, increments the
+   * TOTAL_MESSAGES_RECEIVED counter and notifies the registered listeners.
+   */
+  @SuppressWarnings("unchecked")
+  @Override
+  public void loopBackMessage(Writable message) throws IOException {
+    M typedMessage = (M) message;
+    this.localQueueForNextIteration.add(typedMessage);
+    peer.incrementCounter(BSPPeerImpl.PeerCounter.TOTAL_MESSAGES_RECEIVED, 1L);
+    notifyReceivedMessage(typedMessage);
+  }
+
+
+
+
+
}
Modified: hama/branches/HAMA-505-branch/core/src/main/java/org/apache/hama/bsp/message/AvroMessageManagerImpl.java
URL: http://svn.apache.org/viewvc/hama/branches/HAMA-505-branch/core/src/main/java/org/apache/hama/bsp/message/AvroMessageManagerImpl.java?rev=1369551&r1=1369550&r2=1369551&view=diff
==============================================================================
--- hama/branches/HAMA-505-branch/core/src/main/java/org/apache/hama/bsp/message/AvroMessageManagerImpl.java (original)
+++ hama/branches/HAMA-505-branch/core/src/main/java/org/apache/hama/bsp/message/AvroMessageManagerImpl.java Sun Aug 5 11:07:48 2012
@@ -25,7 +25,6 @@ import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.util.HashMap;
-import java.util.Iterator;
import org.apache.avro.AvroRemoteException;
import org.apache.avro.ipc.NettyServer;
@@ -61,14 +60,8 @@ public final class AvroMessageManagerImp
server.close();
}
- public void put(BSPMessageBundle<M> messages) {
- peer.incrementCounter(BSPPeerImpl.PeerCounter.TOTAL_MESSAGES_RECEIVED,
- messages.getMessages().size());
- Iterator<M> iterator = messages.getMessages().iterator();
- while (iterator.hasNext()) {
- this.localQueueForNextIteration.add(iterator.next());
- iterator.remove();
- }
+ public void put(BSPMessageBundle<M> messages) throws IOException {
+ this.loopBackMessages(messages);
}
@SuppressWarnings("unchecked")
@@ -139,5 +132,4 @@ public final class AvroMessageManagerImp
return ByteBuffer.wrap(data);
}
}
-
}
Modified: hama/branches/HAMA-505-branch/core/src/main/java/org/apache/hama/bsp/message/HadoopMessageManager.java
URL: http://svn.apache.org/viewvc/hama/branches/HAMA-505-branch/core/src/main/java/org/apache/hama/bsp/message/HadoopMessageManager.java?rev=1369551&r1=1369550&r2=1369551&view=diff
==============================================================================
--- hama/branches/HAMA-505-branch/core/src/main/java/org/apache/hama/bsp/message/HadoopMessageManager.java (original)
+++ hama/branches/HAMA-505-branch/core/src/main/java/org/apache/hama/bsp/message/HadoopMessageManager.java Sun Aug 5 11:07:48 2012
@@ -17,6 +17,8 @@
*/
package org.apache.hama.bsp.message;
+import java.io.IOException;
+
import org.apache.hadoop.io.Writable;
import org.apache.hama.bsp.BSPMessageBundle;
import org.apache.hama.bsp.message.compress.BSPCompressedBundle;
@@ -35,7 +37,7 @@ public interface HadoopMessageManager<M
*
* @param msg
*/
- public void put(M msg);
+ public void put(M msg) throws IOException;
/**
* This method puts a messagebundle for the next iteration. Accessed
@@ -43,7 +45,7 @@ public interface HadoopMessageManager<M
*
* @param messages
*/
- public void put(BSPMessageBundle<M> messages);
+ public void put(BSPMessageBundle<M> messages) throws IOException;
/**
* This method puts a compressed message bundle for the next iteration.
@@ -51,6 +53,6 @@ public interface HadoopMessageManager<M
*
* @param compMsgBundle
*/
- public void put(BSPCompressedBundle compMsgBundle);
+ public void put(BSPCompressedBundle compMsgBundle) throws IOException;
}
Modified: hama/branches/HAMA-505-branch/core/src/main/java/org/apache/hama/bsp/message/HadoopMessageManagerImpl.java
URL: http://svn.apache.org/viewvc/hama/branches/HAMA-505-branch/core/src/main/java/org/apache/hama/bsp/message/HadoopMessageManagerImpl.java?rev=1369551&r1=1369550&r2=1369551&view=diff
==============================================================================
--- hama/branches/HAMA-505-branch/core/src/main/java/org/apache/hama/bsp/message/HadoopMessageManagerImpl.java (original)
+++ hama/branches/HAMA-505-branch/core/src/main/java/org/apache/hama/bsp/message/HadoopMessageManagerImpl.java Sun Aug 5 11:07:48 2012
@@ -29,7 +29,6 @@ import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.ipc.RPC.Server;
import org.apache.hama.bsp.BSPMessageBundle;
import org.apache.hama.bsp.BSPPeer;
-import org.apache.hama.bsp.BSPPeerImpl;
import org.apache.hama.bsp.TaskAttemptID;
import org.apache.hama.bsp.message.compress.BSPCompressedBundle;
import org.apache.hama.ipc.HamaRPCProtocolVersion;
@@ -115,24 +114,19 @@ public final class HadoopMessageManagerI
}
@Override
- public final void put(M msg) {
- this.localQueueForNextIteration.add(msg);
- peer.incrementCounter(BSPPeerImpl.PeerCounter.TOTAL_MESSAGES_RECEIVED, 1L);
+ public final void put(M msg) throws IOException {
+ loopBackMessage(msg);
}
@Override
- public final void put(BSPMessageBundle<M> messages) {
- for (M message : messages.getMessages()) {
- this.localQueueForNextIteration.add(message);
- }
+ public final void put(BSPMessageBundle<M> messages) throws IOException {
+ loopBackMessages(messages);
}
@Override
- public final void put(BSPCompressedBundle compMsgBundle) {
+ public final void put(BSPCompressedBundle compMsgBundle) throws IOException {
BSPMessageBundle<M> bundle = compressor.decompressBundle(compMsgBundle);
- for (M message : bundle.getMessages()) {
- this.localQueueForNextIteration.add(message);
- }
+ loopBackMessages(bundle);
}
@Override
@@ -141,4 +135,5 @@ public final class HadoopMessageManagerI
return versionID;
}
+
}
Added: hama/branches/HAMA-505-branch/core/src/main/java/org/apache/hama/bsp/message/MessageEventListener.java
URL: http://svn.apache.org/viewvc/hama/branches/HAMA-505-branch/core/src/main/java/org/apache/hama/bsp/message/MessageEventListener.java?rev=1369551&view=auto
==============================================================================
--- hama/branches/HAMA-505-branch/core/src/main/java/org/apache/hama/bsp/message/MessageEventListener.java (added)
+++ hama/branches/HAMA-505-branch/core/src/main/java/org/apache/hama/bsp/message/MessageEventListener.java Sun Aug 5 11:07:48 2012
@@ -0,0 +1,57 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.bsp.message;
+
+/**
+ * Callback interface for observing the events of a {@link MessageManager}.
+ * Listeners are registered through
+ * {@link MessageManager#registerListener(MessageEventListener)}.
+ *
+ * @param <M> the message type handled by the message manager
+ */
+public interface MessageEventListener<M> {
+
+  /**
+   * The kinds of events a message manager can report. Each constant mirrors
+   * one of the callback methods declared by this interface.
+   */
+  public static enum MessageManagerEvent {
+    INITIALIZED, MESSAGE_SENT, MESSAGE_RECEIVED, CLOSE
+  }
+
+  /**
+   * Handles the event raised when the message queue is initialized.
+   * NOTE(review): this may be invoked more than once, e.g. whenever the
+   * manager prepares a fresh local queue for a new superstep; confirm with
+   * the callers.
+   */
+  void onInitialized();
+
+  /**
+   * Handles the event raised when a message is sent. Implementations must not
+   * modify {@code message}.
+   *
+   * @param peerName Name of the peer the message is sent to.
+   * @param message The message that was sent.
+   */
+  void onMessageSent(String peerName, final M message);
+
+  /**
+   * Handles the event raised when a message is received. Implementations must
+   * not modify {@code message}.
+   *
+   * @param message The message received.
+   */
+  void onMessageReceived(final M message);
+
+  /**
+   * Handles the event raised when the message queue is closed.
+   */
+  void onClose();
+
+}
Modified: hama/branches/HAMA-505-branch/core/src/main/java/org/apache/hama/bsp/message/MessageManager.java
URL: http://svn.apache.org/viewvc/hama/branches/HAMA-505-branch/core/src/main/java/org/apache/hama/bsp/message/MessageManager.java?rev=1369551&r1=1369550&r2=1369551&view=diff
==============================================================================
--- hama/branches/HAMA-505-branch/core/src/main/java/org/apache/hama/bsp/message/MessageManager.java (original)
+++ hama/branches/HAMA-505-branch/core/src/main/java/org/apache/hama/bsp/message/MessageManager.java Sun Aug 5 11:07:48 2012
@@ -34,7 +34,7 @@ import org.apache.hama.bsp.TaskAttemptID
*
*/
public interface MessageManager<M extends Writable> {
-
+
public static final String QUEUE_TYPE_CLASS = "hama.messenger.queue.class";
/**
@@ -96,4 +96,25 @@ public interface MessageManager<M extend
*/
public int getNumCurrentMessages();
+ /**
+ * Sends the messages of <code>bundle</code> back to this peer so that they
+ * are received in the next superstep.
+ *
+ * @param bundle The bundle whose messages are looped back to this peer.
+ * @throws IOException if the messages cannot be looped back.
+ */
+ public void loopBackMessages(BSPMessageBundle<? extends Writable> bundle) throws IOException;
+
+ /**
+ * Sends <code>message</code> back to this peer so that it is received in the
+ * next superstep.
+ *
+ * @param message The message to loop back.
+ * @throws IOException if the message cannot be looped back.
+ */
+ public void loopBackMessage(Writable message) throws IOException;
+
+ /**
+ * Register a listener for the events in message manager.
+ *
+ * @param listener <code>MessageEventListener</code> object that is notified
+ * of the message manager's events (initialization, sent and received
+ * messages, close). Implementations may ignore a <code>null</code>
+ * listener.
+ * @throws IOException if the listener cannot be registered.
+ */
+ public void registerListener(MessageEventListener<M> listener)
+ throws IOException;
+
+
}
Modified: hama/branches/HAMA-505-branch/core/src/main/java/org/apache/hama/bsp/sync/BSPPeerSyncClient.java
URL: http://svn.apache.org/viewvc/hama/branches/HAMA-505-branch/core/src/main/java/org/apache/hama/bsp/sync/BSPPeerSyncClient.java?rev=1369551&r1=1369550&r2=1369551&view=diff
==============================================================================
--- hama/branches/HAMA-505-branch/core/src/main/java/org/apache/hama/bsp/sync/BSPPeerSyncClient.java (original)
+++ hama/branches/HAMA-505-branch/core/src/main/java/org/apache/hama/bsp/sync/BSPPeerSyncClient.java Sun Aug 5 11:07:48 2012
@@ -17,6 +17,8 @@
*/
package org.apache.hama.bsp.sync;
+import java.io.IOException;
+
import org.apache.hadoop.conf.Configuration;
import org.apache.hama.bsp.BSPJobID;
import org.apache.hama.bsp.TaskAttemptID;
@@ -100,7 +102,7 @@ public abstract class BSPPeerSyncClient
*
* @throws InterruptedException
*/
- public abstract void close() throws InterruptedException;
+ public abstract void close() throws IOException;
}
Modified: hama/branches/HAMA-505-branch/core/src/main/java/org/apache/hama/bsp/sync/MasterSyncClient.java
URL: http://svn.apache.org/viewvc/hama/branches/HAMA-505-branch/core/src/main/java/org/apache/hama/bsp/sync/MasterSyncClient.java?rev=1369551&r1=1369550&r2=1369551&view=diff
==============================================================================
--- hama/branches/HAMA-505-branch/core/src/main/java/org/apache/hama/bsp/sync/MasterSyncClient.java (original)
+++ hama/branches/HAMA-505-branch/core/src/main/java/org/apache/hama/bsp/sync/MasterSyncClient.java Sun Aug 5 11:07:48 2012
@@ -31,28 +31,23 @@ public interface MasterSyncClient extend
*
* @param conf The configuration parameters to initialize the client.
*/
- public abstract void init(HamaConfiguration conf);
+ public void init(HamaConfiguration conf);
/**
* Clears all information stored.
*/
- public abstract void clear();
+ public void clear();
/**
* Register a newly added job
* @param string
*/
- public abstract void registerJob(String string);
+ public void registerJob(String string);
/**
* Deregister the job from the system.
* @param string
*/
- public abstract void deregisterJob(String string);
+ public void deregisterJob(String string);
- /**
- * Closes the client.
- */
- public abstract void close();
-
}
Modified: hama/branches/HAMA-505-branch/core/src/main/java/org/apache/hama/bsp/sync/PeerSyncClient.java
URL: http://svn.apache.org/viewvc/hama/branches/HAMA-505-branch/core/src/main/java/org/apache/hama/bsp/sync/PeerSyncClient.java?rev=1369551&r1=1369550&r2=1369551&view=diff
==============================================================================
--- hama/branches/HAMA-505-branch/core/src/main/java/org/apache/hama/bsp/sync/PeerSyncClient.java (original)
+++ hama/branches/HAMA-505-branch/core/src/main/java/org/apache/hama/bsp/sync/PeerSyncClient.java Sun Aug 5 11:07:48 2012
@@ -100,12 +100,4 @@ public interface PeerSyncClient extends
*/
public void stopServer();
- /**
- * This method should close all used resources, e.G. a ZooKeeper instance.
- *
- * @throws InterruptedException
- */
- public void close() throws InterruptedException;
-
-
}
Modified: hama/branches/HAMA-505-branch/core/src/main/java/org/apache/hama/bsp/sync/SyncClient.java
URL: http://svn.apache.org/viewvc/hama/branches/HAMA-505-branch/core/src/main/java/org/apache/hama/bsp/sync/SyncClient.java?rev=1369551&r1=1369550&r2=1369551&view=diff
==============================================================================
--- hama/branches/HAMA-505-branch/core/src/main/java/org/apache/hama/bsp/sync/SyncClient.java (original)
+++ hama/branches/HAMA-505-branch/core/src/main/java/org/apache/hama/bsp/sync/SyncClient.java Sun Aug 5 11:07:48 2012
@@ -17,6 +17,8 @@
*/
package org.apache.hama.bsp.sync;
+import java.io.IOException;
+
import org.apache.hadoop.io.Writable;
import org.apache.hama.bsp.BSPJobID;
@@ -56,7 +58,7 @@ public interface SyncClient {
* @return the value if found. Returns null if there was any error of if there
* was no value stored for the key.
*/
- public Writable getInformation(String key, Class<? extends Writable> classType);
+ public boolean getInformation(String key, Writable valueHolder);
/**
* Store new key in key set.
@@ -95,4 +97,17 @@ public interface SyncClient {
public boolean registerListener(String key, SyncEvent event,
SyncEventListener listener);
+ /**
+ * Delete the key and the information stored under it.
+ *
+ * @param key The key to delete.
+ * @param listener Listener associated with the removal. NOTE(review): the
+ * ZooKeeper implementation currently does not use it; confirm the
+ * intended contract.
+ * @return true if the key was removed, false otherwise.
+ */
+ public boolean remove(String key, SyncEventListener listener);
+
+ /**
+ * Closes the client and releases the resources it holds, e.g. a ZooKeeper
+ * session.
+ *
+ * @throws IOException if the client could not be closed cleanly.
+ */
+ public void close() throws IOException;
+
}
Modified: hama/branches/HAMA-505-branch/core/src/main/java/org/apache/hama/bsp/sync/ZKSyncBSPMasterClient.java
URL: http://svn.apache.org/viewvc/hama/branches/HAMA-505-branch/core/src/main/java/org/apache/hama/bsp/sync/ZKSyncBSPMasterClient.java?rev=1369551&r1=1369550&r2=1369551&view=diff
==============================================================================
--- hama/branches/HAMA-505-branch/core/src/main/java/org/apache/hama/bsp/sync/ZKSyncBSPMasterClient.java (original)
+++ hama/branches/HAMA-505-branch/core/src/main/java/org/apache/hama/bsp/sync/ZKSyncBSPMasterClient.java Sun Aug 5 11:07:48 2012
@@ -18,7 +18,6 @@
package org.apache.hama.bsp.sync;
import java.io.IOException;
-import java.util.ArrayList;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -81,45 +80,6 @@ public class ZKSyncBSPMasterClient exten
}
- /**
- * Clears all sub-children of node bspRoot
- */
- private void clearZKNodes() {
- try {
- Stat s = zk.exists(bspRoot, false);
- if (s != null) {
- clearZKNodes(bspRoot);
- }
-
- } catch (Exception e) {
- LOG.warn("Could not clear zookeeper nodes.", e);
- }
- }
-
- /**
- * Clears all sub-children of node rooted at path.
- *
- * @param path
- * @throws InterruptedException
- * @throws KeeperException
- */
- private void clearZKNodes(String path) throws KeeperException,
- InterruptedException {
- ArrayList<String> list = (ArrayList<String>) zk.getChildren(path, false);
-
- if (list.size() == 0) {
- return;
-
- } else {
- for (String node : list) {
- clearZKNodes(path + "/" + node);
- LOG.info("Deleting " + path + "/" + node);
- zk.delete(path + "/" + node, -1); // delete any version of this
- // node.
- }
- }
- }
-
private void createJobRoot(String string) {
writeNode(string, null, true, null);
}
@@ -164,4 +124,9 @@ public class ZKSyncBSPMasterClient exten
LOG.debug("Processing event type " + arg0.getType().toString());
}
+
+  /**
+   * Exposes the ZooKeeper handle used by this master client.
+   *
+   * @return the underlying {@link ZooKeeper} instance
+   */
+  public ZooKeeper getZK() {
+    return zk;
+  }
+
}
Modified: hama/branches/HAMA-505-branch/core/src/main/java/org/apache/hama/bsp/sync/ZKSyncClient.java
URL: http://svn.apache.org/viewvc/hama/branches/HAMA-505-branch/core/src/main/java/org/apache/hama/bsp/sync/ZKSyncClient.java?rev=1369551&r1=1369550&r2=1369551&view=diff
==============================================================================
--- hama/branches/HAMA-505-branch/core/src/main/java/org/apache/hama/bsp/sync/ZKSyncClient.java (original)
+++ hama/branches/HAMA-505-branch/core/src/main/java/org/apache/hama/bsp/sync/ZKSyncClient.java Sun Aug 5 11:07:48 2012
@@ -22,6 +22,7 @@ import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
+import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -31,9 +32,9 @@ import org.apache.commons.logging.LogFac
import org.apache.hadoop.io.Writable;
import org.apache.hama.bsp.BSPJobID;
import org.apache.hama.bsp.TaskAttemptID;
-import org.apache.hama.util.ReflectionUtils;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooDefs.Ids;
import org.apache.zookeeper.ZooKeeper;
@@ -77,11 +78,14 @@ public abstract class ZKSyncClient imple
* created.
*/
protected String getNodeName(TaskAttemptID taskId, long superstep) {
- return bspRoot + "/" + taskId.getJobID().toString() + "/" + superstep + "/"
- + taskId.toString();
+ return constructKey(taskId.getJobID(), "sync", "" + superstep,
+ taskId.toString());
+ //
+ // bspRoot + "/" + taskId.getJobID().toString() + "/" + superstep + "/"
+ // + taskId.toString();
}
-
- private String correctKey(String key){
+
+ private String correctKey(String key) {
if (!key.startsWith("/")) {
key = "/" + key;
}
@@ -114,7 +118,7 @@ public abstract class ZKSyncClient imple
* @throws InterruptedException
*/
protected Stat getStat(final String path) throws KeeperException,
- InterruptedException {
+ InterruptedException {
synchronized (zk) {
return zk.exists(path, false);
}
@@ -125,10 +129,17 @@ public abstract class ZKSyncClient imple
InterruptedException {
synchronized (zk) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Checking node " + path);
+ }
Stat s = zk.exists(path, false);
if (null == s) {
try {
zk.create(path, data, Ids.OPEN_ACL_UNSAFE, mode);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Created node " + path);
+ }
+
} catch (KeeperException.NodeExistsException nee) {
LOG.debug("Ignore because znode may be already created at " + path,
nee);
@@ -170,30 +181,27 @@ public abstract class ZKSyncClient imple
/**
* Utility function to read Writable object value from byte array.
+ *
* @param data The byte array
* @param classType The Class object of expected Writable object.
- * @return The instance of Writable object.
+ * @return The instance of Writable object.
* @throws IOException
*/
- protected Writable getValueFromBytes(
- byte[] data, Class<? extends Writable> classType) throws IOException{
- Writable value = null;
+ protected boolean getValueFromBytes(byte[] data,
+ Writable valueHolder) throws IOException {
if (data != null) {
ByteArrayInputStream istream = new ByteArrayInputStream(data);
- value = ReflectionUtils
- .newInstance(classType, new Object[0]);
DataInputStream diStream = new DataInputStream(istream);
try {
- value.readFields(diStream);
- }
- finally {
+ valueHolder.readFields(diStream);
+ } finally {
diStream.close();
}
+ return true;
}
- return value;
+ return false;
}
-
/**
* Read value stored in the Zookeeper node.
*
@@ -202,23 +210,21 @@ public abstract class ZKSyncClient imple
* @return The Writable object constructed from the value read from the
* Zookeeper node.
*/
- protected Writable extractData(String path,
- Class<? extends Writable> classType) {
+ protected boolean extractData(String path,
+ Writable valueHolder) {
try {
Stat stat = getStat(path);
if (stat != null) {
byte[] data = this.zk.getData(path, false, stat);
- Writable value = null;
- try{
- value = getValueFromBytes(data, classType);
- }
- catch (IOException e) {
+ try {
+ getValueFromBytes(data, valueHolder);
+ } catch (IOException e) {
LOG.error(
new StringBuffer(200).append("Error getting data from path ")
- .append(path).toString(), e);
- value = null;
+ .append(path).toString(), e);
+ return false;
}
- return value;
+ return true;
}
} catch (KeeperException e) {
@@ -230,7 +236,7 @@ public abstract class ZKSyncClient imple
.append(path).toString(), e);
}
- return null;
+ return false;
}
@@ -277,7 +283,7 @@ public abstract class ZKSyncClient imple
watcher);
}
pathBuffer.append("/")
- .append(pathComponents[pathComponents.length - 1]);
+ .append(pathComponents[pathComponents.length - 1]);
CreateMode mode = CreateMode.EPHEMERAL;
if (persistent) {
mode = CreateMode.PERSISTENT;
@@ -331,10 +337,10 @@ public abstract class ZKSyncClient imple
}
@Override
- public Writable getInformation(String key, Class<? extends Writable> classType) {
+ public boolean getInformation(String key, Writable valueHolder) {
key = correctKey(key);
final String path = key;
- return extractData(path, classType);
+ return extractData(path, valueHolder);
}
@Override
@@ -426,7 +432,74 @@ public abstract class ZKSyncClient imple
return children;
}
-
-
+  /**
+   * Recursively removes every descendant of {@code bspRoot}; the root node
+   * itself is kept. Any failure is logged as a warning and otherwise ignored.
+   */
+  protected void clearZKNodes() {
+    try {
+      if (zk.exists(bspRoot, false) == null) {
+        return;
+      }
+      clearZKNodes(bspRoot);
+    } catch (Exception e) {
+      LOG.warn("Could not clear zookeeper nodes.", e);
+    }
+  }
+
+  /**
+   * Recursively deletes all descendants of the node at {@code path}; the node
+   * itself is not deleted.
+   *
+   * @param path absolute ZooKeeper path whose children should be removed
+   * @throws KeeperException if ZooKeeper reports an error (for example, the
+   *           node does not exist)
+   * @throws InterruptedException if the calling thread is interrupted
+   */
+  protected void clearZKNodes(String path) throws KeeperException,
+      InterruptedException {
+    // getChildren is declared to return List; do not depend on the concrete
+    // class (the former ArrayList cast could fail with ClassCastException).
+    List<String> children = zk.getChildren(path, false);
+    for (String node : children) {
+      String childPath = path + "/" + node;
+      clearZKNodes(childPath);
+      LOG.info("Deleting " + childPath);
+      zk.delete(childPath, -1); // -1 matches any version of the node
+    }
+  }
+
+  /**
+   * Default watcher callback. ZooKeeper events delivered here are ignored.
+   */
+  @Override
+  public void process(WatchedEvent event) {
+    // Intentionally empty.
+  }
+
+  /**
+   * Deletes {@code key} together with all of its descendants.
+   *
+   * @param key the key to delete; it is normalized with {@code correctKey}
+   *          (e.g. a missing leading '/' is prepended)
+   * @param listener currently unused
+   * @return {@code true} if the node was deleted; {@code false} if ZooKeeper
+   *         reported an error or the thread was interrupted
+   */
+  @Override
+  public boolean remove(String key, SyncEventListener listener) {
+    final String path = correctKey(key);
+    try {
+      clearZKNodes(path);
+      this.zk.delete(path, -1);
+      return true;
+    } catch (KeeperException e) {
+      LOG.error("Error deleting key " + path, e);
+    } catch (InterruptedException e) {
+      // Restore the interrupt status so callers can still observe it.
+      Thread.currentThread().interrupt();
+      LOG.error("Error deleting key " + path, e);
+    }
+    return false;
+  }
+
+  /**
+   * Closes the ZooKeeper session held by this client.
+   *
+   * @throws IOException if the thread is interrupted while closing; the
+   *           thread's interrupt status is restored before throwing
+   */
+  @Override
+  public void close() throws IOException {
+    try {
+      this.zk.close();
+    } catch (InterruptedException e) {
+      Thread.currentThread().interrupt();
+      throw new IOException(e);
+    }
+  }
}
Modified: hama/branches/HAMA-505-branch/core/src/main/java/org/apache/hama/bsp/sync/ZooKeeperSyncClientImpl.java
URL: http://svn.apache.org/viewvc/hama/branches/HAMA-505-branch/core/src/main/java/org/apache/hama/bsp/sync/ZooKeeperSyncClientImpl.java?rev=1369551&r1=1369550&r2=1369551&view=diff
==============================================================================
--- hama/branches/HAMA-505-branch/core/src/main/java/org/apache/hama/bsp/sync/ZooKeeperSyncClientImpl.java (original)
+++ hama/branches/HAMA-505-branch/core/src/main/java/org/apache/hama/bsp/sync/ZooKeeperSyncClientImpl.java Sun Aug 5 11:07:48 2012
@@ -17,6 +17,7 @@
*/
package org.apache.hama.bsp.sync;
+import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.Collections;
import java.util.List;
@@ -91,9 +92,10 @@ public class ZooKeeperSyncClientImpl ext
try {
synchronized (zk) {
- final String pathToJobIdZnode = bspRoot + "/"
- + taskId.getJobID().toString();
- final String pathToSuperstepZnode = pathToJobIdZnode + "/" + superstep;
+
+ final String pathToSuperstepZnode =
+ constructKey(taskId.getJobID(), "sync", ""+superstep);
+
writeNode(pathToSuperstepZnode, null, true, null);
BarrierWatcher barrierWatcher = new BarrierWatcher();
// this is really needed to register the barrier watcher, don't remove
@@ -140,8 +142,10 @@ public class ZooKeeperSyncClientImpl ext
public void leaveBarrier(final BSPJobID jobId, final TaskAttemptID taskId,
final long superstep) throws SyncException {
try {
- final String pathToSuperstepZnode = bspRoot + "/"
- + taskId.getJobID().toString() + "/" + superstep;
+// final String pathToSuperstepZnode = bspRoot + "/"
+// + taskId.getJobID().toString() + "/" + superstep;
+ final String pathToSuperstepZnode =
+ constructKey(taskId.getJobID(), "sync", ""+superstep);
while (true) {
List<String> znodes = zk.getChildren(pathToSuperstepZnode, false);
LOG.debug("leaveBarrier() !!! checking znodes contnains /ready node or not: at superstep:"
@@ -278,11 +282,16 @@ public class ZooKeeperSyncClientImpl ext
for (String s : allPeers) {
byte[] data = zk.getData(constructKey(taskId.getJobID(), "peers", s),
this, null);
- TaskAttemptID thatTask = (TaskAttemptID) getValueFromBytes(data,
- TaskAttemptID.class);
- LOG.debug("TASK mapping from zookeeper: " + thatTask + " ID:"
- + thatTask.getTaskID().getId() + " : " + s);
- sortedMap.put(thatTask.getTaskID().getId(), s);
+ TaskAttemptID thatTask = new TaskAttemptID();
+ boolean result = getValueFromBytes(data, thatTask);
+
+ if(result){
+ LOG.debug("TASK mapping from zookeeper: " + thatTask + " ID:"
+ + thatTask.getTaskID().getId() + " : " + s);
+ sortedMap.put(thatTask.getTaskID().getId(), s);
+ }
+
+
}
} catch (Exception e) {
@@ -303,8 +312,13 @@ public class ZooKeeperSyncClientImpl ext
}
@Override
- public void close() throws InterruptedException {
- zk.close();
+ public void close() throws IOException {
+ try{
+ zk.close();
+ }
+ catch(InterruptedException e){
+ throw new IOException(e);
+ }
}
@Override
Added: hama/branches/HAMA-505-branch/core/src/main/java/org/apache/hama/bsp/taskallocation/BSPResource.java
URL: http://svn.apache.org/viewvc/hama/branches/HAMA-505-branch/core/src/main/java/org/apache/hama/bsp/taskallocation/BSPResource.java?rev=1369551&view=auto
==============================================================================
--- hama/branches/HAMA-505-branch/core/src/main/java/org/apache/hama/bsp/taskallocation/BSPResource.java (added)
+++ hama/branches/HAMA-505-branch/core/src/main/java/org/apache/hama/bsp/taskallocation/BSPResource.java Sun Aug 5 11:07:48 2012
@@ -0,0 +1,38 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.bsp.taskallocation;
+
+import org.apache.hama.bsp.TaskInProgress;
+
+/**
+ * <code>BSPResource</code> defines a resource entity that would be used as a
+ * factor for allocating tasks on groom-servers.
+ */
+public abstract class BSPResource {
+
+  /**
+   * Returns the grooms on which the current resource is available or local,
+   * or which are the best choice for the task.
+   *
+   * @param tip The <code>TaskInProgress</code> representing the task to
+   *          schedule.
+   * @return The groomserver host names, as an array.
+   */
+  public abstract String[] getGrooms(TaskInProgress tip);
+
+}
Added: hama/branches/HAMA-505-branch/core/src/main/java/org/apache/hama/bsp/taskallocation/BestEffortDataLocalTaskAllocator.java
URL: http://svn.apache.org/viewvc/hama/branches/HAMA-505-branch/core/src/main/java/org/apache/hama/bsp/taskallocation/BestEffortDataLocalTaskAllocator.java?rev=1369551&view=auto
==============================================================================
--- hama/branches/HAMA-505-branch/core/src/main/java/org/apache/hama/bsp/taskallocation/BestEffortDataLocalTaskAllocator.java (added)
+++ hama/branches/HAMA-505-branch/core/src/main/java/org/apache/hama/bsp/taskallocation/BestEffortDataLocalTaskAllocator.java Sun Aug 5 11:07:48 2012
@@ -0,0 +1,157 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.bsp.taskallocation;
+
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hama.bsp.BSPJobClient.RawSplit;
+import org.apache.hama.bsp.GroomServerStatus;
+import org.apache.hama.bsp.TaskInProgress;
+
+/**
+ * <code>BestEffortDataLocalTaskAllocator</code> is a simple task allocator that
+ * takes in only the data locality as a constraint for allocating tasks. It
+ * makes the best attempt to schedule task on the groom server with the input
+ * split. If the aforesaid is not possible, it selects any other available groom
+ * to allocate tasks on.
+ */
+public class BestEffortDataLocalTaskAllocator implements TaskAllocationStrategy {
+
+ Log LOG = LogFactory.getLog(BestEffortDataLocalTaskAllocator.class);
+
+ @Override
+ public void initialize(Configuration conf) {
+ }
+
+ /**
+ * Returns the first groom that has a slot to schedule a task on.
+ *
+ * @param grooms
+ * @param tasksInGroomMap
+ * @return
+ */
+ private String getAnyGroomToSchedule(Map<String, GroomServerStatus> grooms,
+ Map<GroomServerStatus, Integer> tasksInGroomMap) {
+
+ Iterator<String> groomIter = grooms.keySet().iterator();
+ while (groomIter.hasNext()) {
+ GroomServerStatus groom = grooms.get(groomIter.next());
+ if (groom == null)
+ continue;
+ Integer taskInGroom = tasksInGroomMap.get(groom);
+ taskInGroom = (taskInGroom == null) ? 0 : taskInGroom;
+ if (taskInGroom < groom.getMaxTasks()) {
+ return groom.getGroomHostName();
+ }
+ }
+ return null;
+ }
+
+ /**
+ * From the set of grooms given, returns the groom on which a task could be
+ * scheduled on.
+ *
+ * @param grooms
+ * @param tasksInGroomMap
+ * @param possibleLocations
+ * @return
+ */
+ private String getGroomToSchedule(Map<String, GroomServerStatus> grooms,
+ Map<GroomServerStatus, Integer> tasksInGroomMap,
+ String[] possibleLocations) {
+
+ for (int i = 0; i < possibleLocations.length; ++i) {
+ String location = possibleLocations[i];
+ GroomServerStatus groom = grooms.get(location);
+ if (groom == null)
+ continue;
+ Integer taskInGroom = tasksInGroomMap.get(groom);
+ taskInGroom = (taskInGroom == null) ? 0 : taskInGroom;
+ if (taskInGroom < groom.getMaxTasks()
+ && location.equals(groom.getGroomHostName())) {
+ return groom.getGroomHostName();
+ }
+ }
+ return null;
+ }
+
+ @Override
+ public GroomServerStatus getGroomToAllocate(
+ Map<String, GroomServerStatus> groomStatuses, String[] selectedGrooms,
+ Map<GroomServerStatus, Integer> taskCountInGroomMap,
+ BSPResource[] resources, TaskInProgress taskInProgress) {
+ if (!taskInProgress.canStartTask())
+ return null;
+
+ String groomName = null;
+ if (selectedGrooms != null) {
+ groomName = getGroomToSchedule(groomStatuses, taskCountInGroomMap,
+ selectedGrooms);
+ }
+
+ if (groomName == null) {
+ groomName = getAnyGroomToSchedule(groomStatuses, taskCountInGroomMap);
+ }
+
+ if (groomName != null) {
+ return groomStatuses.get(groomName);
+ }
+
+ return null;
+ }
+
+ /**
+ * Select grooms that has the block of data locally stored on the groom
+ * server.
+ */
+ @Override
+ public String[] selectGrooms(Map<String, GroomServerStatus> groomStatuses,
+ Map<GroomServerStatus, Integer> taskCountInGroomMap,
+ BSPResource[] resources, TaskInProgress taskInProgress) {
+ if (!taskInProgress.canStartTask()) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Cannot start task based on id");
+ }
+ return new String[0];
+ }
+
+ RawSplit rawSplit = taskInProgress.getFileSplit();
+ if (rawSplit != null) {
+ return rawSplit.getLocations();
+ }
+ return null;
+ }
+
+ /**
+ * This operation is not supported.
+ */
+ @Override
+ public Set<GroomServerStatus> getGroomsToAllocate(
+ Map<String, GroomServerStatus> groomStatuses, String[] selectedGrooms,
+ Map<GroomServerStatus, Integer> taskCountInGroomMap,
+ BSPResource[] resources, TaskInProgress taskInProgress) {
+ throw new UnsupportedOperationException(
+ "This API is not supported for the called API function call.");
+ }
+
+}
Added: hama/branches/HAMA-505-branch/core/src/main/java/org/apache/hama/bsp/taskallocation/RawSplitResource.java
URL: http://svn.apache.org/viewvc/hama/branches/HAMA-505-branch/core/src/main/java/org/apache/hama/bsp/taskallocation/RawSplitResource.java?rev=1369551&view=auto
==============================================================================
--- hama/branches/HAMA-505-branch/core/src/main/java/org/apache/hama/bsp/taskallocation/RawSplitResource.java (added)
+++ hama/branches/HAMA-505-branch/core/src/main/java/org/apache/hama/bsp/taskallocation/RawSplitResource.java Sun Aug 5 11:07:48 2012
@@ -0,0 +1,48 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.bsp.taskallocation;
+
+import org.apache.hama.bsp.BSPJobClient.RawSplit;
+import org.apache.hama.bsp.TaskInProgress;
+
+/**
+ * <code>RawSplitResource</code> defines the data block resource that could be
+ * used to find which groom to schedule a task on for data locality.
+ */
+public class RawSplitResource extends BSPResource {
+
+  private final RawSplit split;
+
+  /**
+   * Creates a resource without split information. {@link #getGrooms} returns
+   * an empty array for such a resource.
+   */
+  public RawSplitResource() {
+    this(null);
+  }
+
+  /**
+   * Initializes the resource with data block split information.
+   *
+   * @param split The data split provided by <code>BSPJobClient</code>; may be
+   *          <code>null</code>.
+   */
+  public RawSplitResource(RawSplit split) {
+    this.split = split;
+  }
+
+  /**
+   * Returns the locations reported by this resource's split.
+   *
+   * @param tip The task being scheduled (not used by this resource).
+   * @return The split's locations, or an empty array when this resource was
+   *         created without a split.
+   */
+  @Override
+  public String[] getGrooms(TaskInProgress tip) {
+    if (split == null) {
+      // Avoid an NPE for instances built with the no-arg constructor.
+      return new String[0];
+    }
+    return split.getLocations();
+  }
+
+}
Added: hama/branches/HAMA-505-branch/core/src/main/java/org/apache/hama/bsp/taskallocation/TaskAllocationStrategy.java
URL: http://svn.apache.org/viewvc/hama/branches/HAMA-505-branch/core/src/main/java/org/apache/hama/bsp/taskallocation/TaskAllocationStrategy.java?rev=1369551&view=auto
==============================================================================
--- hama/branches/HAMA-505-branch/core/src/main/java/org/apache/hama/bsp/taskallocation/TaskAllocationStrategy.java (added)
+++ hama/branches/HAMA-505-branch/core/src/main/java/org/apache/hama/bsp/taskallocation/TaskAllocationStrategy.java Sun Aug 5 11:07:48 2012
@@ -0,0 +1,112 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.bsp.taskallocation;
+
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hama.bsp.GroomServerStatus;
+import org.apache.hama.bsp.TaskInProgress;
+
+/**
+ * <code>TaskAllocationStrategy</code> defines the behavior of the task
+ * allocation strategy that a Hama job employs to schedule tasks on
+ * GroomServers in a Hama cluster. The function <code>selectGrooms</code> is
+ * responsible for defining the strategy used to select grooms. The resulting
+ * list of grooms can be passed to <code>getGroomToAllocate</code> or
+ * <code>getGroomsToAllocate</code> as the <code>selectedGrooms</code>
+ * parameter. Those two functions are not given the responsibility of selecting
+ * grooms themselves because they may also be invoked to allocate tasks on some
+ * other restricted set of grooms chosen by the caller.
+ */
+@Unstable
+public interface TaskAllocationStrategy {
+
+  /**
+   * Initializes the <code>TaskAllocationStrategy</code> instance.
+   *
+   * @param conf Hama configuration
+   */
+  void initialize(Configuration conf);
+
+  /**
+   * Defines the task-allocation strategy to select grooms based on resource
+   * constraints and task-related restrictions. The result can be used to
+   * populate the <code>selectedGrooms</code> argument of
+   * <code>getGroomToAllocate</code> and <code>getGroomsToAllocate</code>.
+   *
+   * @param groomStatuses The map of groom-name to
+   *          <code>GroomServerStatus</code> object for all known grooms.
+   * @param taskCountInGroomMap Map of count of tasks in groom (To be deprecated
+   *          soon)
+   * @param resources The resources to take into account for the task.
+   * @param taskInProgress The <code>TaskInProgress</code> object for the task.
+   * @return An array of host names on which the task could be allocated.
+   */
+  @Unstable
+  String[] selectGrooms(Map<String, GroomServerStatus> groomStatuses,
+      Map<GroomServerStatus, Integer> taskCountInGroomMap,
+      BSPResource[] resources, TaskInProgress taskInProgress);
+
+  /**
+   * Returns the best groom to run the task on, chosen from the set of grooms
+   * provided. Implementations may populate <code>selectedGrooms</code> with
+   * the result of <code>selectGrooms</code>.
+   *
+   * @param groomStatuses The map of groom-name to
+   *          <code>GroomServerStatus</code> object for all known grooms.
+   * @param selectedGrooms An array of selected groom host names to select from.
+   * @param taskCountInGroomMap Map of count of tasks in groom (To be deprecated
+   *          soon)
+   * @param resources The resources to take into account for the task.
+   * @param taskInProgress The <code>TaskInProgress</code> object for the task.
+   * @return The status of the selected groom, or <code>null</code> if no groom
+   *         could be found.
+   */
+  @Unstable
+  GroomServerStatus getGroomToAllocate(
+      Map<String, GroomServerStatus> groomStatuses, String[] selectedGrooms,
+      Map<GroomServerStatus, Integer> taskCountInGroomMap,
+      BSPResource[] resources, TaskInProgress taskInProgress);
+
+  /**
+   * Returns the best grooms to run the task on, chosen from the set of grooms
+   * provided. Implementations may populate <code>selectedGrooms</code> with
+   * the result of <code>selectGrooms</code>.
+   *
+   * @param groomStatuses The map of groom-name to
+   *          <code>GroomServerStatus</code> object for all known grooms.
+   * @param selectedGrooms An array of selected groom host names to select from.
+   * @param taskCountInGroomMap Map of count of tasks in groom (To be deprecated
+   *          soon)
+   * @param resources The resources to take into account for the task.
+   * @param taskInProgress The <code>TaskInProgress</code> object for the task.
+   * @return The statuses of the selected grooms on which the task could be
+   *         allocated, or <code>null</code> if no groom could be found.
+   */
+  @Unstable
+  Set<GroomServerStatus> getGroomsToAllocate(
+      Map<String, GroomServerStatus> groomStatuses, String[] selectedGrooms,
+      Map<GroomServerStatus, Integer> taskCountInGroomMap,
+      BSPResource[] resources, TaskInProgress taskInProgress);
+}