You are viewing a plain-text version of this content. The hyperlink to its canonical (original) version was not preserved in this plain-text copy.
Posted to commits@hama.apache.org by ed...@apache.org on 2011/01/12 13:32:42 UTC
svn commit: r1058109 [1/2] - in /incubator/hama/trunk: ./ conf/
src/java/org/apache/hama/ src/java/org/apache/hama/bsp/
src/java/org/apache/hama/ipc/ src/test/org/apache/hama/
src/test/org/apache/hama/bsp/
Author: edwardyoon
Date: Wed Jan 12 12:32:39 2011
New Revision: 1058109
URL: http://svn.apache.org/viewvc?rev=1058109&view=rev
Log:
Modify MniCluster so that developers can benefit when testing using Junit
Added:
incubator/hama/trunk/src/java/org/apache/hama/bsp/Directive.java
incubator/hama/trunk/src/java/org/apache/hama/bsp/FCFSQueue.java
incubator/hama/trunk/src/java/org/apache/hama/bsp/JobInProgressListener.java
incubator/hama/trunk/src/java/org/apache/hama/bsp/Queue.java
incubator/hama/trunk/src/java/org/apache/hama/bsp/QueueManager.java
incubator/hama/trunk/src/java/org/apache/hama/bsp/Schedulable.java
incubator/hama/trunk/src/java/org/apache/hama/ipc/MasterProtocol.java
incubator/hama/trunk/src/java/org/apache/hama/ipc/WorkerProtocol.java
incubator/hama/trunk/src/test/org/apache/hama/bsp/Client.java
incubator/hama/trunk/src/test/org/apache/hama/bsp/TestBSPMaster.java
incubator/hama/trunk/src/test/org/apache/hama/bsp/TestDirective.java
Modified:
incubator/hama/trunk/CHANGES.txt
incubator/hama/trunk/build.xml
incubator/hama/trunk/conf/hama-default.xml
incubator/hama/trunk/src/java/org/apache/hama/Constants.java
incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPJob.java
incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPMaster.java
incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPPeer.java
incubator/hama/trunk/src/java/org/apache/hama/bsp/GroomServer.java
incubator/hama/trunk/src/java/org/apache/hama/bsp/GroomServerManager.java
incubator/hama/trunk/src/java/org/apache/hama/bsp/GroomServerStatus.java
incubator/hama/trunk/src/java/org/apache/hama/bsp/JobInProgress.java
incubator/hama/trunk/src/java/org/apache/hama/bsp/JobStatus.java
incubator/hama/trunk/src/java/org/apache/hama/bsp/SimpleTaskScheduler.java
incubator/hama/trunk/src/java/org/apache/hama/bsp/Task.java
incubator/hama/trunk/src/java/org/apache/hama/bsp/TaskAttemptID.java
incubator/hama/trunk/src/java/org/apache/hama/bsp/TaskInProgress.java
incubator/hama/trunk/src/java/org/apache/hama/bsp/TaskScheduler.java
incubator/hama/trunk/src/java/org/apache/hama/bsp/TaskStatus.java
incubator/hama/trunk/src/test/org/apache/hama/HamaCluster.java
incubator/hama/trunk/src/test/org/apache/hama/HamaClusterTestCase.java
incubator/hama/trunk/src/test/org/apache/hama/MiniBSPCluster.java
incubator/hama/trunk/src/test/org/apache/hama/bsp/TestBSPPeer.java
Modified: incubator/hama/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/CHANGES.txt?rev=1058109&r1=1058108&r2=1058109&view=diff
==============================================================================
--- incubator/hama/trunk/CHANGES.txt (original)
+++ incubator/hama/trunk/CHANGES.txt Wed Jan 12 12:32:39 2011
@@ -51,6 +51,8 @@ Trunk (unreleased changes)
IMPROVEMENTS
+ HAMA-346: Modify MniCluster so that developers can benefit when testing using Junit
+ (ChiaHung Lin via edwardyoon)
HAMA-340: Implementation of job submit command (edwardyoon)
HAMA-278: Few minor refactoring (edwardyoon)
HAMA-336: The all taskid variable's type should be declared as a TaskAttemptID
Modified: incubator/hama/trunk/build.xml
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/build.xml?rev=1058109&r1=1058108&r2=1058109&view=diff
==============================================================================
--- incubator/hama/trunk/build.xml (original)
+++ incubator/hama/trunk/build.xml Wed Jan 12 12:32:39 2011
@@ -257,6 +257,10 @@
<!-- Run unit tests -->
<!-- ================================================================== -->
<path id="test.classpath">
+ <fileset dir="${lib.dir}/jsp-2.1/">
+ <include name="*.jar" />
+ </fileset>
+ <pathelement location="${conf.dir}" />
<pathelement location="${src.test}" />
<pathelement location="${build.test}" />
<path refid="classpath" />
@@ -281,9 +285,12 @@
<junit printsummary="yes" showoutput="${test.output}" haltonfailure="no" fork="yes" maxmemory="512m" errorProperty="tests.failed" failureProperty="tests.failed" timeout="${test.timeout}">
<classpath refid="test.classpath" />
<formatter type="${test.junit.output.format}" />
- <batchtest todir="${build.report.tests}">
+ <batchtest todir="${build.report.tests}" unless="testcase">
<fileset dir="${src.test}" includes="**/Test*.java" excludes="**/${test.exclude}.java" />
</batchtest>
+ <batchtest todir="${build.report.tests}" if="testcase">
+ <fileset dir="${src.test}" includes="**/${testcase}.java" excludes="**/${test.exclude}.java" />
+ </batchtest>
</junit>
<fail if="tests.failed">Tests failed!</fail>
</target>
Modified: incubator/hama/trunk/conf/hama-default.xml
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/conf/hama-default.xml?rev=1058109&r1=1058108&r2=1058109&view=diff
==============================================================================
--- incubator/hama/trunk/conf/hama-default.xml (original)
+++ incubator/hama/trunk/conf/hama-default.xml Wed Jan 12 12:32:39 2011
@@ -41,6 +41,11 @@
<description>The port an groom server binds to.</description>
</property>
<property>
+ <name>bsp.groom.rpc.port</name>
+ <value>50000</value>
+ <description>The port an groom rpc binds to.</description>
+ </property>
+ <property>
<name>bsp.local.dir</name>
<value>${hadoop.tmp.dir}/bsp/local</value>
<description>local directory for temporal store</description>
Modified: incubator/hama/trunk/src/java/org/apache/hama/Constants.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/Constants.java?rev=1058109&r1=1058108&r2=1058109&view=diff
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/Constants.java (original)
+++ incubator/hama/trunk/src/java/org/apache/hama/Constants.java Wed Jan 12 12:32:39 2011
@@ -23,6 +23,16 @@ package org.apache.hama;
* Some constants used in the Hama
*/
public interface Constants {
+
+ public static final String GROOM_RPC_HOST = "bsp.groom.rpc.hostname";
+
+ public static final String DEFAULT_GROOM_RPC_HOST = "0.0.0.0";
+
+ public static final String GROOM_RPC_PORT = "bsp.groom.rpc.port";
+
+ /** Default port region rpc server listens on. */
+ public static final int DEFAULT_GROOM_RPC_PORT = 50000;
+
///////////////////////////////////////
// Constants for BSP Package
Modified: incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPJob.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPJob.java?rev=1058109&r1=1058108&r2=1058109&view=diff
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPJob.java (original)
+++ incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPJob.java Wed Jan 12 12:32:39 2011
@@ -42,14 +42,14 @@ public class BSPJob extends BSPJobContex
super(conf, null);
jobClient = new BSPJobClient(conf);
}
-
+
public BSPJob(HamaConfiguration conf, String jobName) throws IOException {
this(conf);
setJobName(jobName);
}
public BSPJob(BSPJobID jobID, String jobFile) throws IOException {
- super(new Path(jobFile), jobID);
+ super(new Path(jobFile), jobID);
}
/**
@@ -65,7 +65,7 @@ public class BSPJob extends BSPJobContex
super(conf, null);
setNumBspTask(tasks);
}
-
+
@SuppressWarnings("unchecked")
public BSPJob(HamaConfiguration conf, Class exampleClass) throws IOException {
this(conf);
@@ -78,10 +78,10 @@ public class BSPJob extends BSPJobContex
+ " instead of " + state);
}
}
-
- ///////////////////////////////////////
+
+ // /////////////////////////////////////
// Setter for Job Submission
- ///////////////////////////////////////
+ // /////////////////////////////////////
public void setWorkingDirectory(Path dir) throws IOException {
ensureState(JobState.DEFINE);
dir = new Path(getWorkingDirectory(), dir);
@@ -99,7 +99,7 @@ public class BSPJob extends BSPJobContex
ensureState(JobState.DEFINE);
conf.setClass(WORK_CLASS_ATTR, cls, BSP.class);
}
-
+
@SuppressWarnings("unchecked")
public Class<? extends BSP> getBspClass() {
return (Class<? extends BSP>) conf.getClass(WORK_CLASS_ATTR, BSP.class);
@@ -115,7 +115,7 @@ public class BSPJob extends BSPJobContex
conf.set("bsp.jar", jar);
}
}
-
+
@SuppressWarnings("unchecked")
private static String findContainingJar(Class my_class) {
ClassLoader loader = my_class.getClassLoader();
@@ -144,18 +144,18 @@ public class BSPJob extends BSPJobContex
ensureState(JobState.DEFINE);
conf.set("bsp.job.name", name);
}
-
+
public void setInputPath(HamaConfiguration conf, Path iNPUTPATH) {
-
+
}
public void setUser(String user) {
conf.set("user.name", user);
}
-
- ///////////////////////////////////////
+
+ // /////////////////////////////////////
// Methods for Job Control
- ///////////////////////////////////////
+ // /////////////////////////////////////
public float progress() throws IOException {
ensureState(JobState.RUNNING);
return info.progress();
Modified: incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPMaster.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPMaster.java?rev=1058109&r1=1058108&r2=1058109&view=diff
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPMaster.java (original)
+++ incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPMaster.java Wed Jan 12 12:32:39 2011
@@ -23,16 +23,14 @@ import java.net.InetSocketAddress;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Collection;
-import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
-import java.util.Iterator;
import java.util.List;
import java.util.Map;
-import java.util.Set;
import java.util.TreeMap;
-import java.util.TreeSet;
-import java.util.Vector;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.CopyOnWriteArrayList;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -46,14 +44,15 @@ import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.security.AccessControlException;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hama.HamaConfiguration;
-import org.apache.hama.ipc.InterServerProtocol;
import org.apache.hama.ipc.JobSubmissionProtocol;
+import org.apache.hama.ipc.MasterProtocol;
+import org.apache.hama.ipc.WorkerProtocol;
/**
* BSPMaster is responsible to control all the groom servers and to manage bsp
* jobs.
*/
-public class BSPMaster implements JobSubmissionProtocol, InterServerProtocol,
+public class BSPMaster implements JobSubmissionProtocol, MasterProtocol, // InterServerProtocol,
GroomServerManager {
public static final Log LOG = LogFactory.getLog(BSPMaster.class);
@@ -73,7 +72,8 @@ public class BSPMaster implements JobSub
// Attributes
String masterIdentifier;
- private Server interServer;
+ // private Server interServer;
+ private Server masterServer;
// Filesystem
static final String SUBDIR = "bspMaster";
@@ -86,28 +86,21 @@ public class BSPMaster implements JobSub
final static FsPermission SYSTEM_FILE_PERMISSION = FsPermission
.createImmutable((short) 0700); // rwx------
- // Groom Servers
- // (groom name --> last sent HeartBeatResponse)
- Map<String, HeartbeatResponse> groomToHeartbeatResponseMap = new TreeMap<String, HeartbeatResponse>();
- private HashMap<String, GroomServerStatus> groomServers = new HashMap<String, GroomServerStatus>();
- // maps groom server names to peer names
- private HashMap<String, String> groomServerPeers = new HashMap<String, String>();
-
// Jobs' Meta Data
private Integer nextJobId = Integer.valueOf(1);
// private long startTime;
- private int totalSubmissions = 0;
- private int totalTasks = 0;
- private int totalTaskCapacity;
+ private int totalSubmissions = 0; // how many jobs has been submitted by
+ // clients
+ private int totalTasks = 0; // currnetly running tasks
+ private int totalTaskCapacity; // max tasks that groom server can run
+
private Map<BSPJobID, JobInProgress> jobs = new TreeMap<BSPJobID, JobInProgress>();
private TaskScheduler taskScheduler;
- TreeMap<TaskAttemptID, String> taskIdToGroomNameMap = new TreeMap<TaskAttemptID, String>();
- TreeMap<String, TreeSet<TaskAttemptID>> groomNameToTaskIdsMap = new TreeMap<String, TreeSet<TaskAttemptID>>();
- Map<TaskAttemptID, TaskInProgress> taskIdToTaskInProgressMap = new TreeMap<TaskAttemptID, TaskInProgress>();
+ // GroomServers cache
+ protected ConcurrentMap<GroomServerStatus, WorkerProtocol> groomServers = new ConcurrentHashMap<GroomServerStatus, WorkerProtocol>();
- Vector<JobInProgress> jobInitQueue = new Vector<JobInProgress>();
- JobInitThread initJobs = new JobInitThread();
+ private final List<JobInProgressListener> jobInProgressListeners = new CopyOnWriteArrayList<JobInProgressListener>();
/**
* Start the BSPMaster process, listen on the indicated hostname/port
@@ -123,16 +116,17 @@ public class BSPMaster implements JobSub
this.masterIdentifier = identifier;
// expireLaunchingTaskThread.start();
- // Create the scheduler
+ // Create the scheduler and init scheduler services
Class<? extends TaskScheduler> schedulerClass = conf.getClass(
"bsp.master.taskscheduler", SimpleTaskScheduler.class,
TaskScheduler.class);
this.taskScheduler = (TaskScheduler) ReflectionUtils.newInstance(
schedulerClass, conf);
- InetSocketAddress addr = getAddress(conf);
- this.interServer = RPC.getServer(this, addr.getHostName(), addr
- .getPort(), conf);
+ String host = getAddress(conf).getHostName();
+ int port = getAddress(conf).getPort();
+ LOG.info("RPC BSPMaster: host " + host + " port " + port);
+ this.masterServer = RPC.getServer(this, host, port, conf);
while (!Thread.currentThread().isInterrupted()) {
try {
@@ -175,63 +169,125 @@ public class BSPMaster implements JobSub
deleteLocalFiles(SUBDIR);
}
- // /////////////////////////////////////////////////////
- // Accessors for objects that want info on jobs, tasks,
- // grooms, etc.
- // /////////////////////////////////////////////////////
- public GroomServerStatus getGroomServer(String groomID) {
- synchronized (groomServers) {
- return groomServers.get(groomID);
+ /**
+ * A GroomServer registers with its status to BSPMaster when startup, which
+ * will update GroomServers cache.
+ *
+ * @param status to be updated in cache.
+ * @return true if registering successfully; false if fail.
+ */
+ @Override
+ public boolean register(GroomServerStatus status) throws IOException {
+ if (null == status) {
+ LOG.error("No groom server status.");
+ throw new NullPointerException("No groom server status.");
}
+ Throwable e = null;
+ try {
+ WorkerProtocol wc = (WorkerProtocol) RPC.waitForProxy(
+ WorkerProtocol.class, WorkerProtocol.versionID,
+ resolveWorkerAddress(status.getRpcServer()), this.conf);
+ if (null == wc) {
+ LOG
+ .warn("Fail to create Worker client at host "
+ + status.getPeerName());
+ return false;
+ }
+ // TODO: need to check if peer name has changed
+ groomServers.putIfAbsent(status, wc);
+ } catch (UnsupportedOperationException u) {
+ e = u;
+ } catch (ClassCastException c) {
+ e = c;
+ } catch (NullPointerException n) {
+ e = n;
+ } catch (IllegalArgumentException i) {
+ e = i;
+ } catch (Exception ex) {
+ e = ex;
+ }
+
+ if (null != e) {
+ LOG.error("Fail to register GroomServer " + status.getGroomName(), e);
+ return false;
+ }
+
+ return true;
}
- public List<String> groomServerNames() {
- List<String> activeGrooms = new ArrayList<String>();
+ private static InetSocketAddress resolveWorkerAddress(String data) {
+ return new InetSocketAddress(data.split(":")[0], Integer.parseInt(data
+ .split(":")[1]));
+ }
+
+ private void updateGroomServersKey(GroomServerStatus old,
+ GroomServerStatus newKey) {
synchronized (groomServers) {
- for (GroomServerStatus status : groomServers.values()) {
- activeGrooms.add(status.getGroomName());
- }
+ WorkerProtocol worker = groomServers.remove(old);
+ groomServers.put(newKey, worker);
}
- return activeGrooms;
}
- // ///////////////////////////////////////////////////////////////
- // Used to init new jobs that have just been created
- // ///////////////////////////////////////////////////////////////
- class JobInitThread implements Runnable {
- private volatile boolean shouldRun = true;
-
- public JobInitThread() {
- }
-
- public void run() {
- while (shouldRun) {
- JobInProgress job = null;
- synchronized (jobInitQueue) {
- if (jobInitQueue.size() > 0) {
- job = (JobInProgress) jobInitQueue.elementAt(0);
- jobInitQueue.remove(job);
- } else {
- try {
- jobInitQueue.wait(JOBINIT_SLEEP_INTERVAL);
- } catch (InterruptedException iex) {
- }
- }
+ @Override
+ public boolean report(Directive directive) throws IOException {
+ // check returned directive type if equals response
+ if (directive.getType().value() != Directive.Type.Response.value()) {
+ throw new IllegalStateException("GroomServer should report()"
+ + " with Response. Current report type:" + directive.getType());
+ }
+ // update GroomServerStatus hold in groomServers cache.
+ GroomServerStatus fstus = directive.getStatus();
+ // groomServers cache contains groom server status reported back
+ if (groomServers.containsKey(fstus)) {
+ GroomServerStatus ustus = null;
+ for (GroomServerStatus old : groomServers.keySet()) {
+ if (old.equals(fstus)) {
+ ustus = fstus;
+ updateGroomServersKey(old, ustus);
+ break;
}
- try {
- if (job != null) {
- job.initTasks();
+ }// for
+ if (null != ustus) {
+ List<TaskStatus> tlist = ustus.getTaskReports();
+ for (TaskStatus ts : tlist) {
+ JobInProgress jip = whichJob(ts.getJobId());
+ // TODO: need for each tip execute completed?
+ // each tip already maintain a data structure, checking
+ // if task status is completed
+ TaskInProgress tip = jip.findTaskInProgress(((TaskAttemptID) ts
+ .getTaskId()).getTaskID());
+ jip.completedTask(tip, ts);
+ LOG.info("JobInProgress id:" + jip.getJobID() + " status:"
+ + jip.getStatus());
+ if (jip.getStatus().getRunState() == JobStatus.SUCCEEDED) {
+ for (JobInProgressListener listener : jobInProgressListeners) {
+ try {
+ listener.jobRemoved(jip);
+ } catch (IOException ioe) {
+ LOG.error("Fail to alter scheduler a job is moved.", ioe);
+ }
+ }
}
- } catch (Exception e) {
- LOG.warn("job init failed: " + e);
- job.kill();
}
+ } else {
+ throw new RuntimeException("BSPMaster contains GroomServerSatus, "
+ + "but fail to retrieve it.");
}
+ } else {
+ throw new RuntimeException("GroomServer not found."
+ + fstus.getGroomName());
}
+ return true;
+ }
- public void stopIniter() {
- shouldRun = false;
+ private JobInProgress whichJob(BSPJobID id) {
+ for (JobInProgress job : taskScheduler
+ .getJobs(SimpleTaskScheduler.PROCESSING_QUEUE)) {
+ if (job.getJobID().equals(id)) {
+ return job;
+ }
}
+ return null;
}
// /////////////////////////////////////////////////////////////
@@ -283,6 +339,7 @@ public class BSPMaster implements JobSub
BSPMaster result = new BSPMaster(conf, identifier);
result.taskScheduler.setGroomServerManager(result);
+ result.taskScheduler.start();
return result;
}
@@ -295,26 +352,27 @@ public class BSPMaster implements JobSub
}
/**
+ * BSPMaster identifier
*
- * @return
+ * @return String BSPMaster identification number
*/
private static String generateNewIdentifier() {
return new SimpleDateFormat("yyyyMMddHHmm").format(new Date());
}
public void offerService() throws InterruptedException, IOException {
- new Thread(this.initJobs).start();
- LOG.info("Starting jobInitThread");
-
- this.interServer.start();
+ // this.interServer.start();
+ this.masterServer.start();
synchronized (this) {
state = State.RUNNING;
}
LOG.info("Starting RUNNING");
- this.interServer.join();
- LOG.info("Stopped interServer");
+ // this.interServer.join();
+ this.masterServer.join();
+
+ LOG.info("Stopped RPC Master server.");
}
// //////////////////////////////////////////////////
@@ -323,8 +381,8 @@ public class BSPMaster implements JobSub
@Override
public long getProtocolVersion(String protocol, long clientVersion)
throws IOException {
- if (protocol.equals(InterServerProtocol.class.getName())) {
- return InterServerProtocol.versionID;
+ if (protocol.equals(MasterProtocol.class.getName())) {
+ return MasterProtocol.versionID;
} else if (protocol.equals(JobSubmissionProtocol.class.getName())) {
return JobSubmissionProtocol.versionID;
} else {
@@ -332,185 +390,6 @@ public class BSPMaster implements JobSub
}
}
- /**
- * A RPC method for transmitting each peer status from peer to master.
- *
- * @throws IOException
- */
- @Override
- public HeartbeatResponse heartbeat(GroomServerStatus status,
- boolean restarted, boolean initialContact, boolean acceptNewTasks,
- short responseId, int reportSize) throws IOException {
-
- // First check if the last heartbeat response got through
- String groomName = status.getGroomName();
- long now = System.currentTimeMillis();
-
- HeartbeatResponse prevHeartbeatResponse = groomToHeartbeatResponseMap
- .get(groomName);
-
- // Process this heartbeat
- short newResponseId = (short) (responseId + 1);
- status.setLastSeen(now);
- if (!processHeartbeat(status, initialContact)) {
- if (prevHeartbeatResponse != null) {
- groomToHeartbeatResponseMap.remove(groomName);
- }
- return new HeartbeatResponse(newResponseId,
- new GroomServerAction[] { new ReinitGroomAction() }, Collections
- .<String, String> emptyMap());
- }
-
- HeartbeatResponse response = new HeartbeatResponse(newResponseId, null,
- groomServerPeers);
- List<GroomServerAction> actions = new ArrayList<GroomServerAction>();
-
- // Check for new tasks to be executed on the groom server
- if (acceptNewTasks) {
- GroomServerStatus groomStatus = getGroomServer(groomName);
- if (groomStatus == null) {
- LOG.warn("Unknown groom server polling; ignoring: " + groomName);
- } else {
- List<Task> taskList = taskScheduler.assignTasks(groomStatus);
-
- for (Task task : taskList) {
- if (task != null) {
- actions.add(new LaunchTaskAction(task));
- }
- }
- }
- }
-
- response.setActions(actions.toArray(new GroomServerAction[actions.size()]));
-
- groomToHeartbeatResponseMap.put(groomName, response);
- removeMarkedTasks(groomName);
- updateTaskStatuses(status);
-
- return response;
- }
-
- void updateTaskStatuses(GroomServerStatus status) {
- for (Iterator<TaskStatus> it = status.taskReports(); it.hasNext();) {
- TaskStatus report = it.next();
- report.setGroomServer(status.getGroomName());
- TaskAttemptID taskId = report.getTaskId();
- TaskInProgress tip = (TaskInProgress) taskIdToTaskInProgressMap
- .get(taskId);
-
- if (tip == null) {
- LOG.info("Serious problem. While updating status, cannot find taskid "
- + report.getTaskId());
- } else {
- JobInProgress job = tip.getJob();
-
- if (report.getRunState() == TaskStatus.State.SUCCEEDED) {
- job.completedTask(tip, report);
- } else if (report.getRunState() == TaskStatus.State.FAILED) {
- // TODO Tell the job to fail the relevant task
-
- } else {
- job.updateTaskStatus(tip, report);
- }
- }
-
- }
- }
-
- // (trackerID -> TreeSet of completed taskids running at that tracker)
- TreeMap<String, TreeSet<TaskAttemptID>> trackerToMarkedTasksMap = new TreeMap<String, TreeSet<TaskAttemptID>>();
-
- private void removeMarkedTasks(String groomName) {
- // Purge all the 'marked' tasks which were running at groomServer
- TreeSet<TaskAttemptID> markedTaskSet = trackerToMarkedTasksMap
- .get(groomName);
- if (markedTaskSet != null) {
- for (TaskAttemptID taskid : markedTaskSet) {
- removeTaskEntry(taskid);
- LOG.info("Removed completed task '" + taskid + "' from '" + groomName
- + "'");
- }
- // Clear
- trackerToMarkedTasksMap.remove(groomName);
- }
- }
-
- private void removeTaskEntry(TaskAttemptID taskid) {
- // taskid --> groom
- String groom = taskIdToGroomNameMap.remove(taskid);
-
- // groom --> taskid
- if (groom != null) {
- TreeSet<TaskAttemptID> groomSet = groomNameToTaskIdsMap.get(groom);
- if (groomSet != null) {
- groomSet.remove(taskid);
- }
- }
-
- // taskid --> TIP
- taskIdToTaskInProgressMap.remove(taskid);
- LOG.debug("Removing task '" + taskid + "'");
- }
-
- private List<GroomServerAction> getTasksToKill(String groomName) {
- Set<TaskAttemptID> taskIds = groomNameToTaskIdsMap.get(groomName);
- if (taskIds != null) {
- List<GroomServerAction> killList = new ArrayList<GroomServerAction>();
- Set<String> killJobIds = new TreeSet<String>();
- for (TaskAttemptID killTaskId : taskIds) {
- TaskInProgress tip = (TaskInProgress) taskIdToTaskInProgressMap
- .get(killTaskId);
- if (tip.shouldCloseForClosedJob(killTaskId)) {
- //
- // This is how the BSPMaster ends a task at the GroomServer.
- // It may be successfully completed, or may be killed in
- // mid-execution.
- //
- if (tip.getJob().getStatus().getRunState() == JobStatus.RUNNING) {
- killList.add(new KillTaskAction(killTaskId));
- LOG.debug(groomName + " -> KillTaskAction: " + killTaskId);
- } else {
- String killJobId = tip.getJob().getStatus().getJobID()
- .getJtIdentifier();
- killJobIds.add(killJobId);
- }
- }
- }
-
- for (String killJobId : killJobIds) {
- killList.add(new KillJobAction(killJobId));
- LOG.debug(groomName + " -> KillJobAction: " + killJobId);
- }
-
- return killList;
- }
- return null;
-
- }
-
- /**
- * Process incoming heartbeat messages from the groom.
- */
- private synchronized boolean processHeartbeat(GroomServerStatus groomStatus,
- boolean initialContact) {
- String groomName = groomStatus.getGroomName();
-
- synchronized (groomServers) {
- GroomServerStatus oldStatus = groomServers.get(groomName);
- if (oldStatus == null) {
- groomServers.put(groomName, groomStatus);
- } else { // TODO - to be improved to update status.
- }
- }
-
- if (initialContact) {
- groomServerPeers.put(groomStatus.getGroomName(), groomStatus
- .getPeerName());
- }
-
- return true;
- }
-
// //////////////////////////////////////////////////
// JobSubmissionProtocol
// //////////////////////////////////////////////////
@@ -535,23 +414,29 @@ public class BSPMaster implements JobSub
return jobs.get(jobID).getStatus();
}
- JobInProgress job = new JobInProgress(jobID, new Path(jobFile), this, this.conf);
+ JobInProgress job = new JobInProgress(jobID, new Path(jobFile), this,
+ this.conf);
return addJob(jobID, job);
}
+ // //////////////////////////////////////////////////
+ // GroomServerManager functions
+ // //////////////////////////////////////////////////
+
@Override
public ClusterStatus getClusterStatus(boolean detailed) {
- int numGroomServers;
Map<String, String> groomPeersMap = null;
// give the caller a snapshot of the cluster status
- synchronized (this) {
- numGroomServers = groomServerPeers.size();
- if (detailed) {
- groomPeersMap = new HashMap<String, String>(groomServerPeers);
+ int numGroomServers = groomServers.size();
+ if (detailed) {
+ groomPeersMap = new HashMap<String, String>();
+ for (Map.Entry<GroomServerStatus, WorkerProtocol> entry : groomServers
+ .entrySet()) {
+ GroomServerStatus s = entry.getKey();
+ groomPeersMap.put(s.getGroomName(), s.getPeerName());
}
}
-
if (detailed) {
return new ClusterStatus(groomPeersMap, totalTasks, totalTaskCapacity,
state);
@@ -561,23 +446,58 @@ public class BSPMaster implements JobSub
}
}
+ @Override
+ public WorkerProtocol findGroomServer(GroomServerStatus status) {
+ return groomServers.get(status);
+ }
+
+ @Override
+ public Collection<WorkerProtocol> findGroomServers() {
+ return groomServers.values();
+ }
+
+ @Override
+ public Collection<GroomServerStatus> groomServerStatusKeySet() {
+ return groomServers.keySet();
+ }
+
+ @Override
+ public void addJobInProgressListener(JobInProgressListener listener) {
+ jobInProgressListeners.add(listener);
+ }
+
+ @Override
+ public void removeJobInProgressListener(JobInProgressListener listener) {
+ jobInProgressListeners.remove(listener);
+ }
+
+ @Override
+ public Map<String, String> currentGroomServerPeers() {
+ Map<String, String> tmp = new HashMap<String, String>();
+ for (GroomServerStatus status : groomServers.keySet()) {
+ tmp.put(status.getGroomName(), status.getPeerName());
+ }
+ return tmp;
+ }
+
/**
* Adds a job to the bsp master. Make sure that the checks are inplace before
* adding a job. This is the core job submission logic
*
* @param jobId The id for the job submitted which needs to be added
*/
- private synchronized JobStatus addJob(BSPJobID jodId, JobInProgress job) {
+ private synchronized JobStatus addJob(BSPJobID jobId, JobInProgress job) {
totalSubmissions++;
synchronized (jobs) {
- synchronized (jobInitQueue) {
- jobs.put(job.getProfile().getJobID(), job);
- taskScheduler.addJob(job);
- jobInitQueue.add(job);
- jobInitQueue.notifyAll();
+ jobs.put(job.getProfile().getJobID(), job);
+ for (JobInProgressListener listener : jobInProgressListeners) {
+ try {
+ listener.jobAdded(job);
+ } catch (IOException ioe) {
+ LOG.error("Fail to alter Scheduler a job is added.", ioe);
+ }
}
}
-
return job.getStatus();
}
@@ -600,11 +520,11 @@ public class BSPMaster implements JobSub
List<JobStatus> jobStatusList = new ArrayList<JobStatus>();
for (JobInProgress jip : jips) {
JobStatus status = jip.getStatus();
-
+
status.setStartTime(jip.getStartTime());
// Sets the user name
status.setUsername(jip.getProfile().getUser());
-
+
if (toComplete) {
if (status.getRunState() == JobStatus.RUNNING
|| status.getRunState() == JobStatus.PREP) {
@@ -694,26 +614,10 @@ public class BSPMaster implements JobSub
}
public void shutdown() {
- this.interServer.stop();
+ this.masterServer.stop();
}
- public void createTaskEntry(TaskAttemptID taskid, String groomServer,
- TaskInProgress taskInProgress) {
- LOG.info("Adding task '" + taskid + "' to tip " + taskInProgress.getTIPId()
- + ", for groom '" + groomServer + "'");
-
- // taskid --> groom
- taskIdToGroomNameMap.put(taskid, groomServer);
-
- // groom --> taskid
- TreeSet<TaskAttemptID> taskset = groomNameToTaskIdsMap.get(groomServer);
- if (taskset == null) {
- taskset = new TreeSet<TaskAttemptID>();
- groomNameToTaskIdsMap.put(groomServer, taskset);
- }
- taskset.add(taskid);
-
- // taskid --> TIP
- taskIdToTaskInProgressMap.put(taskid, taskInProgress);
+ public BSPMaster.State currentState() {
+ return this.state;
}
}
Modified: incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPPeer.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPPeer.java?rev=1058109&r1=1058108&r2=1058109&view=diff
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPPeer.java (original)
+++ incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPPeer.java Wed Jan 12 12:32:39 2011
@@ -56,9 +56,12 @@ public class BSPPeer implements Watcher,
private final String bspRoot;
private final String zookeeperAddr;
- private final Map<InetSocketAddress, BSPPeerInterface> peers = new ConcurrentHashMap<InetSocketAddress, BSPPeerInterface>();
- private final Map<InetSocketAddress, ConcurrentLinkedQueue<BSPMessage>> outgoingQueues = new ConcurrentHashMap<InetSocketAddress, ConcurrentLinkedQueue<BSPMessage>>();
- private final ConcurrentLinkedQueue<BSPMessage> localQueue = new ConcurrentLinkedQueue<BSPMessage>();
+ private final Map<InetSocketAddress, BSPPeerInterface> peers =
+ new ConcurrentHashMap<InetSocketAddress, BSPPeerInterface>();
+ private final Map<InetSocketAddress, ConcurrentLinkedQueue<BSPMessage>> outgoingQueues =
+ new ConcurrentHashMap<InetSocketAddress, ConcurrentLinkedQueue<BSPMessage>>();
+ private final ConcurrentLinkedQueue<BSPMessage> localQueue =
+ new ConcurrentLinkedQueue<BSPMessage>();
private Set<String> allPeerNames = new HashSet<String>();
private InetSocketAddress peerAddress;
private TaskStatus currentTaskStatus;
@@ -68,7 +71,6 @@ public class BSPPeer implements Watcher,
*/
public BSPPeer(Configuration conf) throws IOException {
this.conf = conf;
-
String bindAddress = conf.get(Constants.PEER_HOST,
Constants.DEFAULT_PEER_HOST);
int bindPort = conf
@@ -79,6 +81,8 @@ public class BSPPeer implements Watcher,
+ ":"
+ conf.getInt(Constants.ZOOKEPER_CLIENT_PORT,
Constants.DEFAULT_ZOOKEPER_CLIENT_PORT);
+ // TODO: may require to dynamic reflect the underlying
+ // network e.g. ip address, port.
peerAddress = new InetSocketAddress(bindAddress, bindPort);
reinitialize();
@@ -90,6 +94,7 @@ public class BSPPeer implements Watcher,
server = RPC.getServer(this, peerAddress.getHostName(), peerAddress
.getPort(), conf);
server.start();
+ LOG.info(" BSPPeer address:"+peerAddress.getHostName()+" port:"+peerAddress.getPort());
} catch (IOException e) {
e.printStackTrace();
}
Added: incubator/hama/trunk/src/java/org/apache/hama/bsp/Directive.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/bsp/Directive.java?rev=1058109&view=auto
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/bsp/Directive.java (added)
+++ incubator/hama/trunk/src/java/org/apache/hama/bsp/Directive.java Wed Jan 12 12:32:39 2011
@@ -0,0 +1,235 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.bsp;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableUtils;
+
+/**
+ * A generic directive from the {@link org.apache.hama.bsp.BSPMaster} to the
+ * {@link org.apache.hama.bsp.GroomServer} to take some 'action'.
+ * <p>
+ * A {@link Type#Request} carries the groom server peers and the actions to
+ * perform; a {@link Type#Response} carries a groom server's
+ * {@link GroomServerStatus}. {@link #readFields(DataInput)} resets the
+ * fields that do not belong to the decoded type, so a reused instance never
+ * keeps stale data.
+ */
+public class Directive implements Writable {
+
+  public static final Log LOG = LogFactory.getLog(Directive.class);
+
+  private long timestamp;
+  private Directive.Type type;
+  private Map<String, String> groomServerPeers;
+  private GroomServerAction[] actions;
+  private GroomServerStatus status;
+
+  /** Kind of directive; {@link #value()} is the code written on the wire. */
+  public static enum Type {
+    Request(1), Response(2);
+
+    final int t;
+
+    Type(int t) {
+      this.t = t;
+    }
+
+    public int value() {
+      return this.t;
+    }
+  }
+
+  /** Creates a directive stamped with the current time and no type set. */
+  public Directive() {
+    this.timestamp = System.currentTimeMillis();
+  }
+
+  /**
+   * Creates a {@link Type#Request} directive.
+   *
+   * @param groomServerPeers groom name to peer name ("host:port") pairs; a
+   *          null map is serialized as an empty one
+   * @param actions actions for the groom server to perform; may be null
+   */
+  public Directive(Map<String, String> groomServerPeers,
+      GroomServerAction[] actions) {
+    this();
+    this.type = Directive.Type.Request;
+    this.groomServerPeers = groomServerPeers;
+    this.actions = actions;
+  }
+
+  /**
+   * Creates a {@link Type#Response} directive.
+   *
+   * @param status the status reported by a groom server
+   */
+  public Directive(GroomServerStatus status) {
+    this();
+    this.type = Directive.Type.Response;
+    this.status = status;
+  }
+
+  /** @return the creation time in milliseconds, as set by the sender. */
+  public long getTimestamp() {
+    return this.timestamp;
+  }
+
+  /** @return the directive type; null for a default-constructed instance. */
+  public Directive.Type getType() {
+    return this.type;
+  }
+
+  /** @return groom name to peer name pairs; only set for requests. */
+  public Map<String, String> getGroomServerPeers() {
+    return this.groomServerPeers;
+  }
+
+  /** @return the actions to perform; may be null. */
+  public GroomServerAction[] getActions() {
+    return this.actions;
+  }
+
+  /** @return the reported groom server status; only set for responses. */
+  public GroomServerStatus getStatus() {
+    return this.status;
+  }
+
+  /**
+   * Serializes the timestamp and type code, followed by the actions and peer
+   * map (request) or the groom server status (response).
+   *
+   * @throws IllegalStateException if no type has been set
+   */
+  public void write(DataOutput out) throws IOException {
+    if (this.type == null) {
+      // Fail before writing anything rather than with an NPE mid-stream.
+      throw new IllegalStateException("Directive type is not set.");
+    }
+    out.writeLong(this.timestamp);
+    out.writeInt(this.type.value());
+    switch (this.type) {
+      case Request:
+        writeActions(out);
+        writePeers(out);
+        break;
+      case Response:
+        this.status.write(out);
+        break;
+      default:
+        throw new IllegalStateException("Wrong directive type:" + getType());
+    }
+  }
+
+  /** Writes the action count, then each action's type and payload. */
+  private void writeActions(DataOutput out) throws IOException {
+    if (this.actions == null) {
+      WritableUtils.writeVInt(out, 0);
+      return;
+    }
+    WritableUtils.writeVInt(out, this.actions.length);
+    for (GroomServerAction action : this.actions) {
+      WritableUtils.writeEnum(out, action.getActionType());
+      action.write(out);
+    }
+  }
+
+  /**
+   * Writes the peers as two parallel string arrays: names, then addresses.
+   * A null map is written as an empty one, which readers already accept.
+   */
+  private void writePeers(DataOutput out) throws IOException {
+    Map<String, String> peers = (this.groomServerPeers == null)
+        ? new HashMap<String, String>() : this.groomServerPeers;
+    String[] names = peers.keySet().toArray(new String[0]);
+    String[] addresses = new String[names.length];
+    for (int i = 0; i < names.length; i++) {
+      addresses[i] = peers.get(names[i]);
+    }
+    WritableUtils.writeCompressedStringArray(out, names);
+    WritableUtils.writeCompressedStringArray(out, addresses);
+  }
+
+  /**
+   * Deserializes a directive written by {@link #write(DataOutput)}. Fields
+   * that do not belong to the decoded type are reset to null.
+   *
+   * @throws IllegalStateException if the type code is unknown
+   * @throws IOException if the stream cannot be read or the peer arrays
+   *           have different lengths
+   */
+  public void readFields(DataInput in) throws IOException {
+    this.timestamp = in.readLong();
+    int t = in.readInt();
+    if (Directive.Type.Request.value() == t) {
+      this.type = Directive.Type.Request;
+      this.status = null;
+      readActions(in);
+      readPeers(in);
+    } else if (Directive.Type.Response.value() == t) {
+      this.type = Directive.Type.Response;
+      this.actions = null;
+      this.groomServerPeers = null;
+      this.status = new GroomServerStatus();
+      this.status.readFields(in);
+    } else {
+      throw new IllegalStateException("Wrong directive type:" + t);
+    }
+  }
+
+  /** Reads the actions written by writeActions; a count of 0 yields null. */
+  private void readActions(DataInput in) throws IOException {
+    int length = WritableUtils.readVInt(in);
+    if (length <= 0) {
+      this.actions = null;
+      return;
+    }
+    this.actions = new GroomServerAction[length];
+    for (int i = 0; i < length; ++i) {
+      GroomServerAction.ActionType actionType = WritableUtils.readEnum(in,
+          GroomServerAction.ActionType.class);
+      this.actions[i] = GroomServerAction.createAction(actionType);
+      this.actions[i].readFields(in);
+    }
+  }
+
+  /** Reads the parallel name/address arrays written by writePeers. */
+  private void readPeers(DataInput in) throws IOException {
+    String[] names = WritableUtils.readCompressedStringArray(in);
+    String[] addresses = WritableUtils.readCompressedStringArray(in);
+    int count = (names == null) ? 0 : names.length;
+    int addressCount = (addresses == null) ? 0 : addresses.length;
+    if (count != addressCount) {
+      throw new IOException("Mismatched groom server peers: " + count
+          + " names but " + addressCount + " addresses.");
+    }
+    this.groomServerPeers = new HashMap<String, String>(count);
+    for (int i = 0; i < count; i++) {
+      this.groomServerPeers.put(names[i], addresses[i]);
+    }
+  }
+}
Added: incubator/hama/trunk/src/java/org/apache/hama/bsp/FCFSQueue.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/bsp/FCFSQueue.java?rev=1058109&view=auto
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/bsp/FCFSQueue.java (added)
+++ incubator/hama/trunk/src/java/org/apache/hama/bsp/FCFSQueue.java Wed Jan 12 12:32:39 2011
@@ -0,0 +1,90 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.bsp;
+
+import java.util.Collection;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+/**
+ * A first-come, first-served {@link Queue} of jobs, backed by an unbounded
+ * {@link LinkedBlockingQueue}.
+ */
+class FCFSQueue implements Queue<JobInProgress> {
+
+  public static final Log LOG = LogFactory.getLog(FCFSQueue.class);
+  private final String name;
+  private final BlockingQueue<JobInProgress> queue =
+      new LinkedBlockingQueue<JobInProgress>();
+
+  /** @param name the queue name, used by getName() and in log messages */
+  public FCFSQueue(String name) {
+    this.name = name;
+  }
+
+  @Override
+  public String getName() {
+    return this.name;
+  }
+
+  /**
+   * Appends a job to the tail of the queue. The queue is unbounded, so
+   * offer() does not block and, unlike put(), cannot lose the job when the
+   * calling thread has been interrupted.
+   */
+  @Override
+  public void addJob(JobInProgress job) {
+    if (!queue.offer(job)) {
+      LOG.error("Fail to add a job to the " + this.name + " queue.");
+    }
+  }
+
+  /** Removes a single instance of the given job, if it is queued. */
+  @Override
+  public void removeJob(JobInProgress job) {
+    queue.remove(job);
+  }
+
+  /**
+   * Removes and returns the job at the head of the queue, blocking until
+   * one is available.
+   *
+   * @return the head job, or null if the calling thread was interrupted while
+   *         waiting; the thread's interrupt status is restored in that case
+   */
+  @Override
+  public JobInProgress removeJob() {
+    try {
+      return queue.take();
+    } catch (InterruptedException ie) {
+      LOG.error("Fail to remove a job from the " + this.name + " queue.", ie);
+      Thread.currentThread().interrupt();
+      return null;
+    }
+  }
+
+  /** @return a live view of the queued jobs, in FCFS order. */
+  @Override
+  public Collection<JobInProgress> jobs() {
+    return queue;
+  }
+
+}
Modified: incubator/hama/trunk/src/java/org/apache/hama/bsp/GroomServer.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/bsp/GroomServer.java?rev=1058109&r1=1058108&r2=1058109&view=diff
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/bsp/GroomServer.java (original)
+++ incubator/hama/trunk/src/java/org/apache/hama/bsp/GroomServer.java Wed Jan 12 12:32:39 2011
@@ -30,6 +30,7 @@ import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.LinkedBlockingQueue;
import org.apache.commons.logging.Log;
@@ -40,6 +41,7 @@ import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.ipc.RemoteException;
+import org.apache.hadoop.ipc.Server;
import org.apache.hadoop.net.DNS;
import org.apache.hadoop.util.DiskChecker;
import org.apache.hadoop.util.RunJar;
@@ -47,13 +49,17 @@ import org.apache.hadoop.util.StringUtil
import org.apache.hadoop.util.DiskChecker.DiskErrorException;
import org.apache.hama.Constants;
import org.apache.hama.HamaConfiguration;
-import org.apache.hama.ipc.InterServerProtocol;
+import org.apache.hama.ipc.MasterProtocol;
+import org.apache.hama.ipc.WorkerProtocol;
+
+public class GroomServer implements Runnable, WorkerProtocol {
-public class GroomServer implements Runnable {
public static final Log LOG = LogFactory.getLog(GroomServer.class);
- private static BSPPeer bspPeer;
+ private BSPPeer bspPeer;
static final String SUBDIR = "groomServer";
+ private volatile static int REPORT_INTERVAL = 60 * 1000;
+
Configuration conf;
// Constants
@@ -62,19 +68,16 @@ public class GroomServer implements Runn
};
// Running States and its related things
+ volatile boolean initialized = false;
volatile boolean running = true;
volatile boolean shuttingDown = false;
- boolean justStarted = true;
boolean justInited = true;
GroomServerStatus status = null;
- short heartbeatResponseId = -1;
- private volatile int heartbeatInterval = 3 * 1000;
// Attributes
String groomServerName;
String localHostname;
InetSocketAddress bspMasterAddr;
- InterServerProtocol jobClient;
// Filesystem
// private LocalDirAllocator localDirAllocator;
@@ -82,7 +85,6 @@ public class GroomServer implements Runn
FileSystem systemFS = null;
// Job
- boolean acceptNewTasks = true;
private int failures;
private int maxCurrentTasks = 1;
Map<TaskAttemptID, TaskInProgress> tasks = new HashMap<TaskAttemptID, TaskInProgress>();
@@ -90,6 +92,14 @@ public class GroomServer implements Runn
Map<TaskAttemptID, TaskInProgress> runningTasks = null;
Map<BSPJobID, RunningJob> runningJobs = null;
+ // new nexus between GroomServer and BSPMaster
+ // holds/ manage all tasks
+ List<TaskInProgress> tasksList = new CopyOnWriteArrayList<TaskInProgress>();
+
+ private String rpcServer;
+ private Server workerServer;
+ MasterProtocol masterClient;
+
private BlockingQueue<GroomServerAction> tasksToCleanup = new LinkedBlockingQueue<GroomServerAction>();
public GroomServer(Configuration conf) throws IOException {
@@ -114,7 +124,6 @@ public class GroomServer implements Runn
this.localHostname = DNS.getDefaultHost(conf.get("bsp.dns.interface",
"default"), conf.get("bsp.dns.nameserver", "default"));
}
-
// check local disk
checkLocalDirs(conf.getStrings("bsp.local.dir"));
deleteLocalFiles("groomserver");
@@ -123,20 +132,75 @@ public class GroomServer implements Runn
this.tasks.clear();
this.runningJobs = new TreeMap<BSPJobID, RunningJob>();
this.runningTasks = new LinkedHashMap<TaskAttemptID, TaskInProgress>();
- this.acceptNewTasks = true;
-
this.conf.set(Constants.PEER_HOST, localHostname);
+ this.conf.set(Constants.GROOM_RPC_HOST, localHostname);
bspPeer = new BSPPeer(conf);
+ int rpcPort = -1;
+ String rpcAddr = null;
+ if (false == this.initialized) {
+ rpcAddr = conf.get(Constants.GROOM_RPC_HOST,
+ Constants.DEFAULT_GROOM_RPC_HOST);
+ rpcPort = conf.getInt(Constants.GROOM_RPC_PORT,
+ Constants.DEFAULT_GROOM_RPC_PORT);
+ if (-1 == rpcPort || null == rpcAddr)
+ throw new IllegalArgumentException("Error rpc address " + rpcAddr
+ + " port" + rpcPort);
+ this.workerServer = RPC.getServer(this, rpcAddr, rpcPort, conf);
+ this.workerServer.start();
+ this.rpcServer = rpcAddr + ":" + rpcPort;
+ LOG.info("Worker rpc server --> " + rpcServer);
+ }
+
this.groomServerName = "groomd_" + bspPeer.getPeerName().replace(':', '_');
LOG.info("Starting groom: " + this.groomServerName);
DistributedCache.purgeCache(this.conf);
- this.jobClient = (InterServerProtocol) RPC.waitForProxy(
- InterServerProtocol.class, InterServerProtocol.versionID,
- bspMasterAddr, conf);
+ // establish the communication link to bsp master
+ this.masterClient = (MasterProtocol) RPC.waitForProxy(MasterProtocol.class,
+ MasterProtocol.versionID, bspMasterAddr, conf);
+
+ // enroll in bsp master
+ if (-1 == rpcPort || null == rpcAddr)
+ throw new IllegalArgumentException("Error rpc address " + rpcAddr
+ + " port" + rpcPort);
+ if (!this.masterClient.register(new GroomServerStatus(groomServerName,
+ bspPeer.getPeerName(), cloneAndResetRunningTaskStatuses(), failures,
+ maxCurrentTasks, this.rpcServer))) {
+ LOG.error("There is a problem in establishing communication"
+ + " link with BSPMaster");
+ throw new IOException("There is a problem in establishing"
+ + " communication link with BSPMaster.");
+ }
this.running = true;
+ this.initialized = true;
+ }
+
+  /**
+   * Applies a directive from BSPMaster: refreshes the known peer names and
+   * launches (or queues for cleanup) every action it carries.
+   */
+  @Override
+  public void dispatch(Directive directive) throws IOException {
+    GroomServerAction[] actions = directive.getActions();
+    Map<String, String> peers = directive.getGroomServerPeers();
+    if (peers != null) {
+      bspPeer.setAllPeerNames(peers.values());
+    }
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Got Response from BSPMaster with "
+          + ((actions != null) ? actions.length : 0) + " actions");
+    }
+    if (actions != null) {
+      for (GroomServerAction action : actions) {
+        if (action instanceof LaunchTaskAction) {
+          startNewTask((LaunchTaskAction) action);
+        } else if (!tasksToCleanup.offer(action)) {
+          LOG.error("Fail to move action to cleanup list.");
+        }
+      }
+    }
 }
private static void checkLocalDirs(String[] localDirs)
@@ -193,70 +257,19 @@ public class GroomServer implements Runn
}
public State offerService() throws Exception {
- long lastHeartbeat = 0;
while (running && !shuttingDown) {
try {
- long now = System.currentTimeMillis();
-
- long waitTime = heartbeatInterval - (now - lastHeartbeat);
- if (waitTime > 0) {
- // sleeps for the wait time
- Thread.sleep(waitTime);
- }
-
if (justInited) {
- String dir = jobClient.getSystemDir();
+ String dir = masterClient.getSystemDir();
if (dir == null) {
- throw new IOException("Failed to get system directory");
+ LOG.error("Fail to get system directory.");
+ throw new IOException("Fail to get system directory.");
}
systemDirectory = new Path(dir);
systemFS = systemDirectory.getFileSystem(conf);
}
-
- // Send the heartbeat and process the bspmaster's directives
- HeartbeatResponse heartbeatResponse = transmitHeartBeat(now);
-
- if (acceptNewTasks) {
- bspPeer.setAllPeerNames(heartbeatResponse.getGroomServers().values());
- }
-
- for (String peer : bspPeer.getAllPeerNames()) {
- LOG.debug("Remote peer, host:port is " + peer);
- }
-
- GroomServerAction[] actions = heartbeatResponse.getActions();
- LOG.debug("Got heartbeatResponse from BSPMaster with responseId: "
- + heartbeatResponse.getResponseId() + " and "
- + ((actions != null) ? actions.length : 0) + " actions");
-
- if (actions != null) {
- acceptNewTasks = false;
-
- for (GroomServerAction action : actions) {
- if (action instanceof LaunchTaskAction) {
- startNewTask((LaunchTaskAction) action);
- } else {
- tasksToCleanup.put(action);
- }
- }
- }
-
- //
- // The heartbeat got through successfully!
- //
- heartbeatResponseId = heartbeatResponse.getResponseId();
-
- // Note the time when the heartbeat returned, use this to decide when to
- // send the
- // next heartbeat
- lastHeartbeat = System.currentTimeMillis();
-
- justStarted = false;
justInited = false;
- } catch (InterruptedException ie) {
- LOG.info("Interrupted. Closing down.");
- return State.INTERRUPTED;
} catch (DiskErrorException de) {
String msg = "Exiting groom server for disk error:\n"
+ StringUtils.stringifyException(de);
@@ -271,7 +284,6 @@ public class GroomServer implements Runn
LOG.error(msg);
}
}
-
return State.NORMAL;
}
@@ -296,7 +308,6 @@ public class GroomServer implements Runn
Task task = tip.getTask();
conf.addResource(task.getJobFile());
BSPJob defaultJobConf = new BSPJob((HamaConfiguration) conf);
-
Path localJobFile = defaultJobConf.getLocalPath(SUBDIR + "/"
+ task.getTaskID() + "/" + "job.xml");
@@ -392,43 +403,6 @@ public class GroomServer implements Runn
}
}
- private HeartbeatResponse transmitHeartBeat(long now) throws IOException {
- //
- // Check if the last heartbeat got through...
- // if so then build the heartbeat information for the BSPMaster;
- // else resend the previous status information.
- //
- if (status == null) {
- synchronized (this) {
- status = new GroomServerStatus(groomServerName, bspPeer.getPeerName(),
- cloneAndResetRunningTaskStatuses(), failures, maxCurrentTasks);
- }
- } else {
- LOG.info("Resending 'status' to '" + bspMasterAddr.getHostName()
- + "' with reponseId '" + heartbeatResponseId + "'");
- }
-
- // TODO - Later, acceptNewTask is to be set by the status of groom server.
- HeartbeatResponse heartbeatResponse = jobClient.heartbeat(status,
- justStarted, justInited, acceptNewTasks, heartbeatResponseId, status
- .getTaskReports().size());
-
- synchronized (this) {
- for (TaskStatus taskStatus : status.getTaskReports()) {
- if (taskStatus.getRunState() != TaskStatus.State.RUNNING) {
- LOG.debug("Removing task from runningTasks: "
- + taskStatus.getTaskId());
- runningTasks.remove(taskStatus.getTaskId());
- }
- }
- }
-
- // Force a rebuild of 'status' on the next iteration
- status = null;
-
- return heartbeatResponse;
- }
-
private synchronized List<TaskStatus> cloneAndResetRunningTaskStatuses() {
List<TaskStatus> result = new ArrayList<TaskStatus>(runningTasks.size());
for (TaskInProgress tip : runningTasks.values()) {
@@ -469,7 +443,6 @@ public class GroomServer implements Runn
} finally {
// close();
}
-
if (shuttingDown) {
return;
}
@@ -490,11 +463,11 @@ public class GroomServer implements Runn
+  /**
+   * Shuts this groom server down: clears the running and initialized flags,
+   * closes the BSP peer, cleans up storage, then stops the worker RPC server
+   * and the BSPMaster proxy.
+   */
 public synchronized void close() throws IOException {
 this.running = false;
+ this.initialized = false;
- bspPeer.close();
- cleanupStorage();
-
- // shutdown RPC connections
- RPC.stopProxy(jobClient);
+    try {
+      bspPeer.close();
+      cleanupStorage();
+    } finally {
+      // Release the RPC endpoints even if closing the peer or cleaning up
+      // storage throws, so the worker server and master proxy are not leaked.
+      this.workerServer.stop();
+      RPC.stopProxy(masterClient);
+    }
 }
public static Thread startGroomServer(final GroomServer hrs) {
@@ -523,7 +496,7 @@ public class GroomServer implements Runn
public TaskInProgress(Task task, String groomServer) {
this.task = task;
- this.taskStatus = new TaskStatus(task.getTaskID(), 0,
+ this.taskStatus = new TaskStatus(task.getJobID(), task.getTaskID(), 0,
TaskStatus.State.UNASSIGNED, "running", groomServer,
TaskStatus.Phase.STARTING);
}
@@ -539,7 +512,7 @@ public class GroomServer implements Runn
this.runner = task.createRunner(bspPeer, this.jobConf);
this.runner.start();
- // Check state of Task
+ // Check state of a Task
while (true) {
try {
Thread.sleep(1000);
@@ -550,7 +523,7 @@ public class GroomServer implements Runn
if (bspPeer.getLocalQueueSize() == 0
&& bspPeer.getOutgoingQueueSize() == 0 && !runner.isAlive()) {
taskStatus.setRunState(TaskStatus.State.SUCCEEDED);
- acceptNewTasks = true;
+ doReport();
break;
}
}
@@ -558,11 +531,42 @@ public class GroomServer implements Runn
}
/**
+ * Update and report refresh status back to BSPMaster.
+ */
+  private void doReport() {
+    // Snapshot this groom's state (updateTaskStatus marks tasks SUCCEEDED)
+    // and report it to BSPMaster.
+    GroomServerStatus report = new GroomServerStatus(groomServerName,
+        bspPeer.getPeerName(), updateTaskStatus(), failures, maxCurrentTasks,
+        rpcServer);
+    boolean renewed;
+    try {
+      renewed = masterClient.report(new Directive(report));
+    } catch (IOException ioe) {
+      LOG.error("Fail to communicate with BSPMaster for reporting.", ioe);
+      return;
+    }
+    if (!renewed) {
+      LOG.warn("Fail to renew BSPMaster's GroomServerStatus. "
+          + " groom name: " + report.getGroomName() + " peer name:"
+          + report.getPeerName() + " rpc server:" + rpcServer);
+    }
+  }
+
+ /**
+ * Builds the task list sent by doReport(): every task in runningTasks is
+ * marked SUCCEEDED with progress 1 and phase CLEANUP, and a clone of its
+ * status is added to the returned list.
+ * NOTE(review): this mutates the live status of ALL running tasks, not only
+ * the one that just finished, and reads runningTasks without synchronization
+ * (unlike the synchronized cloneAndResetRunningTaskStatuses()); confirm this
+ * is safe when more than one task can run at a time.
+ */
+ private List<TaskStatus> updateTaskStatus() {
+ List<TaskStatus> tlist = new ArrayList<TaskStatus>();
+ for (TaskInProgress tip : runningTasks.values()) {
+ TaskStatus stus = tip.getStatus();
+ stus.setProgress(1f);
+ stus.setRunState(TaskStatus.State.SUCCEEDED);
+ stus.setPhase(TaskStatus.Phase.CLEANUP);
+ tlist.add((TaskStatus) stus.clone());
+ }
+ return tlist;
+ }
+
+ /**
* This task has run on too long, and should be killed.
*/
public synchronized void killAndCleanup(boolean wasFailure)
throws IOException {
- // TODO
+ // TODO
runner.kill();
}
@@ -616,4 +620,26 @@ public class GroomServer implements Runn
+ groomServerClass.toString(), e);
}
}
+
+  @Override
+  public long getProtocolVersion(String protocol, long clientVersion)
+      throws IOException {
+    // WorkerProtocol is the only protocol this RPC endpoint serves.
+    if (!protocol.equals(WorkerProtocol.class.getName())) {
+      throw new IOException("Unknown protocol to GroomServer: " + protocol);
+    }
+    return WorkerProtocol.versionID;
+  }
+
+  /**
+   * Returns the name of this GroomServer's BSP peer.
+   *
+   * @return the peer name in the form "address:port", or null if no BSPPeer
+   *         has been created
+   */
+  public String getBspPeerName() {
+    BSPPeer peer = this.bspPeer;
+    return (peer == null) ? null : peer.getPeerName();
+  }
+
}
Modified: incubator/hama/trunk/src/java/org/apache/hama/bsp/GroomServerManager.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/bsp/GroomServerManager.java?rev=1058109&r1=1058108&r2=1058109&view=diff
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/bsp/GroomServerManager.java (original)
+++ incubator/hama/trunk/src/java/org/apache/hama/bsp/GroomServerManager.java Wed Jan 12 12:32:39 2011
@@ -17,11 +17,14 @@
*/
package org.apache.hama.bsp;
+import java.util.Collection;
+import java.util.Map;
+
+import org.apache.hama.ipc.WorkerProtocol;
/**
- * Manages information about the {@link GroomServer}s running on a cluster.
- * This interface exits primarily to test the {@link BSPMaster}, and is not
- * intended to be implemented by users.
+ * Manages information about the {@link GroomServer}s in the cluster
+ * environment. This interface is not intended to be implemented by users.
*/
interface GroomServerManager {
@@ -30,6 +33,47 @@ interface GroomServerManager {
* @param detailed if true then report groom names as well
* @return summary of the state of the cluster
*/
- public ClusterStatus getClusterStatus(boolean detailed);
-
+ ClusterStatus getClusterStatus(boolean detailed);
+
+ /**
+ * Finds the WorkerProtocol of the groom server described by the given status.
+ *
+ * @param status the GroomServerStatus identifying the GroomServer
+ * @return the WorkerProtocol associated with that GroomServer
+ */
+ WorkerProtocol findGroomServer(GroomServerStatus status);
+
+ /**
+ * Finds the WorkerProtocols of the groom servers.
+ *
+ * @return Collection of the groom servers' WorkerProtocols.
+ */
+ Collection<WorkerProtocol> findGroomServers();
+
+ /**
+ * Returns the GroomServerStatus values used as keys for the groom servers.
+ *
+ * @return Collection of GroomServerStatus.
+ */
+ Collection<GroomServerStatus> groomServerStatusKeySet();
+
+ /**
+ * Registers a JobInProgressListener with this GroomServerManager, so that
+ * adding a JobInProgress triggers the listener's jobAdded method.
+ * @param listener the JobInProgressListener to be added.
+ */
+ void addJobInProgressListener(JobInProgressListener listener);
+
+ /**
+ * Unregisters a JobInProgressListener previously added through
+ * addJobInProgressListener.
+ * @param listener the JobInProgressListener to be removed.
+ */
+ void removeJobInProgressListener(JobInProgressListener listener);
+
+ /**
+ * Returns the current GroomServer peers.
+ * @return groom name mapped to peer name (host:port).
+ */
+ Map<String, String> currentGroomServerPeers();
}
Modified: incubator/hama/trunk/src/java/org/apache/hama/bsp/GroomServerStatus.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/bsp/GroomServerStatus.java?rev=1058109&r1=1058108&r2=1058109&view=diff
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/bsp/GroomServerStatus.java (original)
+++ incubator/hama/trunk/src/java/org/apache/hama/bsp/GroomServerStatus.java Wed Jan 12 12:32:39 2011
@@ -23,6 +23,7 @@ import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
+import java.util.concurrent.CopyOnWriteArrayList;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -47,6 +48,7 @@ public class GroomServerStatus implement
String groomName;
String peerName;
+ String rpcServer;
int failures;
List<TaskStatus> taskReports;
@@ -54,18 +56,25 @@ public class GroomServerStatus implement
private int maxTasks;
+ /**
+ * Creates an empty status whose task report list is a CopyOnWriteArrayList.
+ * NOTE(review): the 6-argument constructor copies reports into a plain
+ * ArrayList instead; confirm which list type is intended.
+ */
 public GroomServerStatus() {
- taskReports = new ArrayList<TaskStatus>();
+ taskReports = new CopyOnWriteArrayList<TaskStatus>();
 }
+ /**
+ * Creates a status with an empty RPC server address.
+ */
 public GroomServerStatus(String groomName, String peerName,
 List<TaskStatus> taskReports, int failures, int maxTasks) {
+ this(groomName, peerName, taskReports, failures, maxTasks, "");
+ }
+
+ /**
+ * Creates a status for one groom server.
+ *
+ * @param groomName the groom server's name
+ * @param peerName the BSP peer address, in the form "hostname:port"
+ * @param taskReports task statuses; copied into a new ArrayList
+ * @param failures the failure count
+ * @param maxTasks the maximum number of tasks
+ * @param rpc the address of the groom server's worker RPC server
+ */
+ public GroomServerStatus(String groomName, String peerName,
+ List<TaskStatus> taskReports, int failures, int maxTasks, String rpc) {
 this.groomName = groomName;
 this.peerName = peerName;
 this.taskReports = new ArrayList<TaskStatus>(taskReports);
 this.failures = failures;
 this.maxTasks = maxTasks;
- }
-
+ this.rpcServer = rpc;
+ }
+
public String getGroomName() {
return groomName;
}
@@ -73,11 +82,15 @@ public class GroomServerStatus implement
/**
* The host (and port) from where the groom server can be reached.
*
- * @return The groom server address in the format hostname:port
+ * @return The groom server address in the form of "hostname:port"
*/
public String getPeerName() {
return peerName;
}
+
+ public String getRpcServer(){
+ return rpcServer;
+ }
/**
* Get the current tasks at the GroomServer.
@@ -116,12 +129,57 @@ public class GroomServerStatus implement
TaskStatus.State state = ts.getRunState();
if(state == TaskStatus.State.RUNNING ||
state == TaskStatus.State.UNASSIGNED) {
- taskCount++;
+ taskCount++;
}
}
return taskCount;
}
+
+  /**
+   * Hashes only the identity fields groomName, peerName and rpcServer, so
+   * that BSPMaster can use GroomServerStatus as a map key that distinguishes
+   * GroomServers while ignoring failures, task reports and maxTasks.
+   * Null fields (e.g. on a default-constructed instance) hash as 0.
+   */
+  @Override
+  public int hashCode() {
+    int result = 17;
+    result = 37 * result + hashOf(groomName);
+    result = 37 * result + hashOf(peerName);
+    result = 37 * result + hashOf(rpcServer);
+    return result;
+  }
+
+  /**
+   * Two statuses are equal when their groomName, peerName and rpcServer all
+   * match; the remaining fields are deliberately ignored, consistent with
+   * {@link #hashCode()}. A null field only equals null.
+   */
+  @Override
+  public boolean equals(Object o) {
+    if (o == this) {
+      return true;
+    }
+    if (o == null || getClass() != o.getClass()) {
+      return false;
+    }
+    GroomServerStatus s = (GroomServerStatus) o;
+    return same(groomName, s.groomName) && same(peerName, s.peerName)
+        && same(rpcServer, s.rpcServer);
+  }
+
+  /** Null-safe hash of a single identity field. */
+  private static int hashOf(Object value) {
+    return (value == null) ? 0 : value.hashCode();
+  }
+
+  /** Null-safe equality of a single identity field. */
+  private static boolean same(Object a, Object b) {
+    return (a == null) ? (b == null) : a.equals(b);
+  }
/* (non-Javadoc)
* @see org.apache.hadoop.io.Writable#readFields(java.io.DataInput)
@@ -130,6 +188,7 @@ public class GroomServerStatus implement
public void readFields(DataInput in) throws IOException {
this.groomName = Text.readString(in);
this.peerName = Text.readString(in);
+ this.rpcServer = Text.readString(in);
this.failures = in.readInt();
this.maxTasks = in.readInt();
taskReports.clear();
@@ -150,6 +209,7 @@ public class GroomServerStatus implement
public void write(DataOutput out) throws IOException {
Text.writeString(out, groomName);
Text.writeString(out, peerName);
+ Text.writeString(out, rpcServer);
out.writeInt(failures);
out.writeInt(maxTasks);
out.writeInt(taskReports.size());
@@ -161,5 +221,4 @@ public class GroomServerStatus implement
public Iterator<TaskStatus> taskReports() {
return taskReports.iterator();
}
-
}
Modified: incubator/hama/trunk/src/java/org/apache/hama/bsp/JobInProgress.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/bsp/JobInProgress.java?rev=1058109&r1=1058108&r2=1058109&view=diff
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/bsp/JobInProgress.java (original)
+++ incubator/hama/trunk/src/java/org/apache/hama/bsp/JobInProgress.java Wed Jan 12 12:32:39 2011
@@ -45,6 +45,7 @@ class JobInProgress {
static final Log LOG = LogFactory.getLog(JobInProgress.class);
boolean tasksInited = false;
+ boolean jobInited = false;
Configuration conf;
JobProfile profile;
@@ -76,7 +77,7 @@ class JobInProgress {
this.localFs = FileSystem.getLocal(conf);
this.jobFile = jobFile;
this.master = master;
- this.status = new JobStatus(jobId, null, 0.0f, 0.0f, JobStatus.PREP);
+ this.status = new JobStatus(jobId, null, 0.0f, 0.0f, JobStatus.State.PREP.value());
this.startTime = System.currentTimeMillis();
this.superstepCounter = 0;
this.restartCount = 0;
@@ -139,6 +140,21 @@ class JobInProgress {
return jobId;
}
+  /**
+   * Looks up one of this job's tasks by id.
+   *
+   * @return the matching task, or null if none matches or the tasks have not
+   *         been initialized yet
+   */
+  public synchronized TaskInProgress findTaskInProgress(TaskID id) {
+    if (!areTasksInited()) {
+      return null;
+    }
+    for (TaskInProgress candidate : tasks) {
+      if (candidate.getTaskId().equals(id)) {
+        return candidate;
+      }
+    }
+    return null;
+  }
+
+  /** @return whether this job's tasks have been initialized. */
+  public synchronized boolean areTasksInited() {
+    return this.tasksInited;
+  }
+
public String toString() {
return "jobName:" + profile.getJobName() + "\n" + "submit user:"
+ profile.getUser() + "\n" + "JobId:" + jobId + "\n" + "JobFile:"
@@ -154,7 +170,9 @@ class JobInProgress {
return;
}
- LOG.debug("numBSPTasks: " + numBSPTasks);
+ if(LOG.isDebugEnabled()){
+ LOG.debug("numBSPTasks: " + numBSPTasks);
+ }
// adjust number of map tasks to actual number of splits
this.tasks = new TaskInProgress[numBSPTasks];
@@ -192,7 +210,6 @@ class JobInProgress {
} catch (IOException e) {
e.printStackTrace();
}
-
return result;
}
Added: incubator/hama/trunk/src/java/org/apache/hama/bsp/JobInProgressListener.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/bsp/JobInProgressListener.java?rev=1058109&view=auto
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/bsp/JobInProgressListener.java (added)
+++ incubator/hama/trunk/src/java/org/apache/hama/bsp/JobInProgressListener.java Wed Jan 12 12:32:39 2011
@@ -0,0 +1,42 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.bsp;
+
+import java.io.IOException;
+
+/**
+ * Listener that is notified about changes in the lifecycle of a
+ * {@link JobInProgress job} inside the {@link BSPMaster}.
+ */
+abstract class JobInProgressListener {
+
+  /**
+   * Invoked when a new job has been added to the {@link BSPMaster}.
+   *
+   * @param job the job that was added
+   * @throws IOException if the listener fails to handle the added job
+   */
+  public abstract void jobAdded(JobInProgress job) throws IOException;
+
+  /**
+   * Invoked when a job has been removed from the {@link BSPMaster}.
+   *
+   * @param job the job that was removed
+   * @throws IOException if the listener fails to handle the removed job
+   */
+  public abstract void jobRemoved(JobInProgress job) throws IOException;
+}
Modified: incubator/hama/trunk/src/java/org/apache/hama/bsp/JobStatus.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/bsp/JobStatus.java?rev=1058109&r1=1058108&r2=1058109&view=diff
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/bsp/JobStatus.java (original)
+++ incubator/hama/trunk/src/java/org/apache/hama/bsp/JobStatus.java Wed Jan 12 12:32:39 2011
@@ -27,6 +27,7 @@ import org.apache.hadoop.io.WritableFact
import org.apache.hadoop.io.WritableFactory;
public class JobStatus implements Writable, Cloneable {
+
static {
WritableFactories.setFactory(JobStatus.class, new WritableFactory() {
public Writable newInstance() {
@@ -35,6 +36,21 @@ public class JobStatus implements Writab
});
}
+  /**
+   * Run states of a job. Each constant carries the integer code that mirrors
+   * the legacy int constants declared in this class (RUNNING = 1, ...).
+   */
+  public static enum State {
+    RUNNING(1),
+    SUCCEEDED(2),
+    FAILED(3),
+    PREP(4),
+    KILLED(5);
+
+    /** Integer code of the state; final so the shared constants stay immutable. */
+    final int s;
+
+    State(int s) {
+      this.s = s;
+    }
+
+    /**
+     * @return the integer code of this state
+     */
+    public int value() {
+      return this.s;
+    }
+
+    /**
+     * Maps an integer code back to its State by value, independent of the
+     * declaration order (ordinal) of the constants.
+     *
+     * @param value an integer code as returned by {@link #value()}
+     * @return the State carrying that code
+     * @throws IllegalArgumentException if no State has that code
+     */
+    public static State fromValue(int value) {
+      for (State candidate : values()) {
+        if (candidate.s == value) {
+          return candidate;
+        }
+      }
+      throw new IllegalArgumentException("Unknown job state value: " + value);
+    }
+  }
+
public static final int RUNNING = 1;
public static final int SUCCEEDED = 2;
public static final int FAILED = 3;
@@ -45,6 +61,7 @@ public class JobStatus implements Writab
private float progress;
private float cleanupProgress;
private float setupProgress;
+ private volatile State state;// runState in enum
private int runState;
private long startTime;
private String schedulingInfo = "NA";
@@ -77,6 +94,7 @@ public class JobStatus implements Writab
this.progress = progress;
this.cleanupProgress = cleanupProgress;
this.runState = runState;
+ this.state = State.values()[runState-1];
this.superstepCount = superstepCount;
this.user = user;
}
@@ -109,6 +127,14 @@ public class JobStatus implements Writab
this.setupProgress = (float) Math.min(1.0, Math.max(0.0, p));
}
+  /**
+   * @return the run state of the job in enum form
+   */
+  public JobStatus.State getState() {
+    return state;
+  }
+
+  /**
+   * Sets the run state and keeps the integer {@code runState} field in sync,
+   * so that {@link #getRunState()} reports the same state. Synchronized to
+   * match the monitor used by {@link #getRunState()}.
+   *
+   * @param state the new run state; must not be null
+   * @throws NullPointerException if state is null
+   */
+  public synchronized void setState(JobStatus.State state) {
+    if (state == null) {
+      throw new NullPointerException("state must not be null");
+    }
+    this.state = state;
+    this.runState = state.value();
+  }
+
public synchronized int getRunState() {
return runState;
}
Added: incubator/hama/trunk/src/java/org/apache/hama/bsp/Queue.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/bsp/Queue.java?rev=1058109&view=auto
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/bsp/Queue.java (added)
+++ incubator/hama/trunk/src/java/org/apache/hama/bsp/Queue.java Wed Jan 12 12:32:39 2011
@@ -0,0 +1,55 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hama.bsp;
+
+import java.util.Collection;
+
+/**
+ * A named container of jobs, as used by the scheduler's queue manager.
+ *
+ * @param <T> type of the jobs held by the queue
+ */
+public interface Queue<T> {
+
+  /**
+   * @return the name identifying this queue
+   */
+  String getName();
+
+  /**
+   * Inserts a job into this queue.
+   *
+   * @param job the job to insert
+   */
+  void addJob(T job);
+
+  /**
+   * Removes the given job from this queue.
+   *
+   * @param job the job to remove
+   */
+  void removeJob(T job);
+
+  /**
+   * Takes a job out of this queue; which job is chosen is up to the
+   * implementation.
+   *
+   * @return the job that was removed from the queue
+   */
+  T removeJob();
+
+  /**
+   * @return all jobs currently stored in this queue
+   */
+  Collection<T> jobs();
+}
Added: incubator/hama/trunk/src/java/org/apache/hama/bsp/QueueManager.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/bsp/QueueManager.java?rev=1058109&view=auto
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/bsp/QueueManager.java (added)
+++ incubator/hama/trunk/src/java/org/apache/hama/bsp/QueueManager.java Wed Jan 12 12:32:39 2011
@@ -0,0 +1,98 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hama.bsp;
+
+import java.io.IOException;
+
+import java.util.Collection;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.apache.hadoop.conf.Configuration;
+
+public class QueueManager{
+
+ private ConcurrentMap<String, Queue<JobInProgress>> queues =
+ new ConcurrentHashMap<String, Queue<JobInProgress>>();
+
+ public QueueManager(Configuration conf){ }
+
+ /**
+ * Initialize a job.
+ * @param job required initialzied.
+ */
+ public void initJob(JobInProgress job){
+ try{
+ //job.updateStatus();
+ job.initTasks();
+ }catch(IOException ioe){
+ ioe.printStackTrace();
+ }
+ }
+
+ /**
+ * Add a job to the specified queue.
+ * @param name of the queue.
+ * @param job to be added.
+ */
+ public void addJob(String name, JobInProgress job){
+ Queue<JobInProgress> queue = queues.get(name);
+ if(null != queue) queue.addJob(job);
+ }
+
+ /**
+ * Remove a job from the head of a designated queue.
+ * @param name from which a job is removed.
+ * @param job to be removed from the queue.
+ */
+ public void removeJob(String name, JobInProgress job){
+ Queue<JobInProgress> queue = queues.get(name);
+ if(null != queue) queue.removeJob(job);
+ }
+
+ /**
+ * Move a job from a queue to another.
+ * @param from a queue a job is to be removed.
+ * @param to a queue a job is to be added.
+ */
+ public void moveJob(String from, String to, JobInProgress job){
+ synchronized(queues){
+ removeJob(from, job);
+ addJob(to, job);
+ }
+ }
+
+ /**
+ * Create a FCFS queue with the name provided.
+ * @param name of the queue.
+ */
+ public void createFCFSQueue(String name){
+ queues.putIfAbsent(name, new FCFSQueue(name));
+ }
+
+ /**
+ * Find Queue according to the name specified.
+ * @param name of the queue.
+ * @return queue of JobInProgress
+ */
+ public Queue<JobInProgress> findQueue(String name){
+ return queues.get(name);
+ }
+
+}
Added: incubator/hama/trunk/src/java/org/apache/hama/bsp/Schedulable.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/bsp/Schedulable.java?rev=1058109&view=auto
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/bsp/Schedulable.java (added)
+++ incubator/hama/trunk/src/java/org/apache/hama/bsp/Schedulable.java Wed Jan 12 12:32:39 2011
@@ -0,0 +1,35 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.bsp;
+
+import java.io.IOException;
+
+/**
+ * Implemented by components that schedule jobs onto GroomServer(s).
+ */
+public interface Schedulable {
+
+  /**
+   * Schedules a job onto the given GroomServer(s) right away.
+   *
+   * @param job the job to schedule
+   * @param statuses statuses of the target GroomServer(s)
+   * @throws IOException if scheduling fails
+   */
+  void schedule(JobInProgress job, GroomServerStatus... statuses)
+      throws IOException;
+}
Modified: incubator/hama/trunk/src/java/org/apache/hama/bsp/SimpleTaskScheduler.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/bsp/SimpleTaskScheduler.java?rev=1058109&r1=1058108&r2=1058109&view=diff
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/bsp/SimpleTaskScheduler.java (original)
+++ incubator/hama/trunk/src/java/org/apache/hama/bsp/SimpleTaskScheduler.java Wed Jan 12 12:32:39 2011
@@ -17,83 +17,163 @@
*/
package org.apache.hama.bsp;
+import static java.util.concurrent.TimeUnit.SECONDS;
+
import java.io.IOException;
-import java.util.ArrayList;
import java.util.Collection;
-import java.util.List;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.apache.hama.ipc.WorkerProtocol;
class SimpleTaskScheduler extends TaskScheduler {
+
private static final Log LOG = LogFactory.getLog(SimpleTaskScheduler.class);
- List<JobInProgress> jobQueue;
- public SimpleTaskScheduler() {
- jobQueue = new ArrayList<JobInProgress>();
+ public static final String WAIT_QUEUE = "waitQueue";
+ public static final String PROCESSING_QUEUE = "processingQueue";
+ public static final String FINISHED_QUEUE = "finishedQueue";
+
+ private QueueManager queueManager;
+ private volatile boolean initialized;
+ private JobListener jobListener;
+ private JobProcessor jobProcessor;
+
+  /**
+   * Listener registered with the groom server manager in start(); feeds
+   * added jobs into the wait queue and retires removed ones.
+   */
+  private class JobListener extends JobInProgressListener {
+
+    /** Initializes the tasks of the job, then puts it into the wait queue. */
+    @Override
+    public void jobAdded(JobInProgress job) throws IOException {
+      queueManager.initJob(job);
+      queueManager.addJob(WAIT_QUEUE, job);
+    }
+
+    /** Moves the job from the processing queue to the finished queue. */
+    @Override
+    public void jobRemoved(JobInProgress job) throws IOException {
+      queueManager.moveJob(PROCESSING_QUEUE, FINISHED_QUEUE, job);
+    }
}
- @Override
- public void addJob(JobInProgress job) {
- LOG.debug("Added a job (" + job + ") to scheduler (remaining jobs: "
- + (jobQueue.size() + 1) + ")");
- jobQueue.add(job);
+  /**
+   * Thread that takes jobs from the wait queue, moves them to the processing
+   * queue and schedules them onto all known GroomServer(s).
+   */
+  private class JobProcessor extends Thread implements Schedulable {
+    JobProcessor() {
+      super("JobProcess");
+    }
+
+    /**
+     * Main scheduling loop; runs while the scheduler is initialized.
+     *
+     * @throws IllegalStateException if started before initialization finished
+     * @throws NullPointerException if the wait queue does not exist
+     */
+    @Override
+    public void run() {
+      if (!initialized) {
+        throw new IllegalStateException("SimpleTaskScheduler initialization"
+            + " is not yet finished!");
+      }
+      while (initialized) {
+        Queue<JobInProgress> queue = queueManager.findQueue(WAIT_QUEUE);
+        if (null == queue) {
+          LOG.error(WAIT_QUEUE + " does not exist.");
+          throw new NullPointerException(WAIT_QUEUE + " does not exist.");
+        }
+        // move a job from the wait queue to the processing queue
+        JobInProgress j = queue.removeJob();
+        if (null == j) {
+          // A null job cannot be scheduled (TaskWorker rejects it); loop back
+          // and re-check initialized, which terminate() may have cleared.
+          continue;
+        }
+        queueManager.addJob(PROCESSING_QUEUE, j);
+        Collection<GroomServerStatus> glist = groomServerManager
+            .groomServerStatusKeySet();
+        schedule(j, glist.toArray(new GroomServerStatus[glist.size()]));
+      }
+    }
+
+    /**
+     * Schedules the job onto the given GroomServer(s) immediately, one
+     * TaskWorker per GroomServer.
+     *
+     * @param job the job to be scheduled
+     * @param statuses the targeted GroomServer(s)
+     */
+    @Override
+    public void schedule(JobInProgress job, GroomServerStatus... statuses) {
+      ClusterStatus clusterStatus = groomServerManager.getClusterStatus(false);
+      final int numGroomServers = clusterStatus.getGroomServers();
+      final ScheduledExecutorService sched = Executors
+          .newScheduledThreadPool(statuses.length + 5);
+      try {
+        for (GroomServerStatus status : statuses) {
+          sched.schedule(new TaskWorker(status, numGroomServers, job), 0,
+              SECONDS);
+        }
+      } finally {
+        // Already-submitted workers still run after shutdown(); shutting down
+        // lets the pool threads exit instead of leaking one pool per job.
+        sched.shutdown();
+      }
+    }
}
- // removes job
- public void removeJob(JobInProgress job) {
- jobQueue.remove(job);
+ private class TaskWorker implements Runnable {
+ private final GroomServerStatus stus;
+ private final int groomNum;
+ private final JobInProgress jip;
+
+ TaskWorker(final GroomServerStatus stus, final int num,
+ final JobInProgress jip) {
+ this.stus = stus;
+ this.groomNum = num;
+ this.jip = jip;
+ if (null == this.stus)
+ throw new NullPointerException("Target groom server is not "
+ + "specified.");
+ if (-1 == this.groomNum)
+ throw new IllegalArgumentException("Groom number is not specified.");
+ if (null == this.jip)
+ throw new NullPointerException("No job is specified.");
+ }
+
+ public void run() {
+ // obtain tasks
+ Task t = jip.obtainNewTask(this.stus, groomNum);
+ // assembly into actions
+ // List<Task> tasks = new ArrayList<Task>();
+ if (jip.getStatus().getRunState() == JobStatus.RUNNING) {
+ WorkerProtocol worker = groomServerManager.findGroomServer(this.stus);
+ try {
+ // dispatch() to the groom server
+ Directive d1 = new Directive(groomServerManager
+ .currentGroomServerPeers(),
+ new GroomServerAction[] { new LaunchTaskAction(t) });
+ worker.dispatch(d1);
+ } catch (IOException ioe) {
+ LOG.error("Fail to dispatch tasks to GroomServer "
+ + this.stus.getGroomName(), ioe);
+ }
+ } else {
+ LOG.warn("Currently master only shcedules job in running state. "
+ + "This may be refined in the future. JobId:" + jip.getJobID());
+ }
+ }
}
- @Override
- public Collection<JobInProgress> getJobs() {
- return jobQueue;
+  /**
+   * Creates the scheduler with its job listener and job processor; the
+   * queues are created later in start().
+   */
+  public SimpleTaskScheduler() {
+    jobListener = new JobListener();
+    jobProcessor = new JobProcessor();
}
- /*
- * (non-Javadoc)
- * @seeorg.apache.hama.bsp.TaskScheduler#assignTasks(org.apache.hama.bsp.
- * GroomServerStatus)
- */
@Override
- public synchronized List<Task> assignTasks(GroomServerStatus groomStatus)
- throws IOException {
- ClusterStatus clusterStatus = groomServerManager.getClusterStatus(false);
-
- final int numGroomServers = clusterStatus.getGroomServers();
- // final int clusterTaskCapacity = clusterStatus.getMaxTasks();
-
- // Get task counts for the current groom.
- // final int groomTaskCapacity = groom.getMaxTasks();
- final int groomRunningTasks = groomStatus.countTasks();
-
- // Assigned tasks
- List<Task> assignedTasks = new ArrayList<Task>();
-
- if (groomRunningTasks == 0) {
- // TODO - Each time a job is submitted in BSPMaster, add a JobInProgress
- // instance to the scheduler.
- synchronized (jobQueue) {
- for (JobInProgress job : jobQueue) {
- if (job.getStatus().getRunState() != JobStatus.RUNNING) {
- continue;
- }
-
- Task t = null;
- t = job.obtainNewTask(groomStatus, numGroomServers);
-
- if (t != null) {
- assignedTasks.add(t);
- break; // TODO - Now, simple scheduler assigns only one task to
- // each groom. Later, it will be improved for scheduler to
- // assign one or more tasks to each groom according to
- // its capacity.
- }
- }
+  public void start() {
+    // Create the wait/processing/finished queues, register the job listener,
+    // mark the scheduler initialized and launch the job processor thread.
+    queueManager = new QueueManager(getConf()); // TODO: need factory?
+    for (String name : new String[] { WAIT_QUEUE, PROCESSING_QUEUE,
+        FINISHED_QUEUE }) {
+      queueManager.createFCFSQueue(name);
+    }
+    groomServerManager.addJobInProgressListener(jobListener);
+    initialized = true;
+    jobProcessor.start();
+  }
- }
- }
+  /**
+   * Stops the job processing loop and unregisters the job listener.
+   */
+  @Override
+  public void terminate() {
+    this.initialized = false;
+    // Wake the processor if it is blocked (e.g. waiting for a job in the wait
+    // queue) so it can observe initialized == false and leave its loop.
+    if (null != this.jobProcessor) {
+      this.jobProcessor.interrupt();
+    }
+    if (null != this.jobListener) {
+      groomServerManager.removeJobInProgressListener(this.jobListener);
+    }
+  }
- return assignedTasks;
+  /**
+   * Returns the jobs held in the named queue.
+   *
+   * @param queue name of the queue
+   * @return the jobs of that queue, or an empty collection if the scheduler
+   *         has not been started or no such queue exists
+   */
+  @Override
+  public Collection<JobInProgress> getJobs(String queue) {
+    Queue<JobInProgress> found =
+        (null == queueManager) ? null : queueManager.findQueue(queue);
+    if (null == found) {
+      return java.util.Collections.<JobInProgress>emptyList();
+    }
+    return found.jobs();
}
}
Modified: incubator/hama/trunk/src/java/org/apache/hama/bsp/Task.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/bsp/Task.java?rev=1058109&r1=1058108&r2=1058109&view=diff
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/bsp/Task.java (original)
+++ incubator/hama/trunk/src/java/org/apache/hama/bsp/Task.java Wed Jan 12 12:32:39 2011
@@ -62,6 +62,10 @@ public abstract class Task implements Wr
public String getJobFile() {
return jobFile;
}
+
+  /**
+   * @return the attempt id of this task (the same field getTaskID() returns)
+   */
+  public TaskAttemptID getTaskAttemptId() {
+    return taskId;
+  }
public TaskAttemptID getTaskID() {
return taskId;