You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hama.apache.org by ed...@apache.org on 2010/10/20 03:48:36 UTC
svn commit: r1024485 - in /incubator/hama/trunk: ./
src/examples/org/apache/hama/examples/ src/java/org/apache/hama/bsp/
Author: edwardyoon
Date: Wed Oct 20 01:48:36 2010
New Revision: 1024485
URL: http://svn.apache.org/viewvc?rev=1024485&view=rev
Log:
Refactoring launchTask() method in GroomServer
Added:
incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPTaskRunner.java
Modified:
incubator/hama/trunk/CHANGES.txt
incubator/hama/trunk/src/examples/org/apache/hama/examples/PiEstimator.java
incubator/hama/trunk/src/java/org/apache/hama/bsp/BSP.java
incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPMessage.java
incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPTask.java
incubator/hama/trunk/src/java/org/apache/hama/bsp/GroomServer.java
incubator/hama/trunk/src/java/org/apache/hama/bsp/LaunchTaskAction.java
incubator/hama/trunk/src/java/org/apache/hama/bsp/LocalJobRunner.java
incubator/hama/trunk/src/java/org/apache/hama/bsp/Task.java
incubator/hama/trunk/src/java/org/apache/hama/bsp/TaskInProgress.java
Modified: incubator/hama/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/CHANGES.txt?rev=1024485&r1=1024484&r2=1024485&view=diff
==============================================================================
--- incubator/hama/trunk/CHANGES.txt (original)
+++ incubator/hama/trunk/CHANGES.txt Wed Oct 20 01:48:36 2010
@@ -50,6 +50,7 @@ Trunk (unreleased changes)
IMPROVEMENTS
+ HAMA-300: Refactoring launchTask() method in GroomServer (edwardyoon)
HAMA-311: Add unit tests for IPC package (edwardyoon)
HAMA-312: Add serialize printing to ExampleDriver (edwardyoon)
HAMA-309: Add unit tests for Bytes utilities (edwardyoon)
Modified: incubator/hama/trunk/src/examples/org/apache/hama/examples/PiEstimator.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/examples/org/apache/hama/examples/PiEstimator.java?rev=1024485&r1=1024484&r2=1024485&view=diff
==============================================================================
--- incubator/hama/trunk/src/examples/org/apache/hama/examples/PiEstimator.java (original)
+++ incubator/hama/trunk/src/examples/org/apache/hama/examples/PiEstimator.java Wed Oct 20 01:48:36 2010
@@ -54,7 +54,7 @@ public class PiEstimator {
}
}
- byte[] tagName = Bytes.toBytes(getName().toString());
+ byte[] tagName = Bytes.toBytes(bspPeer.getHostName());
byte[] myData = Bytes.toBytes(4.0 * (double) in / (double) iterations);
BSPMessage estimate = new BSPMessage(tagName, myData);
Modified: incubator/hama/trunk/src/java/org/apache/hama/bsp/BSP.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/bsp/BSP.java?rev=1024485&r1=1024484&r2=1024485&view=diff
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/bsp/BSP.java (original)
+++ incubator/hama/trunk/src/java/org/apache/hama/bsp/BSP.java Wed Oct 20 01:48:36 2010
@@ -20,20 +20,5 @@ package org.apache.hama.bsp;
/**
* This class provides an abstract implementation of the BSP interface
*/
-public abstract class BSP extends Thread implements BSPInterface {
- private BSPPeer bspPeer;
-
- /**
- * A thread's run method.
- *
- * The run method performs the
- * {@link org.apache.hama.bsp.BSPInterface#bsp(BSPPeer)}
- */
- public void runBSP() throws Exception {
- bsp(bspPeer);
- }
-
- public void setPeer(BSPPeer bspServer) {
- this.bspPeer = bspServer;
- }
+public abstract class BSP implements BSPInterface {
}
Modified: incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPMessage.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPMessage.java?rev=1024485&r1=1024484&r2=1024485&view=diff
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPMessage.java (original)
+++ incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPMessage.java Wed Oct 20 01:48:36 2010
@@ -30,6 +30,12 @@ public class BSPMessage implements Writa
public BSPMessage() {
}
+ /**
+ * Constructor
+ *
+ * @param tag of data
+ * @param data of message
+ */
public BSPMessage(byte[] tag, byte[] data) {
this.tag = new byte[tag.length];
this.data = new byte[data.length];
@@ -37,11 +43,20 @@ public class BSPMessage implements Writa
System.arraycopy(data, 0, this.data, 0, data.length);
}
+ /**
+ * BSP messages are typically identified with tags. This allows to get the tag
+ * of data.
+ *
+ * @return tag of data of BSP message
+ */
public byte[] getTag() {
byte[] result = this.tag;
return result;
}
+ /**
+ * @return data of BSP message
+ */
public byte[] getData() {
byte[] result = this.data;
return result;
Modified: incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPTask.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPTask.java?rev=1024485&r1=1024484&r2=1024485&view=diff
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPTask.java (original)
+++ incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPTask.java Wed Oct 20 01:48:36 2010
@@ -17,26 +17,21 @@
*/
package org.apache.hama.bsp;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.util.ReflectionUtils;
-
public class BSPTask extends Task {
- private BSP bsp;
- private Configuration conf;
-
- public BSPTask(BSPJobID jobId, String jobFile, String taskid, int partition, Configuration conf) {
+
+ public BSPTask() {
+ }
+
+ public BSPTask(BSPJobID jobId, String jobFile, String taskid, int partition) {
this.jobId = jobId;
this.jobFile = jobFile;
this.taskId = taskid;
this.partition = partition;
- this.conf = conf;
}
- public BSP getBSPClass() {
- bsp = (BSP) ReflectionUtils.newInstance(conf.getClass("bsp.work.class",
- BSP.class), conf);
-
- return bsp;
+ @Override
+ public BSPTaskRunner createRunner(BSPPeer bspPeer, BSPJob conf) {
+ return new BSPTaskRunner(this, bspPeer, conf);
}
}
Added: incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPTaskRunner.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPTaskRunner.java?rev=1024485&view=auto
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPTaskRunner.java (added)
+++ incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPTaskRunner.java Wed Oct 20 01:48:36 2010
@@ -0,0 +1,62 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.bsp;
+
+import java.io.IOException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.zookeeper.KeeperException;
+
+public class BSPTaskRunner extends Thread {
+
+ public static final Log LOG = LogFactory.getLog(BSPTaskRunner.class);
+ private Task task;
+ private BSPJob conf;
+ private BSPPeer bspPeer;
+
+ public BSPTaskRunner(BSPTask bspTask, BSPPeer bspPeer, BSPJob conf) {
+ this.task = bspTask;
+ this.conf = conf;
+ this.bspPeer = bspPeer;
+ }
+
+ public Task getTask() {
+ return task;
+ }
+
+ public void run() {
+ BSP bsp = (BSP) ReflectionUtils.newInstance(conf.getConf().getClass(
+ "bsp.work.class", BSP.class), conf.getConf());
+
+ try {
+ bsp.bsp(bspPeer);
+ } catch (IOException e) {
+ // TODO Auto-generated catch block
+ e.printStackTrace();
+ } catch (KeeperException e) {
+ // TODO Auto-generated catch block
+ e.printStackTrace();
+ } catch (InterruptedException e) {
+ // TODO Auto-generated catch block
+ e.printStackTrace();
+ }
+ }
+
+}
Modified: incubator/hama/trunk/src/java/org/apache/hama/bsp/GroomServer.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/bsp/GroomServer.java?rev=1024485&r1=1024484&r2=1024485&view=diff
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/bsp/GroomServer.java (original)
+++ incubator/hama/trunk/src/java/org/apache/hama/bsp/GroomServer.java Wed Oct 20 01:48:36 2010
@@ -23,9 +23,11 @@ import java.lang.reflect.Constructor;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
+import java.util.Set;
import java.util.TreeMap;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
@@ -40,7 +42,7 @@ import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.ipc.RemoteException;
import org.apache.hadoop.net.DNS;
import org.apache.hadoop.util.DiskChecker;
-import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.hadoop.util.RunJar;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.util.DiskChecker.DiskErrorException;
import org.apache.hama.Constants;
@@ -50,6 +52,7 @@ import org.apache.hama.ipc.InterTrackerP
public class GroomServer implements Runnable {
public static final Log LOG = LogFactory.getLog(GroomServer.class);
private static BSPPeer bspPeer;
+ static final String SUBDIR = "groomServer";
Configuration conf;
@@ -281,9 +284,111 @@ public class GroomServer implements Runn
}
try {
+ localizeJob(tip);
+ } catch (Throwable e) {
+ String msg = ("Error initializing " + tip.getTask().getTaskID() + ":\n" + StringUtils
+ .stringifyException(e));
+ LOG.warn(msg);
+ }
+ }
+
+ private void localizeJob(TaskInProgress tip) throws IOException {
+ Task task = tip.getTask();
+ conf.addResource(task.getJobFile());
+ BSPJob defaultJobConf = new BSPJob((HamaConfiguration) conf);
+
+ Path localJobFile = defaultJobConf.getLocalPath(SUBDIR + "/"
+ + task.getTaskID() + "/" + "job.xml");
+
+ RunningJob rjob = addTaskToJob(task.getJobID(), localJobFile, tip);
+ BSPJob jobConf = null;
+
+ synchronized (rjob) {
+ if (!rjob.localized) {
+ Path localJarFile = defaultJobConf.getLocalPath(SUBDIR + "/"
+ + task.getTaskID() + "/" + "job.jar");
+ systemFS.copyToLocalFile(new Path(task.getJobFile()), localJobFile);
+ Path jarFile = new Path(task.getJobFile().replace(".xml", ".jar"));
+
+ HamaConfiguration conf = new HamaConfiguration();
+ conf.addResource(localJobFile);
+ jobConf = new BSPJob(conf, task.getJobID().toString());
+ jobConf.setJar(localJarFile.toString());
+
+ if (jarFile != null) {
+ systemFS.copyToLocalFile(jarFile, localJarFile);
+
+ // also unjar the job.jar files in workdir
+ File workDir = new File(
+ new File(localJobFile.toString()).getParent(), "work");
+ if (!workDir.mkdirs()) {
+ if (!workDir.isDirectory()) {
+ throw new IOException("Mkdirs failed to create "
+ + workDir.toString());
+ }
+ }
+ RunJar.unJar(new File(localJarFile.toString()), workDir);
+ }
+ rjob.localized = true;
+ }
+ }
+ launchTaskForJob(tip, jobConf);
+ }
+
+ private void launchTaskForJob(TaskInProgress tip, BSPJob jobConf) {
+ try {
+ tip.setJobConf(jobConf);
tip.launchTask();
} catch (Throwable ie) {
- // TODO: when job failed.
+ tip.taskStatus.setRunState(TaskStatus.State.FAILED);
+ String error = StringUtils.stringifyException(ie);
+ LOG.info(error);
+ }
+ }
+
+ private RunningJob addTaskToJob(BSPJobID jobId, Path localJobFile,
+ TaskInProgress tip) {
+ synchronized (runningJobs) {
+ RunningJob rJob = null;
+ if (!runningJobs.containsKey(jobId)) {
+ rJob = new RunningJob(jobId, localJobFile);
+ rJob.localized = false;
+ rJob.tasks = new HashSet<TaskInProgress>();
+ rJob.jobFile = localJobFile;
+ runningJobs.put(jobId, rJob);
+ } else {
+ rJob = runningJobs.get(jobId);
+ }
+ rJob.tasks.add(tip);
+ return rJob;
+ }
+ }
+
+ /**
+ * The datastructure for initializing a job
+ */
+ static class RunningJob {
+ private BSPJobID jobid;
+ private Path jobFile;
+ // keep this for later use
+ Set<TaskInProgress> tasks;
+ boolean localized;
+ boolean keepJobFiles;
+
+ RunningJob(BSPJobID jobid, Path jobFile) {
+ this.jobid = jobid;
+ localized = false;
+ tasks = new HashSet<TaskInProgress>();
+ this.jobFile = jobFile;
+ keepJobFiles = false;
+ }
+
+ Path getJobFile() {
+ return jobFile;
+ }
+
+ BSPJobID getJobId() {
+ return jobid;
}
}
@@ -410,6 +515,8 @@ public class GroomServer implements Runn
// /////////////////////////////////////////////////////
class TaskInProgress {
Task task;
+ BSPJob jobConf;
+ private BSPTaskRunner runner;
volatile boolean done = false;
volatile boolean wasKilled = false;
private TaskStatus taskStatus;
@@ -421,61 +528,29 @@ public class GroomServer implements Runn
TaskStatus.Phase.STARTING);
}
- static final String SUBDIR = "groomServer";
+ public void setJobConf(BSPJob jobConf) {
+ this.jobConf = jobConf;
+ }
- public void launchTask() {
+ public void launchTask() throws IOException {
taskStatus.setRunState(TaskStatus.State.RUNNING);
+ this.runner = task.createRunner(bspPeer, this.jobConf);
+ this.runner.start();
- try {
- // TODO: need to move this code to TaskRunner
-
- task.getJobFile();
- conf.addResource(task.getJobFile());
- BSPJob defaultJobConf = new BSPJob((HamaConfiguration) conf);
-
- Path localJobFile = defaultJobConf.getLocalPath(SUBDIR + "/"
- + task.getTaskID() + "/" + "job.xml");
- Path localJarFile = defaultJobConf.getLocalPath(SUBDIR + "/"
- + task.getTaskID() + "/" + "job.jar");
-
- systemFS.copyToLocalFile(new Path(task.getJobFile()), localJobFile);
- systemFS.copyToLocalFile(new Path(task.getJobFile().replace(".xml",
- ".jar")), localJarFile);
-
- HamaConfiguration conf = new HamaConfiguration();
- conf.addResource(localJobFile);
- BSPJob jobConf = new BSPJob(conf, task.getJobID().toString());
- jobConf.setJar(localJarFile.toString());
-
- BSP bsp = (BSP) ReflectionUtils
- .newInstance(jobConf.getBspClass(), conf);
- bsp.setPeer(bspPeer);
+ // Check state of Task
+ while (true) {
try {
- bsp.runBSP();
- } catch (Exception e) {
+ Thread.sleep(1000);
+ } catch (InterruptedException e) {
e.printStackTrace();
- taskStatus.setRunState(TaskStatus.State.FAILED);
}
- } catch (IOException e) {
- // TODO Auto-generated catch block
- e.printStackTrace();
- } finally {
-
- while (true) {
- try {
- Thread.sleep(1000);
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
-
- // If local/outgoing queues are empty, task is done.
- if (bspPeer.localQueue.size() == 0
- && bspPeer.outgoingQueues.size() == 0) {
- taskStatus.setRunState(TaskStatus.State.SUCCEEDED);
- acceptNewTasks = true;
- break;
- }
+ // If local/outgoing queues are empty, task is done.
+ if (bspPeer.localQueue.size() == 0
+ && bspPeer.outgoingQueues.size() == 0) {
+ taskStatus.setRunState(TaskStatus.State.SUCCEEDED);
+ acceptNewTasks = true;
+ break;
}
}
Modified: incubator/hama/trunk/src/java/org/apache/hama/bsp/LaunchTaskAction.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/bsp/LaunchTaskAction.java?rev=1024485&r1=1024484&r2=1024485&view=diff
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/bsp/LaunchTaskAction.java (original)
+++ incubator/hama/trunk/src/java/org/apache/hama/bsp/LaunchTaskAction.java Wed Oct 20 01:48:36 2010
@@ -47,7 +47,7 @@ class LaunchTaskAction extends GroomServ
}
public void readFields(DataInput in) throws IOException {
- task = new Task();
+ task = new BSPTask();
task.readFields(in);
}
Modified: incubator/hama/trunk/src/java/org/apache/hama/bsp/LocalJobRunner.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/bsp/LocalJobRunner.java?rev=1024485&r1=1024484&r2=1024485&view=diff
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/bsp/LocalJobRunner.java (original)
+++ incubator/hama/trunk/src/java/org/apache/hama/bsp/LocalJobRunner.java Wed Oct 20 01:48:36 2010
@@ -172,7 +172,7 @@ public class LocalJobRunner implements J
try {
GroomServer servers = new GroomServer(conf);
- Task task = new BSPTask(job.getJobID(), jobFile, tID.toString(), i, this.conf);
+ Task task = new BSPTask(job.getJobID(), jobFile, tID.toString(), i);
// TODO not yet implemented
Modified: incubator/hama/trunk/src/java/org/apache/hama/bsp/Task.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/bsp/Task.java?rev=1024485&r1=1024484&r2=1024485&view=diff
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/bsp/Task.java (original)
+++ incubator/hama/trunk/src/java/org/apache/hama/bsp/Task.java Wed Oct 20 01:48:36 2010
@@ -23,14 +23,12 @@ import java.io.IOException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.LocalDirAllocator;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
-/**
- *
- */
-public class Task implements Writable {
+public abstract class Task implements Writable {
public static final Log LOG = LogFactory.getLog(Task.class);
////////////////////////////////////////////
// Fields
@@ -109,5 +107,7 @@ public class Task implements Writable {
taskId = Text.readString(in);
partition = in.readInt();
}
+
+ public abstract BSPTaskRunner createRunner(BSPPeer bspPeer, BSPJob jobConf);
}
Modified: incubator/hama/trunk/src/java/org/apache/hama/bsp/TaskInProgress.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/bsp/TaskInProgress.java?rev=1024485&r1=1024484&r2=1024485&view=diff
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/bsp/TaskInProgress.java (original)
+++ incubator/hama/trunk/src/java/org/apache/hama/bsp/TaskInProgress.java Wed Oct 20 01:48:36 2010
@@ -101,7 +101,7 @@ class TaskInProgress {
return null;
}
- t = new BSPTask(jobId, jobFile, taskid, partition, this.conf);
+ t = new BSPTask(jobId, jobFile, taskid, partition);
activeTasks.put(taskid, status.getGroomName());
// Ask JobTracker to note that the task exists
Re: svn commit: r1024485 - in /incubator/hama/trunk: ./
src/examples/org/apache/hama/examples/ src/java/org/apache/hama/bsp/
Posted by Filipe David Manana <fd...@apache.org>.
On Wed, Oct 20, 2010 at 10:41 AM, Edward J. Yoon <ed...@apache.org> wrote:
> Thanks.
>
> BTW, getAddress() and getHostName() methods are somewhat duplicated.
> If we remove one of the two, we have to change all code, related with
> it.
>
> https://issues.apache.org/jira/browse/HAMA-316
>
> Could you please comment here?
Done :)
>
> On Wed, Oct 20, 2010 at 6:31 PM, Filipe David Manana
> <fd...@apache.org> wrote:
>> A big +1 on this one :)
>>
>> On Wed, Oct 20, 2010 at 2:48 AM, <ed...@apache.org> wrote:
>>> Author: edwardyoon
>>> Date: Wed Oct 20 01:48:36 2010
>>> New Revision: 1024485
>>>
>>> URL: http://svn.apache.org/viewvc?rev=1024485&view=rev
>>> Log:
>>> Refactoring launchTask() method in GroomServer
>>>
>>> Added:
>>> incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPTaskRunner.java
>>> Modified:
>>> incubator/hama/trunk/CHANGES.txt
>>> incubator/hama/trunk/src/examples/org/apache/hama/examples/PiEstimator.java
>>> incubator/hama/trunk/src/java/org/apache/hama/bsp/BSP.java
>>> incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPMessage.java
>>> incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPTask.java
>>> incubator/hama/trunk/src/java/org/apache/hama/bsp/GroomServer.java
>>> incubator/hama/trunk/src/java/org/apache/hama/bsp/LaunchTaskAction.java
>>> incubator/hama/trunk/src/java/org/apache/hama/bsp/LocalJobRunner.java
>>> incubator/hama/trunk/src/java/org/apache/hama/bsp/Task.java
>>> incubator/hama/trunk/src/java/org/apache/hama/bsp/TaskInProgress.java
>>>
>>> Modified: incubator/hama/trunk/CHANGES.txt
>>> URL: http://svn.apache.org/viewvc/incubator/hama/trunk/CHANGES.txt?rev=1024485&r1=1024484&r2=1024485&view=diff
>>> ==============================================================================
>>> --- incubator/hama/trunk/CHANGES.txt (original)
>>> +++ incubator/hama/trunk/CHANGES.txt Wed Oct 20 01:48:36 2010
>>> @@ -50,6 +50,7 @@ Trunk (unreleased changes)
>>>
>>> IMPROVEMENTS
>>>
>>> + HAMA-300: Refactoring launchTask() method in GroomServer (edwardyoon)
>>> HAMA-311: Add unit tests for IPC package (edwardyoon)
>>> HAMA-312: Add serialize printing to ExampleDriver (edwardyoon)
>>> HAMA-309: Add unit tests for Bytes utilities (edwardyoon)
>>>
>>> Modified: incubator/hama/trunk/src/examples/org/apache/hama/examples/PiEstimator.java
>>> URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/examples/org/apache/hama/examples/PiEstimator.java?rev=1024485&r1=1024484&r2=1024485&view=diff
>>> ==============================================================================
>>> --- incubator/hama/trunk/src/examples/org/apache/hama/examples/PiEstimator.java (original)
>>> +++ incubator/hama/trunk/src/examples/org/apache/hama/examples/PiEstimator.java Wed Oct 20 01:48:36 2010
>>> @@ -54,7 +54,7 @@ public class PiEstimator {
>>> }
>>> }
>>>
>>> - byte[] tagName = Bytes.toBytes(getName().toString());
>>> + byte[] tagName = Bytes.toBytes(bspPeer.getHostName());
>>> byte[] myData = Bytes.toBytes(4.0 * (double) in / (double) iterations);
>>> BSPMessage estimate = new BSPMessage(tagName, myData);
>>>
>>>
>>> Modified: incubator/hama/trunk/src/java/org/apache/hama/bsp/BSP.java
>>> URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/bsp/BSP.java?rev=1024485&r1=1024484&r2=1024485&view=diff
>>> ==============================================================================
>>> --- incubator/hama/trunk/src/java/org/apache/hama/bsp/BSP.java (original)
>>> +++ incubator/hama/trunk/src/java/org/apache/hama/bsp/BSP.java Wed Oct 20 01:48:36 2010
>>> @@ -20,20 +20,5 @@ package org.apache.hama.bsp;
>>> /**
>>> * This class provides an abstract implementation of the BSP interface
>>> */
>>> -public abstract class BSP extends Thread implements BSPInterface {
>>> - private BSPPeer bspPeer;
>>> -
>>> - /**
>>> - * A thread's run method.
>>> - *
>>> - * The run method performs the
>>> - * {@link org.apache.hama.bsp.BSPInterface#bsp(BSPPeer)}
>>> - */
>>> - public void runBSP() throws Exception {
>>> - bsp(bspPeer);
>>> - }
>>> -
>>> - public void setPeer(BSPPeer bspServer) {
>>> - this.bspPeer = bspServer;
>>> - }
>>> +public abstract class BSP implements BSPInterface {
>>> }
>>>
>>> Modified: incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPMessage.java
>>> URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPMessage.java?rev=1024485&r1=1024484&r2=1024485&view=diff
>>> ==============================================================================
>>> --- incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPMessage.java (original)
>>> +++ incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPMessage.java Wed Oct 20 01:48:36 2010
>>> @@ -30,6 +30,12 @@ public class BSPMessage implements Writa
>>> public BSPMessage() {
>>> }
>>>
>>> + /**
>>> + * Constructor
>>> + *
>>> + * @param tag of data
>>> + * @param data of message
>>> + */
>>> public BSPMessage(byte[] tag, byte[] data) {
>>> this.tag = new byte[tag.length];
>>> this.data = new byte[data.length];
>>> @@ -37,11 +43,20 @@ public class BSPMessage implements Writa
>>> System.arraycopy(data, 0, this.data, 0, data.length);
>>> }
>>>
>>> + /**
>>> + * BSP messages are typically identified with tags. This allows to get the tag
>>> + * of data.
>>> + *
>>> + * @return tag of data of BSP message
>>> + */
>>> public byte[] getTag() {
>>> byte[] result = this.tag;
>>> return result;
>>> }
>>>
>>> + /**
>>> + * @return data of BSP message
>>> + */
>>> public byte[] getData() {
>>> byte[] result = this.data;
>>> return result;
>>>
>>> Modified: incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPTask.java
>>> URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPTask.java?rev=1024485&r1=1024484&r2=1024485&view=diff
>>> ==============================================================================
>>> --- incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPTask.java (original)
>>> +++ incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPTask.java Wed Oct 20 01:48:36 2010
>>> @@ -17,26 +17,21 @@
>>> */
>>> package org.apache.hama.bsp;
>>>
>>> -import org.apache.hadoop.conf.Configuration;
>>> -import org.apache.hadoop.util.ReflectionUtils;
>>> -
>>> public class BSPTask extends Task {
>>> - private BSP bsp;
>>> - private Configuration conf;
>>> -
>>> - public BSPTask(BSPJobID jobId, String jobFile, String taskid, int partition, Configuration conf) {
>>> +
>>> + public BSPTask() {
>>> + }
>>> +
>>> + public BSPTask(BSPJobID jobId, String jobFile, String taskid, int partition) {
>>> this.jobId = jobId;
>>> this.jobFile = jobFile;
>>> this.taskId = taskid;
>>> this.partition = partition;
>>> - this.conf = conf;
>>> }
>>>
>>> - public BSP getBSPClass() {
>>> - bsp = (BSP) ReflectionUtils.newInstance(conf.getClass("bsp.work.class",
>>> - BSP.class), conf);
>>> -
>>> - return bsp;
>>> + @Override
>>> + public BSPTaskRunner createRunner(BSPPeer bspPeer, BSPJob conf) {
>>> + return new BSPTaskRunner(this, bspPeer, conf);
>>> }
>>>
>>> }
>>>
>>> Added: incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPTaskRunner.java
>>> URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPTaskRunner.java?rev=1024485&view=auto
>>> ==============================================================================
>>> --- incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPTaskRunner.java (added)
>>> +++ incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPTaskRunner.java Wed Oct 20 01:48:36 2010
>>> @@ -0,0 +1,62 @@
>>> +/**
>>> + * Licensed to the Apache Software Foundation (ASF) under one
>>> + * or more contributor license agreements. See the NOTICE file
>>> + * distributed with this work for additional information
>>> + * regarding copyright ownership. The ASF licenses this file
>>> + * to you under the Apache License, Version 2.0 (the
>>> + * "License"); you may not use this file except in compliance
>>> + * with the License. You may obtain a copy of the License at
>>> + *
>>> + * http://www.apache.org/licenses/LICENSE-2.0
>>> + *
>>> + * Unless required by applicable law or agreed to in writing, software
>>> + * distributed under the License is distributed on an "AS IS" BASIS,
>>> + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
>>> + * See the License for the specific language governing permissions and
>>> + * limitations under the License.
>>> + */
>>> +package org.apache.hama.bsp;
>>> +
>>> +import java.io.IOException;
>>> +
>>> +import org.apache.commons.logging.Log;
>>> +import org.apache.commons.logging.LogFactory;
>>> +import org.apache.hadoop.util.ReflectionUtils;
>>> +import org.apache.zookeeper.KeeperException;
>>> +
>>> +public class BSPTaskRunner extends Thread {
>>> +
>>> + public static final Log LOG = LogFactory.getLog(BSPTaskRunner.class);
>>> + private Task task;
>>> + private BSPJob conf;
>>> + private BSPPeer bspPeer;
>>> +
>>> + public BSPTaskRunner(BSPTask bspTask, BSPPeer bspPeer, BSPJob conf) {
>>> + this.task = bspTask;
>>> + this.conf = conf;
>>> + this.bspPeer = bspPeer;
>>> + }
>>> +
>>> + public Task getTask() {
>>> + return task;
>>> + }
>>> +
>>> + public void run() {
>>> + BSP bsp = (BSP) ReflectionUtils.newInstance(conf.getConf().getClass(
>>> + "bsp.work.class", BSP.class), conf.getConf());
>>> +
>>> + try {
>>> + bsp.bsp(bspPeer);
>>> + } catch (IOException e) {
>>> + // TODO Auto-generated catch block
>>> + e.printStackTrace();
>>> + } catch (KeeperException e) {
>>> + // TODO Auto-generated catch block
>>> + e.printStackTrace();
>>> + } catch (InterruptedException e) {
>>> + // TODO Auto-generated catch block
>>> + e.printStackTrace();
>>> + }
>>> + }
>>> +
>>> +}
>>>
>>> Modified: incubator/hama/trunk/src/java/org/apache/hama/bsp/GroomServer.java
>>> URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/bsp/GroomServer.java?rev=1024485&r1=1024484&r2=1024485&view=diff
>>> ==============================================================================
>>> --- incubator/hama/trunk/src/java/org/apache/hama/bsp/GroomServer.java (original)
>>> +++ incubator/hama/trunk/src/java/org/apache/hama/bsp/GroomServer.java Wed Oct 20 01:48:36 2010
>>> @@ -23,9 +23,11 @@ import java.lang.reflect.Constructor;
>>> import java.net.InetSocketAddress;
>>> import java.util.ArrayList;
>>> import java.util.HashMap;
>>> +import java.util.HashSet;
>>> import java.util.LinkedHashMap;
>>> import java.util.List;
>>> import java.util.Map;
>>> +import java.util.Set;
>>> import java.util.TreeMap;
>>> import java.util.concurrent.BlockingQueue;
>>> import java.util.concurrent.LinkedBlockingQueue;
>>> @@ -40,7 +42,7 @@ import org.apache.hadoop.ipc.RPC;
>>> import org.apache.hadoop.ipc.RemoteException;
>>> import org.apache.hadoop.net.DNS;
>>> import org.apache.hadoop.util.DiskChecker;
>>> -import org.apache.hadoop.util.ReflectionUtils;
>>> +import org.apache.hadoop.util.RunJar;
>>> import org.apache.hadoop.util.StringUtils;
>>> import org.apache.hadoop.util.DiskChecker.DiskErrorException;
>>> import org.apache.hama.Constants;
>>> @@ -50,6 +52,7 @@ import org.apache.hama.ipc.InterTrackerP
>>> public class GroomServer implements Runnable {
>>> public static final Log LOG = LogFactory.getLog(GroomServer.class);
>>> private static BSPPeer bspPeer;
>>> + static final String SUBDIR = "groomServer";
>>>
>>> Configuration conf;
>>>
>>> @@ -281,9 +284,111 @@ public class GroomServer implements Runn
>>> }
>>>
>>> try {
>>> + localizeJob(tip);
>>> + } catch (Throwable e) {
>>> + String msg = ("Error initializing " + tip.getTask().getTaskID() + ":\n" + StringUtils
>>> + .stringifyException(e));
>>> + LOG.warn(msg);
>>> + }
>>> + }
>>> +
>>> + private void localizeJob(TaskInProgress tip) throws IOException {
>>> + Task task = tip.getTask();
>>> + conf.addResource(task.getJobFile());
>>> + BSPJob defaultJobConf = new BSPJob((HamaConfiguration) conf);
>>> +
>>> + Path localJobFile = defaultJobConf.getLocalPath(SUBDIR + "/"
>>> + + task.getTaskID() + "/" + "job.xml");
>>> +
>>> + RunningJob rjob = addTaskToJob(task.getJobID(), localJobFile, tip);
>>> + BSPJob jobConf = null;
>>> +
>>> + synchronized (rjob) {
>>> + if (!rjob.localized) {
>>> + Path localJarFile = defaultJobConf.getLocalPath(SUBDIR + "/"
>>> + + task.getTaskID() + "/" + "job.jar");
>>> + systemFS.copyToLocalFile(new Path(task.getJobFile()), localJobFile);
>>> + Path jarFile = new Path(task.getJobFile().replace(".xml", ".jar"));
>>> +
>>> + HamaConfiguration conf = new HamaConfiguration();
>>> + conf.addResource(localJobFile);
>>> + jobConf = new BSPJob(conf, task.getJobID().toString());
>>> + jobConf.setJar(localJarFile.toString());
>>> +
>>> + if (jarFile != null) {
>>> + systemFS.copyToLocalFile(jarFile, localJarFile);
>>> +
>>> + // also unjar the job.jar files in workdir
>>> + File workDir = new File(
>>> + new File(localJobFile.toString()).getParent(), "work");
>>> + if (!workDir.mkdirs()) {
>>> + if (!workDir.isDirectory()) {
>>> + throw new IOException("Mkdirs failed to create "
>>> + + workDir.toString());
>>> + }
>>> + }
>>> + RunJar.unJar(new File(localJarFile.toString()), workDir);
>>> + }
>>> + rjob.localized = true;
>>> + }
>>> + }
>>> + launchTaskForJob(tip, jobConf);
>>> + }
>>> +
>>> + private void launchTaskForJob(TaskInProgress tip, BSPJob jobConf) {
>>> + try {
>>> + tip.setJobConf(jobConf);
>>> tip.launchTask();
>>> } catch (Throwable ie) {
>>> - // TODO: when job failed.
>>> + tip.taskStatus.setRunState(TaskStatus.State.FAILED);
>>> + String error = StringUtils.stringifyException(ie);
>>> + LOG.info(error);
>>> + }
>>> + }
>>> +
>>> + private RunningJob addTaskToJob(BSPJobID jobId, Path localJobFile,
>>> + TaskInProgress tip) {
>>> + synchronized (runningJobs) {
>>> + RunningJob rJob = null;
>>> + if (!runningJobs.containsKey(jobId)) {
>>> + rJob = new RunningJob(jobId, localJobFile);
>>> + rJob.localized = false;
>>> + rJob.tasks = new HashSet<TaskInProgress>();
>>> + rJob.jobFile = localJobFile;
>>> + runningJobs.put(jobId, rJob);
>>> + } else {
>>> + rJob = runningJobs.get(jobId);
>>> + }
>>> + rJob.tasks.add(tip);
>>> + return rJob;
>>> + }
>>> + }
>>> +
>>> + /**
>>> + * The datastructure for initializing a job
>>> + */
>>> + static class RunningJob {
>>> + private BSPJobID jobid;
>>> + private Path jobFile;
>>> + // keep this for later use
>>> + Set<TaskInProgress> tasks;
>>> + boolean localized;
>>> + boolean keepJobFiles;
>>> +
>>> + RunningJob(BSPJobID jobid, Path jobFile) {
>>> + this.jobid = jobid;
>>> + localized = false;
>>> + tasks = new HashSet<TaskInProgress>();
>>> + this.jobFile = jobFile;
>>> + keepJobFiles = false;
>>> + }
>>> +
>>> + Path getJobFile() {
>>> + return jobFile;
>>> + }
>>> +
>>> + BSPJobID getJobId() {
>>> + return jobid;
>>> }
>>> }
>>>
>>> @@ -410,6 +515,8 @@ public class GroomServer implements Runn
>>> // /////////////////////////////////////////////////////
>>> class TaskInProgress {
>>> Task task;
>>> + BSPJob jobConf;
>>> + private BSPTaskRunner runner;
>>> volatile boolean done = false;
>>> volatile boolean wasKilled = false;
>>> private TaskStatus taskStatus;
>>> @@ -421,61 +528,29 @@ public class GroomServer implements Runn
>>> TaskStatus.Phase.STARTING);
>>> }
>>>
>>> - static final String SUBDIR = "groomServer";
>>> + public void setJobConf(BSPJob jobConf) {
>>> + this.jobConf = jobConf;
>>> + }
>>>
>>> - public void launchTask() {
>>> + public void launchTask() throws IOException {
>>> taskStatus.setRunState(TaskStatus.State.RUNNING);
>>> + this.runner = task.createRunner(bspPeer, this.jobConf);
>>> + this.runner.start();
>>>
>>> - try {
>>> - // TODO: need to move this code to TaskRunner
>>> -
>>> - task.getJobFile();
>>> - conf.addResource(task.getJobFile());
>>> - BSPJob defaultJobConf = new BSPJob((HamaConfiguration) conf);
>>> -
>>> - Path localJobFile = defaultJobConf.getLocalPath(SUBDIR + "/"
>>> - + task.getTaskID() + "/" + "job.xml");
>>> - Path localJarFile = defaultJobConf.getLocalPath(SUBDIR + "/"
>>> - + task.getTaskID() + "/" + "job.jar");
>>> -
>>> - systemFS.copyToLocalFile(new Path(task.getJobFile()), localJobFile);
>>> - systemFS.copyToLocalFile(new Path(task.getJobFile().replace(".xml",
>>> - ".jar")), localJarFile);
>>> -
>>> - HamaConfiguration conf = new HamaConfiguration();
>>> - conf.addResource(localJobFile);
>>> - BSPJob jobConf = new BSPJob(conf, task.getJobID().toString());
>>> - jobConf.setJar(localJarFile.toString());
>>> -
>>> - BSP bsp = (BSP) ReflectionUtils
>>> - .newInstance(jobConf.getBspClass(), conf);
>>> - bsp.setPeer(bspPeer);
>>> + // Check state of Task
>>> + while (true) {
>>> try {
>>> - bsp.runBSP();
>>> - } catch (Exception e) {
>>> + Thread.sleep(1000);
>>> + } catch (InterruptedException e) {
>>> e.printStackTrace();
>>> - taskStatus.setRunState(TaskStatus.State.FAILED);
>>> }
>>>
>>> - } catch (IOException e) {
>>> - // TODO Auto-generated catch block
>>> - e.printStackTrace();
>>> - } finally {
>>> -
>>> - while (true) {
>>> - try {
>>> - Thread.sleep(1000);
>>> - } catch (InterruptedException e) {
>>> - e.printStackTrace();
>>> - }
>>> -
>>> - // If local/outgoing queues are empty, task is done.
>>> - if (bspPeer.localQueue.size() == 0
>>> - && bspPeer.outgoingQueues.size() == 0) {
>>> - taskStatus.setRunState(TaskStatus.State.SUCCEEDED);
>>> - acceptNewTasks = true;
>>> - break;
>>> - }
>>> + // If local/outgoing queues are empty, task is done.
>>> + if (bspPeer.localQueue.size() == 0
>>> + && bspPeer.outgoingQueues.size() == 0) {
>>> + taskStatus.setRunState(TaskStatus.State.SUCCEEDED);
>>> + acceptNewTasks = true;
>>> + break;
>>> }
>>> }
>>>
>>>
>>> Modified: incubator/hama/trunk/src/java/org/apache/hama/bsp/LaunchTaskAction.java
>>> URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/bsp/LaunchTaskAction.java?rev=1024485&r1=1024484&r2=1024485&view=diff
>>> ==============================================================================
>>> --- incubator/hama/trunk/src/java/org/apache/hama/bsp/LaunchTaskAction.java (original)
>>> +++ incubator/hama/trunk/src/java/org/apache/hama/bsp/LaunchTaskAction.java Wed Oct 20 01:48:36 2010
>>> @@ -47,7 +47,7 @@ class LaunchTaskAction extends GroomServ
>>> }
>>>
>>> public void readFields(DataInput in) throws IOException {
>>> - task = new Task();
>>> + task = new BSPTask();
>>> task.readFields(in);
>>> }
>>>
>>>
>>> Modified: incubator/hama/trunk/src/java/org/apache/hama/bsp/LocalJobRunner.java
>>> URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/bsp/LocalJobRunner.java?rev=1024485&r1=1024484&r2=1024485&view=diff
>>> ==============================================================================
>>> --- incubator/hama/trunk/src/java/org/apache/hama/bsp/LocalJobRunner.java (original)
>>> +++ incubator/hama/trunk/src/java/org/apache/hama/bsp/LocalJobRunner.java Wed Oct 20 01:48:36 2010
>>> @@ -172,7 +172,7 @@ public class LocalJobRunner implements J
>>>
>>> try {
>>> GroomServer servers = new GroomServer(conf);
>>> - Task task = new BSPTask(job.getJobID(), jobFile, tID.toString(), i, this.conf);
>>> + Task task = new BSPTask(job.getJobID(), jobFile, tID.toString(), i);
>>>
>>> // TODO not yet implemented
>>>
>>>
>>> Modified: incubator/hama/trunk/src/java/org/apache/hama/bsp/Task.java
>>> URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/bsp/Task.java?rev=1024485&r1=1024484&r2=1024485&view=diff
>>> ==============================================================================
>>> --- incubator/hama/trunk/src/java/org/apache/hama/bsp/Task.java (original)
>>> +++ incubator/hama/trunk/src/java/org/apache/hama/bsp/Task.java Wed Oct 20 01:48:36 2010
>>> @@ -23,14 +23,12 @@ import java.io.IOException;
>>>
>>> import org.apache.commons.logging.Log;
>>> import org.apache.commons.logging.LogFactory;
>>> +import org.apache.hadoop.conf.Configuration;
>>> import org.apache.hadoop.fs.LocalDirAllocator;
>>> import org.apache.hadoop.io.Text;
>>> import org.apache.hadoop.io.Writable;
>>>
>>> -/**
>>> - *
>>> - */
>>> -public class Task implements Writable {
>>> +public abstract class Task implements Writable {
>>> public static final Log LOG = LogFactory.getLog(Task.class);
>>> ////////////////////////////////////////////
>>> // Fields
>>> @@ -109,5 +107,7 @@ public class Task implements Writable {
>>> taskId = Text.readString(in);
>>> partition = in.readInt();
>>> }
>>> +
>>> + public abstract BSPTaskRunner createRunner(BSPPeer bspPeer, BSPJob jobConf);
>>>
>>> }
>>>
>>> Modified: incubator/hama/trunk/src/java/org/apache/hama/bsp/TaskInProgress.java
>>> URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/bsp/TaskInProgress.java?rev=1024485&r1=1024484&r2=1024485&view=diff
>>> ==============================================================================
>>> --- incubator/hama/trunk/src/java/org/apache/hama/bsp/TaskInProgress.java (original)
>>> +++ incubator/hama/trunk/src/java/org/apache/hama/bsp/TaskInProgress.java Wed Oct 20 01:48:36 2010
>>> @@ -101,7 +101,7 @@ class TaskInProgress {
>>> return null;
>>> }
>>>
>>> - t = new BSPTask(jobId, jobFile, taskid, partition, this.conf);
>>> + t = new BSPTask(jobId, jobFile, taskid, partition);
>>> activeTasks.put(taskid, status.getGroomName());
>>>
>>> // Ask JobTracker to note that the task exists
>>>
>>>
>>>
>>
>>
>>
>> --
>> Filipe David Manana,
>> fdmanana@gmail.com, fdmanana@apache.org
>>
>> "Reasonable men adapt themselves to the world.
>> Unreasonable men adapt the world to themselves.
>> That's why all progress depends on unreasonable men."
>>
>
>
>
> --
> Best Regards, Edward J. Yoon
> edwardyoon@apache.org
> http://blog.udanax.org
>
--
Filipe David Manana,
fdmanana@gmail.com, fdmanana@apache.org
"Reasonable men adapt themselves to the world.
Unreasonable men adapt the world to themselves.
That's why all progress depends on unreasonable men."
Re: svn commit: r1024485 - in /incubator/hama/trunk: ./
src/examples/org/apache/hama/examples/ src/java/org/apache/hama/bsp/
Posted by "Edward J. Yoon" <ed...@apache.org>.
Thanks.
BTW, getAddress() and getHostName() methods are somewhat duplicated.
If we remove one of the two, we have to change all code, related with
it.
https://issues.apache.org/jira/browse/HAMA-316
Could you please comment here?
On Wed, Oct 20, 2010 at 6:31 PM, Filipe David Manana
<fd...@apache.org> wrote:
> A big +1 on this one :)
>
> On Wed, Oct 20, 2010 at 2:48 AM, <ed...@apache.org> wrote:
>> Author: edwardyoon
>> Date: Wed Oct 20 01:48:36 2010
>> New Revision: 1024485
>>
>> URL: http://svn.apache.org/viewvc?rev=1024485&view=rev
>> Log:
>> Refactoring launchTask() method in GroomServer
>>
>> Added:
>> incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPTaskRunner.java
>> Modified:
>> incubator/hama/trunk/CHANGES.txt
>> incubator/hama/trunk/src/examples/org/apache/hama/examples/PiEstimator.java
>> incubator/hama/trunk/src/java/org/apache/hama/bsp/BSP.java
>> incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPMessage.java
>> incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPTask.java
>> incubator/hama/trunk/src/java/org/apache/hama/bsp/GroomServer.java
>> incubator/hama/trunk/src/java/org/apache/hama/bsp/LaunchTaskAction.java
>> incubator/hama/trunk/src/java/org/apache/hama/bsp/LocalJobRunner.java
>> incubator/hama/trunk/src/java/org/apache/hama/bsp/Task.java
>> incubator/hama/trunk/src/java/org/apache/hama/bsp/TaskInProgress.java
>>
>> Modified: incubator/hama/trunk/CHANGES.txt
>> URL: http://svn.apache.org/viewvc/incubator/hama/trunk/CHANGES.txt?rev=1024485&r1=1024484&r2=1024485&view=diff
>> ==============================================================================
>> --- incubator/hama/trunk/CHANGES.txt (original)
>> +++ incubator/hama/trunk/CHANGES.txt Wed Oct 20 01:48:36 2010
>> @@ -50,6 +50,7 @@ Trunk (unreleased changes)
>>
>> IMPROVEMENTS
>>
>> + HAMA-300: Refactoring launchTask() method in GroomServer (edwardyoon)
>> HAMA-311: Add unit tests for IPC package (edwardyoon)
>> HAMA-312: Add serialize printing to ExampleDriver (edwardyoon)
>> HAMA-309: Add unit tests for Bytes utilities (edwardyoon)
>>
>> Modified: incubator/hama/trunk/src/examples/org/apache/hama/examples/PiEstimator.java
>> URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/examples/org/apache/hama/examples/PiEstimator.java?rev=1024485&r1=1024484&r2=1024485&view=diff
>> ==============================================================================
>> --- incubator/hama/trunk/src/examples/org/apache/hama/examples/PiEstimator.java (original)
>> +++ incubator/hama/trunk/src/examples/org/apache/hama/examples/PiEstimator.java Wed Oct 20 01:48:36 2010
>> @@ -54,7 +54,7 @@ public class PiEstimator {
>> }
>> }
>>
>> - byte[] tagName = Bytes.toBytes(getName().toString());
>> + byte[] tagName = Bytes.toBytes(bspPeer.getHostName());
>> byte[] myData = Bytes.toBytes(4.0 * (double) in / (double) iterations);
>> BSPMessage estimate = new BSPMessage(tagName, myData);
>>
>>
>> Modified: incubator/hama/trunk/src/java/org/apache/hama/bsp/BSP.java
>> URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/bsp/BSP.java?rev=1024485&r1=1024484&r2=1024485&view=diff
>> ==============================================================================
>> --- incubator/hama/trunk/src/java/org/apache/hama/bsp/BSP.java (original)
>> +++ incubator/hama/trunk/src/java/org/apache/hama/bsp/BSP.java Wed Oct 20 01:48:36 2010
>> @@ -20,20 +20,5 @@ package org.apache.hama.bsp;
>> /**
>> * This class provides an abstract implementation of the BSP interface
>> */
>> -public abstract class BSP extends Thread implements BSPInterface {
>> - private BSPPeer bspPeer;
>> -
>> - /**
>> - * A thread's run method.
>> - *
>> - * The run method performs the
>> - * {@link org.apache.hama.bsp.BSPInterface#bsp(BSPPeer)}
>> - */
>> - public void runBSP() throws Exception {
>> - bsp(bspPeer);
>> - }
>> -
>> - public void setPeer(BSPPeer bspServer) {
>> - this.bspPeer = bspServer;
>> - }
>> +public abstract class BSP implements BSPInterface {
>> }
>>
>> Modified: incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPMessage.java
>> URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPMessage.java?rev=1024485&r1=1024484&r2=1024485&view=diff
>> ==============================================================================
>> --- incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPMessage.java (original)
>> +++ incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPMessage.java Wed Oct 20 01:48:36 2010
>> @@ -30,6 +30,12 @@ public class BSPMessage implements Writa
>> public BSPMessage() {
>> }
>>
>> + /**
>> + * Constructor
>> + *
>> + * @param tag of data
>> + * @param data of message
>> + */
>> public BSPMessage(byte[] tag, byte[] data) {
>> this.tag = new byte[tag.length];
>> this.data = new byte[data.length];
>> @@ -37,11 +43,20 @@ public class BSPMessage implements Writa
>> System.arraycopy(data, 0, this.data, 0, data.length);
>> }
>>
>> + /**
>> + * BSP messages are typically identified with tags. This allows to get the tag
>> + * of data.
>> + *
>> + * @return tag of data of BSP message
>> + */
>> public byte[] getTag() {
>> byte[] result = this.tag;
>> return result;
>> }
>>
>> + /**
>> + * @return data of BSP message
>> + */
>> public byte[] getData() {
>> byte[] result = this.data;
>> return result;
>>
>> Modified: incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPTask.java
>> URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPTask.java?rev=1024485&r1=1024484&r2=1024485&view=diff
>> ==============================================================================
>> --- incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPTask.java (original)
>> +++ incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPTask.java Wed Oct 20 01:48:36 2010
>> @@ -17,26 +17,21 @@
>> */
>> package org.apache.hama.bsp;
>>
>> -import org.apache.hadoop.conf.Configuration;
>> -import org.apache.hadoop.util.ReflectionUtils;
>> -
>> public class BSPTask extends Task {
>> - private BSP bsp;
>> - private Configuration conf;
>> -
>> - public BSPTask(BSPJobID jobId, String jobFile, String taskid, int partition, Configuration conf) {
>> +
>> + public BSPTask() {
>> + }
>> +
>> + public BSPTask(BSPJobID jobId, String jobFile, String taskid, int partition) {
>> this.jobId = jobId;
>> this.jobFile = jobFile;
>> this.taskId = taskid;
>> this.partition = partition;
>> - this.conf = conf;
>> }
>>
>> - public BSP getBSPClass() {
>> - bsp = (BSP) ReflectionUtils.newInstance(conf.getClass("bsp.work.class",
>> - BSP.class), conf);
>> -
>> - return bsp;
>> + @Override
>> + public BSPTaskRunner createRunner(BSPPeer bspPeer, BSPJob conf) {
>> + return new BSPTaskRunner(this, bspPeer, conf);
>> }
>>
>> }
>>
>> Added: incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPTaskRunner.java
>> URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPTaskRunner.java?rev=1024485&view=auto
>> ==============================================================================
>> --- incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPTaskRunner.java (added)
>> +++ incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPTaskRunner.java Wed Oct 20 01:48:36 2010
>> @@ -0,0 +1,62 @@
>> +/**
>> + * Licensed to the Apache Software Foundation (ASF) under one
>> + * or more contributor license agreements. See the NOTICE file
>> + * distributed with this work for additional information
>> + * regarding copyright ownership. The ASF licenses this file
>> + * to you under the Apache License, Version 2.0 (the
>> + * "License"); you may not use this file except in compliance
>> + * with the License. You may obtain a copy of the License at
>> + *
>> + * http://www.apache.org/licenses/LICENSE-2.0
>> + *
>> + * Unless required by applicable law or agreed to in writing, software
>> + * distributed under the License is distributed on an "AS IS" BASIS,
>> + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
>> + * See the License for the specific language governing permissions and
>> + * limitations under the License.
>> + */
>> +package org.apache.hama.bsp;
>> +
>> +import java.io.IOException;
>> +
>> +import org.apache.commons.logging.Log;
>> +import org.apache.commons.logging.LogFactory;
>> +import org.apache.hadoop.util.ReflectionUtils;
>> +import org.apache.zookeeper.KeeperException;
>> +
>> +public class BSPTaskRunner extends Thread {
>> +
>> + public static final Log LOG = LogFactory.getLog(BSPTaskRunner.class);
>> + private Task task;
>> + private BSPJob conf;
>> + private BSPPeer bspPeer;
>> +
>> + public BSPTaskRunner(BSPTask bspTask, BSPPeer bspPeer, BSPJob conf) {
>> + this.task = bspTask;
>> + this.conf = conf;
>> + this.bspPeer = bspPeer;
>> + }
>> +
>> + public Task getTask() {
>> + return task;
>> + }
>> +
>> + public void run() {
>> + BSP bsp = (BSP) ReflectionUtils.newInstance(conf.getConf().getClass(
>> + "bsp.work.class", BSP.class), conf.getConf());
>> +
>> + try {
>> + bsp.bsp(bspPeer);
>> + } catch (IOException e) {
>> + // TODO Auto-generated catch block
>> + e.printStackTrace();
>> + } catch (KeeperException e) {
>> + // TODO Auto-generated catch block
>> + e.printStackTrace();
>> + } catch (InterruptedException e) {
>> + // TODO Auto-generated catch block
>> + e.printStackTrace();
>> + }
>> + }
>> +
>> +}
>>
>> Modified: incubator/hama/trunk/src/java/org/apache/hama/bsp/GroomServer.java
>> URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/bsp/GroomServer.java?rev=1024485&r1=1024484&r2=1024485&view=diff
>> ==============================================================================
>> --- incubator/hama/trunk/src/java/org/apache/hama/bsp/GroomServer.java (original)
>> +++ incubator/hama/trunk/src/java/org/apache/hama/bsp/GroomServer.java Wed Oct 20 01:48:36 2010
>> @@ -23,9 +23,11 @@ import java.lang.reflect.Constructor;
>> import java.net.InetSocketAddress;
>> import java.util.ArrayList;
>> import java.util.HashMap;
>> +import java.util.HashSet;
>> import java.util.LinkedHashMap;
>> import java.util.List;
>> import java.util.Map;
>> +import java.util.Set;
>> import java.util.TreeMap;
>> import java.util.concurrent.BlockingQueue;
>> import java.util.concurrent.LinkedBlockingQueue;
>> @@ -40,7 +42,7 @@ import org.apache.hadoop.ipc.RPC;
>> import org.apache.hadoop.ipc.RemoteException;
>> import org.apache.hadoop.net.DNS;
>> import org.apache.hadoop.util.DiskChecker;
>> -import org.apache.hadoop.util.ReflectionUtils;
>> +import org.apache.hadoop.util.RunJar;
>> import org.apache.hadoop.util.StringUtils;
>> import org.apache.hadoop.util.DiskChecker.DiskErrorException;
>> import org.apache.hama.Constants;
>> @@ -50,6 +52,7 @@ import org.apache.hama.ipc.InterTrackerP
>> public class GroomServer implements Runnable {
>> public static final Log LOG = LogFactory.getLog(GroomServer.class);
>> private static BSPPeer bspPeer;
>> + static final String SUBDIR = "groomServer";
>>
>> Configuration conf;
>>
>> @@ -281,9 +284,111 @@ public class GroomServer implements Runn
>> }
>>
>> try {
>> + localizeJob(tip);
>> + } catch (Throwable e) {
>> + String msg = ("Error initializing " + tip.getTask().getTaskID() + ":\n" + StringUtils
>> + .stringifyException(e));
>> + LOG.warn(msg);
>> + }
>> + }
>> +
>> + private void localizeJob(TaskInProgress tip) throws IOException {
>> + Task task = tip.getTask();
>> + conf.addResource(task.getJobFile());
>> + BSPJob defaultJobConf = new BSPJob((HamaConfiguration) conf);
>> +
>> + Path localJobFile = defaultJobConf.getLocalPath(SUBDIR + "/"
>> + + task.getTaskID() + "/" + "job.xml");
>> +
>> + RunningJob rjob = addTaskToJob(task.getJobID(), localJobFile, tip);
>> + BSPJob jobConf = null;
>> +
>> + synchronized (rjob) {
>> + if (!rjob.localized) {
>> + Path localJarFile = defaultJobConf.getLocalPath(SUBDIR + "/"
>> + + task.getTaskID() + "/" + "job.jar");
>> + systemFS.copyToLocalFile(new Path(task.getJobFile()), localJobFile);
>> + Path jarFile = new Path(task.getJobFile().replace(".xml", ".jar"));
>> +
>> + HamaConfiguration conf = new HamaConfiguration();
>> + conf.addResource(localJobFile);
>> + jobConf = new BSPJob(conf, task.getJobID().toString());
>> + jobConf.setJar(localJarFile.toString());
>> +
>> + if (jarFile != null) {
>> + systemFS.copyToLocalFile(jarFile, localJarFile);
>> +
>> + // also unjar the job.jar files in workdir
>> + File workDir = new File(
>> + new File(localJobFile.toString()).getParent(), "work");
>> + if (!workDir.mkdirs()) {
>> + if (!workDir.isDirectory()) {
>> + throw new IOException("Mkdirs failed to create "
>> + + workDir.toString());
>> + }
>> + }
>> + RunJar.unJar(new File(localJarFile.toString()), workDir);
>> + }
>> + rjob.localized = true;
>> + }
>> + }
>> + launchTaskForJob(tip, jobConf);
>> + }
>> +
>> + private void launchTaskForJob(TaskInProgress tip, BSPJob jobConf) {
>> + try {
>> + tip.setJobConf(jobConf);
>> tip.launchTask();
>> } catch (Throwable ie) {
>> - // TODO: when job failed.
>> + tip.taskStatus.setRunState(TaskStatus.State.FAILED);
>> + String error = StringUtils.stringifyException(ie);
>> + LOG.info(error);
>> + }
>> + }
>> +
>> + private RunningJob addTaskToJob(BSPJobID jobId, Path localJobFile,
>> + TaskInProgress tip) {
>> + synchronized (runningJobs) {
>> + RunningJob rJob = null;
>> + if (!runningJobs.containsKey(jobId)) {
>> + rJob = new RunningJob(jobId, localJobFile);
>> + rJob.localized = false;
>> + rJob.tasks = new HashSet<TaskInProgress>();
>> + rJob.jobFile = localJobFile;
>> + runningJobs.put(jobId, rJob);
>> + } else {
>> + rJob = runningJobs.get(jobId);
>> + }
>> + rJob.tasks.add(tip);
>> + return rJob;
>> + }
>> + }
>> +
>> + /**
>> + * The datastructure for initializing a job
>> + */
>> + static class RunningJob {
>> + private BSPJobID jobid;
>> + private Path jobFile;
>> + // keep this for later use
>> + Set<TaskInProgress> tasks;
>> + boolean localized;
>> + boolean keepJobFiles;
>> +
>> + RunningJob(BSPJobID jobid, Path jobFile) {
>> + this.jobid = jobid;
>> + localized = false;
>> + tasks = new HashSet<TaskInProgress>();
>> + this.jobFile = jobFile;
>> + keepJobFiles = false;
>> + }
>> +
>> + Path getJobFile() {
>> + return jobFile;
>> + }
>> +
>> + BSPJobID getJobId() {
>> + return jobid;
>> }
>> }
>>
>> @@ -410,6 +515,8 @@ public class GroomServer implements Runn
>> // /////////////////////////////////////////////////////
>> class TaskInProgress {
>> Task task;
>> + BSPJob jobConf;
>> + private BSPTaskRunner runner;
>> volatile boolean done = false;
>> volatile boolean wasKilled = false;
>> private TaskStatus taskStatus;
>> @@ -421,61 +528,29 @@ public class GroomServer implements Runn
>> TaskStatus.Phase.STARTING);
>> }
>>
>> - static final String SUBDIR = "groomServer";
>> + public void setJobConf(BSPJob jobConf) {
>> + this.jobConf = jobConf;
>> + }
>>
>> - public void launchTask() {
>> + public void launchTask() throws IOException {
>> taskStatus.setRunState(TaskStatus.State.RUNNING);
>> + this.runner = task.createRunner(bspPeer, this.jobConf);
>> + this.runner.start();
>>
>> - try {
>> - // TODO: need to move this code to TaskRunner
>> -
>> - task.getJobFile();
>> - conf.addResource(task.getJobFile());
>> - BSPJob defaultJobConf = new BSPJob((HamaConfiguration) conf);
>> -
>> - Path localJobFile = defaultJobConf.getLocalPath(SUBDIR + "/"
>> - + task.getTaskID() + "/" + "job.xml");
>> - Path localJarFile = defaultJobConf.getLocalPath(SUBDIR + "/"
>> - + task.getTaskID() + "/" + "job.jar");
>> -
>> - systemFS.copyToLocalFile(new Path(task.getJobFile()), localJobFile);
>> - systemFS.copyToLocalFile(new Path(task.getJobFile().replace(".xml",
>> - ".jar")), localJarFile);
>> -
>> - HamaConfiguration conf = new HamaConfiguration();
>> - conf.addResource(localJobFile);
>> - BSPJob jobConf = new BSPJob(conf, task.getJobID().toString());
>> - jobConf.setJar(localJarFile.toString());
>> -
>> - BSP bsp = (BSP) ReflectionUtils
>> - .newInstance(jobConf.getBspClass(), conf);
>> - bsp.setPeer(bspPeer);
>> + // Check state of Task
>> + while (true) {
>> try {
>> - bsp.runBSP();
>> - } catch (Exception e) {
>> + Thread.sleep(1000);
>> + } catch (InterruptedException e) {
>> e.printStackTrace();
>> - taskStatus.setRunState(TaskStatus.State.FAILED);
>> }
>>
>> - } catch (IOException e) {
>> - // TODO Auto-generated catch block
>> - e.printStackTrace();
>> - } finally {
>> -
>> - while (true) {
>> - try {
>> - Thread.sleep(1000);
>> - } catch (InterruptedException e) {
>> - e.printStackTrace();
>> - }
>> -
>> - // If local/outgoing queues are empty, task is done.
>> - if (bspPeer.localQueue.size() == 0
>> - && bspPeer.outgoingQueues.size() == 0) {
>> - taskStatus.setRunState(TaskStatus.State.SUCCEEDED);
>> - acceptNewTasks = true;
>> - break;
>> - }
>> + // If local/outgoing queues are empty, task is done.
>> + if (bspPeer.localQueue.size() == 0
>> + && bspPeer.outgoingQueues.size() == 0) {
>> + taskStatus.setRunState(TaskStatus.State.SUCCEEDED);
>> + acceptNewTasks = true;
>> + break;
>> }
>> }
>>
>>
>> Modified: incubator/hama/trunk/src/java/org/apache/hama/bsp/LaunchTaskAction.java
>> URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/bsp/LaunchTaskAction.java?rev=1024485&r1=1024484&r2=1024485&view=diff
>> ==============================================================================
>> --- incubator/hama/trunk/src/java/org/apache/hama/bsp/LaunchTaskAction.java (original)
>> +++ incubator/hama/trunk/src/java/org/apache/hama/bsp/LaunchTaskAction.java Wed Oct 20 01:48:36 2010
>> @@ -47,7 +47,7 @@ class LaunchTaskAction extends GroomServ
>> }
>>
>> public void readFields(DataInput in) throws IOException {
>> - task = new Task();
>> + task = new BSPTask();
>> task.readFields(in);
>> }
>>
>>
>> Modified: incubator/hama/trunk/src/java/org/apache/hama/bsp/LocalJobRunner.java
>> URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/bsp/LocalJobRunner.java?rev=1024485&r1=1024484&r2=1024485&view=diff
>> ==============================================================================
>> --- incubator/hama/trunk/src/java/org/apache/hama/bsp/LocalJobRunner.java (original)
>> +++ incubator/hama/trunk/src/java/org/apache/hama/bsp/LocalJobRunner.java Wed Oct 20 01:48:36 2010
>> @@ -172,7 +172,7 @@ public class LocalJobRunner implements J
>>
>> try {
>> GroomServer servers = new GroomServer(conf);
>> - Task task = new BSPTask(job.getJobID(), jobFile, tID.toString(), i, this.conf);
>> + Task task = new BSPTask(job.getJobID(), jobFile, tID.toString(), i);
>>
>> // TODO not yet implemented
>>
>>
>> Modified: incubator/hama/trunk/src/java/org/apache/hama/bsp/Task.java
>> URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/bsp/Task.java?rev=1024485&r1=1024484&r2=1024485&view=diff
>> ==============================================================================
>> --- incubator/hama/trunk/src/java/org/apache/hama/bsp/Task.java (original)
>> +++ incubator/hama/trunk/src/java/org/apache/hama/bsp/Task.java Wed Oct 20 01:48:36 2010
>> @@ -23,14 +23,12 @@ import java.io.IOException;
>>
>> import org.apache.commons.logging.Log;
>> import org.apache.commons.logging.LogFactory;
>> +import org.apache.hadoop.conf.Configuration;
>> import org.apache.hadoop.fs.LocalDirAllocator;
>> import org.apache.hadoop.io.Text;
>> import org.apache.hadoop.io.Writable;
>>
>> -/**
>> - *
>> - */
>> -public class Task implements Writable {
>> +public abstract class Task implements Writable {
>> public static final Log LOG = LogFactory.getLog(Task.class);
>> ////////////////////////////////////////////
>> // Fields
>> @@ -109,5 +107,7 @@ public class Task implements Writable {
>> taskId = Text.readString(in);
>> partition = in.readInt();
>> }
>> +
>> + public abstract BSPTaskRunner createRunner(BSPPeer bspPeer, BSPJob jobConf);
>>
>> }
>>
>> Modified: incubator/hama/trunk/src/java/org/apache/hama/bsp/TaskInProgress.java
>> URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/bsp/TaskInProgress.java?rev=1024485&r1=1024484&r2=1024485&view=diff
>> ==============================================================================
>> --- incubator/hama/trunk/src/java/org/apache/hama/bsp/TaskInProgress.java (original)
>> +++ incubator/hama/trunk/src/java/org/apache/hama/bsp/TaskInProgress.java Wed Oct 20 01:48:36 2010
>> @@ -101,7 +101,7 @@ class TaskInProgress {
>> return null;
>> }
>>
>> - t = new BSPTask(jobId, jobFile, taskid, partition, this.conf);
>> + t = new BSPTask(jobId, jobFile, taskid, partition);
>> activeTasks.put(taskid, status.getGroomName());
>>
>> // Ask JobTracker to note that the task exists
>>
>>
>>
>
>
>
> --
> Filipe David Manana,
> fdmanana@gmail.com, fdmanana@apache.org
>
> "Reasonable men adapt themselves to the world.
> Unreasonable men adapt the world to themselves.
> That's why all progress depends on unreasonable men."
>
--
Best Regards, Edward J. Yoon
edwardyoon@apache.org
http://blog.udanax.org
Re: svn commit: r1024485 - in /incubator/hama/trunk: ./
src/examples/org/apache/hama/examples/ src/java/org/apache/hama/bsp/
Posted by Filipe David Manana <fd...@apache.org>.
A big +1 on this one :)
On Wed, Oct 20, 2010 at 2:48 AM, <ed...@apache.org> wrote:
> Author: edwardyoon
> Date: Wed Oct 20 01:48:36 2010
> New Revision: 1024485
>
> URL: http://svn.apache.org/viewvc?rev=1024485&view=rev
> Log:
> Refactoring launchTask() method in GroomServer
>
> Added:
> incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPTaskRunner.java
> Modified:
> incubator/hama/trunk/CHANGES.txt
> incubator/hama/trunk/src/examples/org/apache/hama/examples/PiEstimator.java
> incubator/hama/trunk/src/java/org/apache/hama/bsp/BSP.java
> incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPMessage.java
> incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPTask.java
> incubator/hama/trunk/src/java/org/apache/hama/bsp/GroomServer.java
> incubator/hama/trunk/src/java/org/apache/hama/bsp/LaunchTaskAction.java
> incubator/hama/trunk/src/java/org/apache/hama/bsp/LocalJobRunner.java
> incubator/hama/trunk/src/java/org/apache/hama/bsp/Task.java
> incubator/hama/trunk/src/java/org/apache/hama/bsp/TaskInProgress.java
>
> Modified: incubator/hama/trunk/CHANGES.txt
> URL: http://svn.apache.org/viewvc/incubator/hama/trunk/CHANGES.txt?rev=1024485&r1=1024484&r2=1024485&view=diff
> ==============================================================================
> --- incubator/hama/trunk/CHANGES.txt (original)
> +++ incubator/hama/trunk/CHANGES.txt Wed Oct 20 01:48:36 2010
> @@ -50,6 +50,7 @@ Trunk (unreleased changes)
>
> IMPROVEMENTS
>
> + HAMA-300: Refactoring launchTask() method in GroomServer (edwardyoon)
> HAMA-311: Add unit tests for IPC package (edwardyoon)
> HAMA-312: Add serialize printing to ExampleDriver (edwardyoon)
> HAMA-309: Add unit tests for Bytes utilities (edwardyoon)
>
> Modified: incubator/hama/trunk/src/examples/org/apache/hama/examples/PiEstimator.java
> URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/examples/org/apache/hama/examples/PiEstimator.java?rev=1024485&r1=1024484&r2=1024485&view=diff
> ==============================================================================
> --- incubator/hama/trunk/src/examples/org/apache/hama/examples/PiEstimator.java (original)
> +++ incubator/hama/trunk/src/examples/org/apache/hama/examples/PiEstimator.java Wed Oct 20 01:48:36 2010
> @@ -54,7 +54,7 @@ public class PiEstimator {
> }
> }
>
> - byte[] tagName = Bytes.toBytes(getName().toString());
> + byte[] tagName = Bytes.toBytes(bspPeer.getHostName());
> byte[] myData = Bytes.toBytes(4.0 * (double) in / (double) iterations);
> BSPMessage estimate = new BSPMessage(tagName, myData);
>
>
> Modified: incubator/hama/trunk/src/java/org/apache/hama/bsp/BSP.java
> URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/bsp/BSP.java?rev=1024485&r1=1024484&r2=1024485&view=diff
> ==============================================================================
> --- incubator/hama/trunk/src/java/org/apache/hama/bsp/BSP.java (original)
> +++ incubator/hama/trunk/src/java/org/apache/hama/bsp/BSP.java Wed Oct 20 01:48:36 2010
> @@ -20,20 +20,5 @@ package org.apache.hama.bsp;
> /**
> * This class provides an abstract implementation of the BSP interface
> */
> -public abstract class BSP extends Thread implements BSPInterface {
> - private BSPPeer bspPeer;
> -
> - /**
> - * A thread's run method.
> - *
> - * The run method performs the
> - * {@link org.apache.hama.bsp.BSPInterface#bsp(BSPPeer)}
> - */
> - public void runBSP() throws Exception {
> - bsp(bspPeer);
> - }
> -
> - public void setPeer(BSPPeer bspServer) {
> - this.bspPeer = bspServer;
> - }
> +public abstract class BSP implements BSPInterface {
> }
>
> Modified: incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPMessage.java
> URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPMessage.java?rev=1024485&r1=1024484&r2=1024485&view=diff
> ==============================================================================
> --- incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPMessage.java (original)
> +++ incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPMessage.java Wed Oct 20 01:48:36 2010
> @@ -30,6 +30,12 @@ public class BSPMessage implements Writa
> public BSPMessage() {
> }
>
> + /**
> + * Constructor
> + *
> + * @param tag of data
> + * @param data of message
> + */
> public BSPMessage(byte[] tag, byte[] data) {
> this.tag = new byte[tag.length];
> this.data = new byte[data.length];
> @@ -37,11 +43,20 @@ public class BSPMessage implements Writa
> System.arraycopy(data, 0, this.data, 0, data.length);
> }
>
> + /**
> + * BSP messages are typically identified with tags. This allows to get the tag
> + * of data.
> + *
> + * @return tag of data of BSP message
> + */
> public byte[] getTag() {
> byte[] result = this.tag;
> return result;
> }
>
> + /**
> + * @return data of BSP message
> + */
> public byte[] getData() {
> byte[] result = this.data;
> return result;
>
> Modified: incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPTask.java
> URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPTask.java?rev=1024485&r1=1024484&r2=1024485&view=diff
> ==============================================================================
> --- incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPTask.java (original)
> +++ incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPTask.java Wed Oct 20 01:48:36 2010
> @@ -17,26 +17,21 @@
> */
> package org.apache.hama.bsp;
>
> -import org.apache.hadoop.conf.Configuration;
> -import org.apache.hadoop.util.ReflectionUtils;
> -
> public class BSPTask extends Task {
> - private BSP bsp;
> - private Configuration conf;
> -
> - public BSPTask(BSPJobID jobId, String jobFile, String taskid, int partition, Configuration conf) {
> +
> + public BSPTask() {
> + }
> +
> + public BSPTask(BSPJobID jobId, String jobFile, String taskid, int partition) {
> this.jobId = jobId;
> this.jobFile = jobFile;
> this.taskId = taskid;
> this.partition = partition;
> - this.conf = conf;
> }
>
> - public BSP getBSPClass() {
> - bsp = (BSP) ReflectionUtils.newInstance(conf.getClass("bsp.work.class",
> - BSP.class), conf);
> -
> - return bsp;
> + @Override
> + public BSPTaskRunner createRunner(BSPPeer bspPeer, BSPJob conf) {
> + return new BSPTaskRunner(this, bspPeer, conf);
> }
>
> }
>
> Added: incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPTaskRunner.java
> URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPTaskRunner.java?rev=1024485&view=auto
> ==============================================================================
> --- incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPTaskRunner.java (added)
> +++ incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPTaskRunner.java Wed Oct 20 01:48:36 2010
> @@ -0,0 +1,62 @@
> +/**
> + * Licensed to the Apache Software Foundation (ASF) under one
> + * or more contributor license agreements. See the NOTICE file
> + * distributed with this work for additional information
> + * regarding copyright ownership. The ASF licenses this file
> + * to you under the Apache License, Version 2.0 (the
> + * "License"); you may not use this file except in compliance
> + * with the License. You may obtain a copy of the License at
> + *
> + * http://www.apache.org/licenses/LICENSE-2.0
> + *
> + * Unless required by applicable law or agreed to in writing, software
> + * distributed under the License is distributed on an "AS IS" BASIS,
> + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
> + * See the License for the specific language governing permissions and
> + * limitations under the License.
> + */
> +package org.apache.hama.bsp;
> +
> +import java.io.IOException;
> +
> +import org.apache.commons.logging.Log;
> +import org.apache.commons.logging.LogFactory;
> +import org.apache.hadoop.util.ReflectionUtils;
> +import org.apache.zookeeper.KeeperException;
> +
> +public class BSPTaskRunner extends Thread {
> +
> + public static final Log LOG = LogFactory.getLog(BSPTaskRunner.class);
> + private Task task;
> + private BSPJob conf;
> + private BSPPeer bspPeer;
> +
> + public BSPTaskRunner(BSPTask bspTask, BSPPeer bspPeer, BSPJob conf) {
> + this.task = bspTask;
> + this.conf = conf;
> + this.bspPeer = bspPeer;
> + }
> +
> + public Task getTask() {
> + return task;
> + }
> +
> + public void run() {
> + BSP bsp = (BSP) ReflectionUtils.newInstance(conf.getConf().getClass(
> + "bsp.work.class", BSP.class), conf.getConf());
> +
> + try {
> + bsp.bsp(bspPeer);
> + } catch (IOException e) {
> + // TODO Auto-generated catch block
> + e.printStackTrace();
> + } catch (KeeperException e) {
> + // TODO Auto-generated catch block
> + e.printStackTrace();
> + } catch (InterruptedException e) {
> + // TODO Auto-generated catch block
> + e.printStackTrace();
> + }
> + }
> +
> +}
>
> Modified: incubator/hama/trunk/src/java/org/apache/hama/bsp/GroomServer.java
> URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/bsp/GroomServer.java?rev=1024485&r1=1024484&r2=1024485&view=diff
> ==============================================================================
> --- incubator/hama/trunk/src/java/org/apache/hama/bsp/GroomServer.java (original)
> +++ incubator/hama/trunk/src/java/org/apache/hama/bsp/GroomServer.java Wed Oct 20 01:48:36 2010
> @@ -23,9 +23,11 @@ import java.lang.reflect.Constructor;
> import java.net.InetSocketAddress;
> import java.util.ArrayList;
> import java.util.HashMap;
> +import java.util.HashSet;
> import java.util.LinkedHashMap;
> import java.util.List;
> import java.util.Map;
> +import java.util.Set;
> import java.util.TreeMap;
> import java.util.concurrent.BlockingQueue;
> import java.util.concurrent.LinkedBlockingQueue;
> @@ -40,7 +42,7 @@ import org.apache.hadoop.ipc.RPC;
> import org.apache.hadoop.ipc.RemoteException;
> import org.apache.hadoop.net.DNS;
> import org.apache.hadoop.util.DiskChecker;
> -import org.apache.hadoop.util.ReflectionUtils;
> +import org.apache.hadoop.util.RunJar;
> import org.apache.hadoop.util.StringUtils;
> import org.apache.hadoop.util.DiskChecker.DiskErrorException;
> import org.apache.hama.Constants;
> @@ -50,6 +52,7 @@ import org.apache.hama.ipc.InterTrackerP
> public class GroomServer implements Runnable {
> public static final Log LOG = LogFactory.getLog(GroomServer.class);
> private static BSPPeer bspPeer;
> + static final String SUBDIR = "groomServer";
>
> Configuration conf;
>
> @@ -281,9 +284,111 @@ public class GroomServer implements Runn
> }
>
> try {
> + localizeJob(tip);
> + } catch (Throwable e) {
> + String msg = ("Error initializing " + tip.getTask().getTaskID() + ":\n" + StringUtils
> + .stringifyException(e));
> + LOG.warn(msg);
> + }
> + }
> +
> + private void localizeJob(TaskInProgress tip) throws IOException {
> + Task task = tip.getTask();
> + conf.addResource(task.getJobFile());
> + BSPJob defaultJobConf = new BSPJob((HamaConfiguration) conf);
> +
> + Path localJobFile = defaultJobConf.getLocalPath(SUBDIR + "/"
> + + task.getTaskID() + "/" + "job.xml");
> +
> + RunningJob rjob = addTaskToJob(task.getJobID(), localJobFile, tip);
> + BSPJob jobConf = null;
> +
> + synchronized (rjob) {
> + if (!rjob.localized) {
> + Path localJarFile = defaultJobConf.getLocalPath(SUBDIR + "/"
> + + task.getTaskID() + "/" + "job.jar");
> + systemFS.copyToLocalFile(new Path(task.getJobFile()), localJobFile);
> + Path jarFile = new Path(task.getJobFile().replace(".xml", ".jar"));
> +
> + HamaConfiguration conf = new HamaConfiguration();
> + conf.addResource(localJobFile);
> + jobConf = new BSPJob(conf, task.getJobID().toString());
> + jobConf.setJar(localJarFile.toString());
> +
> + if (jarFile != null) {
> + systemFS.copyToLocalFile(jarFile, localJarFile);
> +
> + // also unjar the job.jar files in workdir
> + File workDir = new File(
> + new File(localJobFile.toString()).getParent(), "work");
> + if (!workDir.mkdirs()) {
> + if (!workDir.isDirectory()) {
> + throw new IOException("Mkdirs failed to create "
> + + workDir.toString());
> + }
> + }
> + RunJar.unJar(new File(localJarFile.toString()), workDir);
> + }
> + rjob.localized = true;
> + }
> + }
> + launchTaskForJob(tip, jobConf);
> + }
> +
> + private void launchTaskForJob(TaskInProgress tip, BSPJob jobConf) {
> + try {
> + tip.setJobConf(jobConf);
> tip.launchTask();
> } catch (Throwable ie) {
> - // TODO: when job failed.
> + tip.taskStatus.setRunState(TaskStatus.State.FAILED);
> + String error = StringUtils.stringifyException(ie);
> + LOG.info(error);
> + }
> + }
> +
> + private RunningJob addTaskToJob(BSPJobID jobId, Path localJobFile,
> + TaskInProgress tip) {
> + synchronized (runningJobs) {
> + RunningJob rJob = null;
> + if (!runningJobs.containsKey(jobId)) {
> + rJob = new RunningJob(jobId, localJobFile);
> + rJob.localized = false;
> + rJob.tasks = new HashSet<TaskInProgress>();
> + rJob.jobFile = localJobFile;
> + runningJobs.put(jobId, rJob);
> + } else {
> + rJob = runningJobs.get(jobId);
> + }
> + rJob.tasks.add(tip);
> + return rJob;
> + }
> + }
> +
> + /**
> + * The datastructure for initializing a job
> + */
> + static class RunningJob {
> + private BSPJobID jobid;
> + private Path jobFile;
> + // keep this for later use
> + Set<TaskInProgress> tasks;
> + boolean localized;
> + boolean keepJobFiles;
> +
> + RunningJob(BSPJobID jobid, Path jobFile) {
> + this.jobid = jobid;
> + localized = false;
> + tasks = new HashSet<TaskInProgress>();
> + this.jobFile = jobFile;
> + keepJobFiles = false;
> + }
> +
> + Path getJobFile() {
> + return jobFile;
> + }
> +
> + BSPJobID getJobId() {
> + return jobid;
> }
> }
>
> @@ -410,6 +515,8 @@ public class GroomServer implements Runn
> // /////////////////////////////////////////////////////
> class TaskInProgress {
> Task task;
> + BSPJob jobConf;
> + private BSPTaskRunner runner;
> volatile boolean done = false;
> volatile boolean wasKilled = false;
> private TaskStatus taskStatus;
> @@ -421,61 +528,29 @@ public class GroomServer implements Runn
> TaskStatus.Phase.STARTING);
> }
>
> - static final String SUBDIR = "groomServer";
> + public void setJobConf(BSPJob jobConf) {
> + this.jobConf = jobConf;
> + }
>
> - public void launchTask() {
> + public void launchTask() throws IOException {
> taskStatus.setRunState(TaskStatus.State.RUNNING);
> + this.runner = task.createRunner(bspPeer, this.jobConf);
> + this.runner.start();
>
> - try {
> - // TODO: need to move this code to TaskRunner
> -
> - task.getJobFile();
> - conf.addResource(task.getJobFile());
> - BSPJob defaultJobConf = new BSPJob((HamaConfiguration) conf);
> -
> - Path localJobFile = defaultJobConf.getLocalPath(SUBDIR + "/"
> - + task.getTaskID() + "/" + "job.xml");
> - Path localJarFile = defaultJobConf.getLocalPath(SUBDIR + "/"
> - + task.getTaskID() + "/" + "job.jar");
> -
> - systemFS.copyToLocalFile(new Path(task.getJobFile()), localJobFile);
> - systemFS.copyToLocalFile(new Path(task.getJobFile().replace(".xml",
> - ".jar")), localJarFile);
> -
> - HamaConfiguration conf = new HamaConfiguration();
> - conf.addResource(localJobFile);
> - BSPJob jobConf = new BSPJob(conf, task.getJobID().toString());
> - jobConf.setJar(localJarFile.toString());
> -
> - BSP bsp = (BSP) ReflectionUtils
> - .newInstance(jobConf.getBspClass(), conf);
> - bsp.setPeer(bspPeer);
> + // Check state of Task
> + while (true) {
> try {
> - bsp.runBSP();
> - } catch (Exception e) {
> + Thread.sleep(1000);
> + } catch (InterruptedException e) {
> e.printStackTrace();
> - taskStatus.setRunState(TaskStatus.State.FAILED);
> }
>
> - } catch (IOException e) {
> - // TODO Auto-generated catch block
> - e.printStackTrace();
> - } finally {
> -
> - while (true) {
> - try {
> - Thread.sleep(1000);
> - } catch (InterruptedException e) {
> - e.printStackTrace();
> - }
> -
> - // If local/outgoing queues are empty, task is done.
> - if (bspPeer.localQueue.size() == 0
> - && bspPeer.outgoingQueues.size() == 0) {
> - taskStatus.setRunState(TaskStatus.State.SUCCEEDED);
> - acceptNewTasks = true;
> - break;
> - }
> + // If local/outgoing queues are empty, task is done.
> + if (bspPeer.localQueue.size() == 0
> + && bspPeer.outgoingQueues.size() == 0) {
> + taskStatus.setRunState(TaskStatus.State.SUCCEEDED);
> + acceptNewTasks = true;
> + break;
> }
> }
>
>
> Modified: incubator/hama/trunk/src/java/org/apache/hama/bsp/LaunchTaskAction.java
> URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/bsp/LaunchTaskAction.java?rev=1024485&r1=1024484&r2=1024485&view=diff
> ==============================================================================
> --- incubator/hama/trunk/src/java/org/apache/hama/bsp/LaunchTaskAction.java (original)
> +++ incubator/hama/trunk/src/java/org/apache/hama/bsp/LaunchTaskAction.java Wed Oct 20 01:48:36 2010
> @@ -47,7 +47,7 @@ class LaunchTaskAction extends GroomServ
> }
>
> public void readFields(DataInput in) throws IOException {
> - task = new Task();
> + task = new BSPTask();
> task.readFields(in);
> }
>
>
> Modified: incubator/hama/trunk/src/java/org/apache/hama/bsp/LocalJobRunner.java
> URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/bsp/LocalJobRunner.java?rev=1024485&r1=1024484&r2=1024485&view=diff
> ==============================================================================
> --- incubator/hama/trunk/src/java/org/apache/hama/bsp/LocalJobRunner.java (original)
> +++ incubator/hama/trunk/src/java/org/apache/hama/bsp/LocalJobRunner.java Wed Oct 20 01:48:36 2010
> @@ -172,7 +172,7 @@ public class LocalJobRunner implements J
>
> try {
> GroomServer servers = new GroomServer(conf);
> - Task task = new BSPTask(job.getJobID(), jobFile, tID.toString(), i, this.conf);
> + Task task = new BSPTask(job.getJobID(), jobFile, tID.toString(), i);
>
> // TODO not yet implemented
>
>
> Modified: incubator/hama/trunk/src/java/org/apache/hama/bsp/Task.java
> URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/bsp/Task.java?rev=1024485&r1=1024484&r2=1024485&view=diff
> ==============================================================================
> --- incubator/hama/trunk/src/java/org/apache/hama/bsp/Task.java (original)
> +++ incubator/hama/trunk/src/java/org/apache/hama/bsp/Task.java Wed Oct 20 01:48:36 2010
> @@ -23,14 +23,12 @@ import java.io.IOException;
>
> import org.apache.commons.logging.Log;
> import org.apache.commons.logging.LogFactory;
> +import org.apache.hadoop.conf.Configuration;
> import org.apache.hadoop.fs.LocalDirAllocator;
> import org.apache.hadoop.io.Text;
> import org.apache.hadoop.io.Writable;
>
> -/**
> - *
> - */
> -public class Task implements Writable {
> +public abstract class Task implements Writable {
> public static final Log LOG = LogFactory.getLog(Task.class);
> ////////////////////////////////////////////
> // Fields
> @@ -109,5 +107,7 @@ public class Task implements Writable {
> taskId = Text.readString(in);
> partition = in.readInt();
> }
> +
> + public abstract BSPTaskRunner createRunner(BSPPeer bspPeer, BSPJob jobConf);
>
> }
>
> Modified: incubator/hama/trunk/src/java/org/apache/hama/bsp/TaskInProgress.java
> URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/bsp/TaskInProgress.java?rev=1024485&r1=1024484&r2=1024485&view=diff
> ==============================================================================
> --- incubator/hama/trunk/src/java/org/apache/hama/bsp/TaskInProgress.java (original)
> +++ incubator/hama/trunk/src/java/org/apache/hama/bsp/TaskInProgress.java Wed Oct 20 01:48:36 2010
> @@ -101,7 +101,7 @@ class TaskInProgress {
> return null;
> }
>
> - t = new BSPTask(jobId, jobFile, taskid, partition, this.conf);
> + t = new BSPTask(jobId, jobFile, taskid, partition);
> activeTasks.put(taskid, status.getGroomName());
>
> // Ask JobTracker to note that the task exists
>
>
>
--
Filipe David Manana,
fdmanana@gmail.com, fdmanana@apache.org
"Reasonable men adapt themselves to the world.
Unreasonable men adapt the world to themselves.
That's why all progress depends on unreasonable men."