You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hama.apache.org by ed...@apache.org on 2010/10/20 03:48:36 UTC

svn commit: r1024485 - in /incubator/hama/trunk: ./ src/examples/org/apache/hama/examples/ src/java/org/apache/hama/bsp/

Author: edwardyoon
Date: Wed Oct 20 01:48:36 2010
New Revision: 1024485

URL: http://svn.apache.org/viewvc?rev=1024485&view=rev
Log:
Refactoring launchTask() method in GroomServer

Added:
    incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPTaskRunner.java
Modified:
    incubator/hama/trunk/CHANGES.txt
    incubator/hama/trunk/src/examples/org/apache/hama/examples/PiEstimator.java
    incubator/hama/trunk/src/java/org/apache/hama/bsp/BSP.java
    incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPMessage.java
    incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPTask.java
    incubator/hama/trunk/src/java/org/apache/hama/bsp/GroomServer.java
    incubator/hama/trunk/src/java/org/apache/hama/bsp/LaunchTaskAction.java
    incubator/hama/trunk/src/java/org/apache/hama/bsp/LocalJobRunner.java
    incubator/hama/trunk/src/java/org/apache/hama/bsp/Task.java
    incubator/hama/trunk/src/java/org/apache/hama/bsp/TaskInProgress.java

Modified: incubator/hama/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/CHANGES.txt?rev=1024485&r1=1024484&r2=1024485&view=diff
==============================================================================
--- incubator/hama/trunk/CHANGES.txt (original)
+++ incubator/hama/trunk/CHANGES.txt Wed Oct 20 01:48:36 2010
@@ -50,6 +50,7 @@ Trunk (unreleased changes)
 
   IMPROVEMENTS
     
+    HAMA-300: Refactoring launchTask() method in GroomServer (edwardyoon)
     HAMA-311: Add unit tests for IPC package (edwardyoon)
     HAMA-312: Add serialize printing to ExampleDriver (edwardyoon)
     HAMA-309: Add unit tests for Bytes utilities (edwardyoon)

Modified: incubator/hama/trunk/src/examples/org/apache/hama/examples/PiEstimator.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/examples/org/apache/hama/examples/PiEstimator.java?rev=1024485&r1=1024484&r2=1024485&view=diff
==============================================================================
--- incubator/hama/trunk/src/examples/org/apache/hama/examples/PiEstimator.java (original)
+++ incubator/hama/trunk/src/examples/org/apache/hama/examples/PiEstimator.java Wed Oct 20 01:48:36 2010
@@ -54,7 +54,7 @@ public class PiEstimator {
         }
       }
 
-      byte[] tagName = Bytes.toBytes(getName().toString());
+      byte[] tagName = Bytes.toBytes(bspPeer.getHostName());
       byte[] myData = Bytes.toBytes(4.0 * (double) in / (double) iterations);
       BSPMessage estimate = new BSPMessage(tagName, myData);
 

Modified: incubator/hama/trunk/src/java/org/apache/hama/bsp/BSP.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/bsp/BSP.java?rev=1024485&r1=1024484&r2=1024485&view=diff
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/bsp/BSP.java (original)
+++ incubator/hama/trunk/src/java/org/apache/hama/bsp/BSP.java Wed Oct 20 01:48:36 2010
@@ -20,20 +20,5 @@ package org.apache.hama.bsp;
 /**
  * This class provides an abstract implementation of the BSP interface
  */
-public abstract class BSP extends Thread implements BSPInterface {
-  private BSPPeer bspPeer;
-  
-  /**
-   * A thread's run method.
-   * 
-   * The run method performs the
-   * {@link org.apache.hama.bsp.BSPInterface#bsp(BSPPeer)}
-   */
-  public void runBSP() throws Exception {
-    bsp(bspPeer);
-  }
-  
-  public void setPeer(BSPPeer bspServer) {
-    this.bspPeer = bspServer;
-  }
+public abstract class BSP implements BSPInterface {
 }

Modified: incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPMessage.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPMessage.java?rev=1024485&r1=1024484&r2=1024485&view=diff
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPMessage.java (original)
+++ incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPMessage.java Wed Oct 20 01:48:36 2010
@@ -30,6 +30,12 @@ public class BSPMessage implements Writa
   public BSPMessage() {
   }
 
+  /**
+   * Constructor 
+   * 
+   * @param tag of data
+   * @param data of message
+   */
   public BSPMessage(byte[] tag, byte[] data) {
     this.tag = new byte[tag.length];
     this.data = new byte[data.length];
@@ -37,11 +43,20 @@ public class BSPMessage implements Writa
     System.arraycopy(data, 0, this.data, 0, data.length);
   }
 
+  /**
+   * BSP messages are typically identified with tags. This allows to get the tag
+   * of data.
+   * 
+   * @return tag of data of BSP message
+   */
   public byte[] getTag() {
     byte[] result = this.tag;
     return result;
   }
 
+  /**
+   * @return data of BSP message
+   */
   public byte[] getData() {
     byte[] result = this.data;
     return result;

Modified: incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPTask.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPTask.java?rev=1024485&r1=1024484&r2=1024485&view=diff
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPTask.java (original)
+++ incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPTask.java Wed Oct 20 01:48:36 2010
@@ -17,26 +17,21 @@
  */
 package org.apache.hama.bsp;
 
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.util.ReflectionUtils;
-
 public class BSPTask extends Task {
-  private BSP bsp;
-  private Configuration conf;
-  
-  public BSPTask(BSPJobID jobId, String jobFile, String taskid, int partition, Configuration conf) {
+
+  public BSPTask() {
+  }
+
+  public BSPTask(BSPJobID jobId, String jobFile, String taskid, int partition) {
     this.jobId = jobId;
     this.jobFile = jobFile;
     this.taskId = taskid;
     this.partition = partition;
-    this.conf = conf;
   }
 
-  public BSP getBSPClass() {
-    bsp = (BSP) ReflectionUtils.newInstance(conf.getClass("bsp.work.class",
-        BSP.class), conf);
-    
-    return bsp;
+  @Override
+  public BSPTaskRunner createRunner(BSPPeer bspPeer, BSPJob conf) {
+    return new BSPTaskRunner(this, bspPeer, conf);
   }
 
 }

Added: incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPTaskRunner.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPTaskRunner.java?rev=1024485&view=auto
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPTaskRunner.java (added)
+++ incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPTaskRunner.java Wed Oct 20 01:48:36 2010
@@ -0,0 +1,62 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.bsp;
+
+import java.io.IOException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.zookeeper.KeeperException;
+
+public class BSPTaskRunner extends Thread {
+
+  public static final Log LOG = LogFactory.getLog(BSPTaskRunner.class);
+  private Task task;
+  private BSPJob conf;
+  private BSPPeer bspPeer;
+
+  public BSPTaskRunner(BSPTask bspTask, BSPPeer bspPeer, BSPJob conf) {
+    this.task = bspTask;
+    this.conf = conf;
+    this.bspPeer = bspPeer;
+  }
+
+  public Task getTask() {
+    return task;
+  }
+
+  public void run() {
+    BSP bsp = (BSP) ReflectionUtils.newInstance(conf.getConf().getClass(
+        "bsp.work.class", BSP.class), conf.getConf());
+
+    try {
+      bsp.bsp(bspPeer);
+    } catch (IOException e) {
+      // TODO Auto-generated catch block
+      e.printStackTrace();
+    } catch (KeeperException e) {
+      // TODO Auto-generated catch block
+      e.printStackTrace();
+    } catch (InterruptedException e) {
+      // TODO Auto-generated catch block
+      e.printStackTrace();
+    }
+  }
+
+}

Modified: incubator/hama/trunk/src/java/org/apache/hama/bsp/GroomServer.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/bsp/GroomServer.java?rev=1024485&r1=1024484&r2=1024485&view=diff
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/bsp/GroomServer.java (original)
+++ incubator/hama/trunk/src/java/org/apache/hama/bsp/GroomServer.java Wed Oct 20 01:48:36 2010
@@ -23,9 +23,11 @@ import java.lang.reflect.Constructor;
 import java.net.InetSocketAddress;
 import java.util.ArrayList;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.TreeMap;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.LinkedBlockingQueue;
@@ -40,7 +42,7 @@ import org.apache.hadoop.ipc.RPC;
 import org.apache.hadoop.ipc.RemoteException;
 import org.apache.hadoop.net.DNS;
 import org.apache.hadoop.util.DiskChecker;
-import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.hadoop.util.RunJar;
 import org.apache.hadoop.util.StringUtils;
 import org.apache.hadoop.util.DiskChecker.DiskErrorException;
 import org.apache.hama.Constants;
@@ -50,6 +52,7 @@ import org.apache.hama.ipc.InterTrackerP
 public class GroomServer implements Runnable {
   public static final Log LOG = LogFactory.getLog(GroomServer.class);
   private static BSPPeer bspPeer;
+  static final String SUBDIR = "groomServer";
 
   Configuration conf;
 
@@ -281,9 +284,111 @@ public class GroomServer implements Runn
     }
 
     try {
+      localizeJob(tip);
+    } catch (Throwable e) {
+      String msg = ("Error initializing " + tip.getTask().getTaskID() + ":\n" + StringUtils
+          .stringifyException(e));
+      LOG.warn(msg);
+    }
+  }
+
+  private void localizeJob(TaskInProgress tip) throws IOException {
+    Task task = tip.getTask();
+    conf.addResource(task.getJobFile());
+    BSPJob defaultJobConf = new BSPJob((HamaConfiguration) conf);
+
+    Path localJobFile = defaultJobConf.getLocalPath(SUBDIR + "/"
+        + task.getTaskID() + "/" + "job.xml");
+
+    RunningJob rjob = addTaskToJob(task.getJobID(), localJobFile, tip);
+    BSPJob jobConf = null;
+
+    synchronized (rjob) {
+      if (!rjob.localized) {
+        Path localJarFile = defaultJobConf.getLocalPath(SUBDIR + "/"
+            + task.getTaskID() + "/" + "job.jar");
+        systemFS.copyToLocalFile(new Path(task.getJobFile()), localJobFile);
+        Path jarFile = new Path(task.getJobFile().replace(".xml", ".jar"));
+
+        HamaConfiguration conf = new HamaConfiguration();
+        conf.addResource(localJobFile);
+        jobConf = new BSPJob(conf, task.getJobID().toString());
+        jobConf.setJar(localJarFile.toString());
+
+        if (jarFile != null) {
+          systemFS.copyToLocalFile(jarFile, localJarFile);
+
+          // also unjar the job.jar files in workdir
+          File workDir = new File(
+              new File(localJobFile.toString()).getParent(), "work");
+          if (!workDir.mkdirs()) {
+            if (!workDir.isDirectory()) {
+              throw new IOException("Mkdirs failed to create "
+                  + workDir.toString());
+            }
+          }
+          RunJar.unJar(new File(localJarFile.toString()), workDir);
+        }
+        rjob.localized = true;
+      }
+    }
+    launchTaskForJob(tip, jobConf);
+  }
+
+  private void launchTaskForJob(TaskInProgress tip, BSPJob jobConf) {
+    try {
+      tip.setJobConf(jobConf);
       tip.launchTask();
     } catch (Throwable ie) {
-      // TODO: when job failed.
+      tip.taskStatus.setRunState(TaskStatus.State.FAILED);
+      String error = StringUtils.stringifyException(ie);
+      LOG.info(error);
+    }
+  }
+
+  private RunningJob addTaskToJob(BSPJobID jobId, Path localJobFile,
+      TaskInProgress tip) {
+    synchronized (runningJobs) {
+      RunningJob rJob = null;
+      if (!runningJobs.containsKey(jobId)) {
+        rJob = new RunningJob(jobId, localJobFile);
+        rJob.localized = false;
+        rJob.tasks = new HashSet<TaskInProgress>();
+        rJob.jobFile = localJobFile;
+        runningJobs.put(jobId, rJob);
+      } else {
+        rJob = runningJobs.get(jobId);
+      }
+      rJob.tasks.add(tip);
+      return rJob;
+    }
+  }
+
+  /**
+   * The datastructure for initializing a job
+   */
+  static class RunningJob {
+    private BSPJobID jobid;
+    private Path jobFile;
+    // keep this for later use
+    Set<TaskInProgress> tasks;
+    boolean localized;
+    boolean keepJobFiles;
+
+    RunningJob(BSPJobID jobid, Path jobFile) {
+      this.jobid = jobid;
+      localized = false;
+      tasks = new HashSet<TaskInProgress>();
+      this.jobFile = jobFile;
+      keepJobFiles = false;
+    }
+
+    Path getJobFile() {
+      return jobFile;
+    }
+
+    BSPJobID getJobId() {
+      return jobid;
     }
   }
 
@@ -410,6 +515,8 @@ public class GroomServer implements Runn
   // /////////////////////////////////////////////////////
   class TaskInProgress {
     Task task;
+    BSPJob jobConf;
+    private BSPTaskRunner runner;
     volatile boolean done = false;
     volatile boolean wasKilled = false;
     private TaskStatus taskStatus;
@@ -421,61 +528,29 @@ public class GroomServer implements Runn
           TaskStatus.Phase.STARTING);
     }
 
-    static final String SUBDIR = "groomServer";
+    public void setJobConf(BSPJob jobConf) {
+      this.jobConf = jobConf;
+    }
 
-    public void launchTask() {
+    public void launchTask() throws IOException {
       taskStatus.setRunState(TaskStatus.State.RUNNING);
+      this.runner = task.createRunner(bspPeer, this.jobConf);
+      this.runner.start();
 
-      try {
-        // TODO: need to move this code to TaskRunner
-
-        task.getJobFile();
-        conf.addResource(task.getJobFile());
-        BSPJob defaultJobConf = new BSPJob((HamaConfiguration) conf);
-
-        Path localJobFile = defaultJobConf.getLocalPath(SUBDIR + "/"
-            + task.getTaskID() + "/" + "job.xml");
-        Path localJarFile = defaultJobConf.getLocalPath(SUBDIR + "/"
-            + task.getTaskID() + "/" + "job.jar");
-
-        systemFS.copyToLocalFile(new Path(task.getJobFile()), localJobFile);
-        systemFS.copyToLocalFile(new Path(task.getJobFile().replace(".xml",
-            ".jar")), localJarFile);
-
-        HamaConfiguration conf = new HamaConfiguration();
-        conf.addResource(localJobFile);
-        BSPJob jobConf = new BSPJob(conf, task.getJobID().toString());
-        jobConf.setJar(localJarFile.toString());
-
-        BSP bsp = (BSP) ReflectionUtils
-            .newInstance(jobConf.getBspClass(), conf);
-        bsp.setPeer(bspPeer);
+      // Check state of Task
+      while (true) {
         try {
-          bsp.runBSP();
-        } catch (Exception e) {
+          Thread.sleep(1000);
+        } catch (InterruptedException e) {
           e.printStackTrace();
-          taskStatus.setRunState(TaskStatus.State.FAILED);
         }
 
-      } catch (IOException e) {
-        // TODO Auto-generated catch block
-        e.printStackTrace();
-      } finally {
-
-        while (true) {
-          try {
-            Thread.sleep(1000);
-          } catch (InterruptedException e) {
-            e.printStackTrace();
-          }
-
-          // If local/outgoing queues are empty, task is done.
-          if (bspPeer.localQueue.size() == 0
-              && bspPeer.outgoingQueues.size() == 0) {
-            taskStatus.setRunState(TaskStatus.State.SUCCEEDED);
-            acceptNewTasks = true;
-            break;
-          }
+        // If local/outgoing queues are empty, task is done.
+        if (bspPeer.localQueue.size() == 0
+            && bspPeer.outgoingQueues.size() == 0) {
+          taskStatus.setRunState(TaskStatus.State.SUCCEEDED);
+          acceptNewTasks = true;
+          break;
         }
       }
 

Modified: incubator/hama/trunk/src/java/org/apache/hama/bsp/LaunchTaskAction.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/bsp/LaunchTaskAction.java?rev=1024485&r1=1024484&r2=1024485&view=diff
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/bsp/LaunchTaskAction.java (original)
+++ incubator/hama/trunk/src/java/org/apache/hama/bsp/LaunchTaskAction.java Wed Oct 20 01:48:36 2010
@@ -47,7 +47,7 @@ class LaunchTaskAction extends GroomServ
   }
 
   public void readFields(DataInput in) throws IOException {
-    task = new Task();
+    task = new BSPTask();
     task.readFields(in);
   }
 

Modified: incubator/hama/trunk/src/java/org/apache/hama/bsp/LocalJobRunner.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/bsp/LocalJobRunner.java?rev=1024485&r1=1024484&r2=1024485&view=diff
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/bsp/LocalJobRunner.java (original)
+++ incubator/hama/trunk/src/java/org/apache/hama/bsp/LocalJobRunner.java Wed Oct 20 01:48:36 2010
@@ -172,7 +172,7 @@ public class LocalJobRunner implements J
 
           try {
             GroomServer servers = new GroomServer(conf);
-            Task task = new BSPTask(job.getJobID(), jobFile, tID.toString(), i, this.conf);
+            Task task = new BSPTask(job.getJobID(), jobFile, tID.toString(), i);
             
             // TODO not yet implemented
             

Modified: incubator/hama/trunk/src/java/org/apache/hama/bsp/Task.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/bsp/Task.java?rev=1024485&r1=1024484&r2=1024485&view=diff
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/bsp/Task.java (original)
+++ incubator/hama/trunk/src/java/org/apache/hama/bsp/Task.java Wed Oct 20 01:48:36 2010
@@ -23,14 +23,12 @@ import java.io.IOException;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.LocalDirAllocator;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.Writable;
 
-/**
- *
- */
-public class Task implements Writable {
+public abstract class Task implements Writable {
   public static final Log LOG = LogFactory.getLog(Task.class);
   ////////////////////////////////////////////
   // Fields
@@ -109,5 +107,7 @@ public class Task implements Writable {
     taskId = Text.readString(in);
     partition = in.readInt();
   }
+
+  public abstract BSPTaskRunner createRunner(BSPPeer bspPeer, BSPJob jobConf);
   
 }

Modified: incubator/hama/trunk/src/java/org/apache/hama/bsp/TaskInProgress.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/bsp/TaskInProgress.java?rev=1024485&r1=1024484&r2=1024485&view=diff
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/bsp/TaskInProgress.java (original)
+++ incubator/hama/trunk/src/java/org/apache/hama/bsp/TaskInProgress.java Wed Oct 20 01:48:36 2010
@@ -101,7 +101,7 @@ class TaskInProgress {
         return null;
       }
 
-      t = new BSPTask(jobId, jobFile, taskid, partition, this.conf);
+      t = new BSPTask(jobId, jobFile, taskid, partition);
       activeTasks.put(taskid, status.getGroomName());
 
       // Ask JobTracker to note that the task exists



Re: svn commit: r1024485 - in /incubator/hama/trunk: ./ src/examples/org/apache/hama/examples/ src/java/org/apache/hama/bsp/

Posted by Filipe David Manana <fd...@apache.org>.
On Wed, Oct 20, 2010 at 10:41 AM, Edward J. Yoon <ed...@apache.org> wrote:
> Thanks.
>
> BTW, getAddress() and getHostName() methods are somewhat duplicated.
> If we remove one of the two, we have to change all code, related with
> it.
>
> https://issues.apache.org/jira/browse/HAMA-316
>
> Could you please comment here?

Done :)

>
> On Wed, Oct 20, 2010 at 6:31 PM, Filipe David Manana
> <fd...@apache.org> wrote:
>> A big +1 on this one :)
>>
>> On Wed, Oct 20, 2010 at 2:48 AM,  <ed...@apache.org> wrote:
>>> Author: edwardyoon
>>> Date: Wed Oct 20 01:48:36 2010
>>> New Revision: 1024485
>>>
>>> URL: http://svn.apache.org/viewvc?rev=1024485&view=rev
>>> Log:
>>> Refactoring launchTask() method in GroomServer
>>>
>>> Added:
>>>    incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPTaskRunner.java
>>> Modified:
>>>    incubator/hama/trunk/CHANGES.txt
>>>    incubator/hama/trunk/src/examples/org/apache/hama/examples/PiEstimator.java
>>>    incubator/hama/trunk/src/java/org/apache/hama/bsp/BSP.java
>>>    incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPMessage.java
>>>    incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPTask.java
>>>    incubator/hama/trunk/src/java/org/apache/hama/bsp/GroomServer.java
>>>    incubator/hama/trunk/src/java/org/apache/hama/bsp/LaunchTaskAction.java
>>>    incubator/hama/trunk/src/java/org/apache/hama/bsp/LocalJobRunner.java
>>>    incubator/hama/trunk/src/java/org/apache/hama/bsp/Task.java
>>>    incubator/hama/trunk/src/java/org/apache/hama/bsp/TaskInProgress.java
>>>
>>> Modified: incubator/hama/trunk/CHANGES.txt
>>> URL: http://svn.apache.org/viewvc/incubator/hama/trunk/CHANGES.txt?rev=1024485&r1=1024484&r2=1024485&view=diff
>>> ==============================================================================
>>> --- incubator/hama/trunk/CHANGES.txt (original)
>>> +++ incubator/hama/trunk/CHANGES.txt Wed Oct 20 01:48:36 2010
>>> @@ -50,6 +50,7 @@ Trunk (unreleased changes)
>>>
>>>   IMPROVEMENTS
>>>
>>> +    HAMA-300: Refactoring launchTask() method in GroomServer (edwardyoon)
>>>     HAMA-311: Add unit tests for IPC package (edwardyoon)
>>>     HAMA-312: Add serialize printing to ExampleDriver (edwardyoon)
>>>     HAMA-309: Add unit tests for Bytes utilities (edwardyoon)
>>>
>>> Modified: incubator/hama/trunk/src/examples/org/apache/hama/examples/PiEstimator.java
>>> URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/examples/org/apache/hama/examples/PiEstimator.java?rev=1024485&r1=1024484&r2=1024485&view=diff
>>> ==============================================================================
>>> --- incubator/hama/trunk/src/examples/org/apache/hama/examples/PiEstimator.java (original)
>>> +++ incubator/hama/trunk/src/examples/org/apache/hama/examples/PiEstimator.java Wed Oct 20 01:48:36 2010
>>> @@ -54,7 +54,7 @@ public class PiEstimator {
>>>         }
>>>       }
>>>
>>> -      byte[] tagName = Bytes.toBytes(getName().toString());
>>> +      byte[] tagName = Bytes.toBytes(bspPeer.getHostName());
>>>       byte[] myData = Bytes.toBytes(4.0 * (double) in / (double) iterations);
>>>       BSPMessage estimate = new BSPMessage(tagName, myData);
>>>
>>>
>>> Modified: incubator/hama/trunk/src/java/org/apache/hama/bsp/BSP.java
>>> URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/bsp/BSP.java?rev=1024485&r1=1024484&r2=1024485&view=diff
>>> ==============================================================================
>>> --- incubator/hama/trunk/src/java/org/apache/hama/bsp/BSP.java (original)
>>> +++ incubator/hama/trunk/src/java/org/apache/hama/bsp/BSP.java Wed Oct 20 01:48:36 2010
>>> @@ -20,20 +20,5 @@ package org.apache.hama.bsp;
>>>  /**
>>>  * This class provides an abstract implementation of the BSP interface
>>>  */
>>> -public abstract class BSP extends Thread implements BSPInterface {
>>> -  private BSPPeer bspPeer;
>>> -
>>> -  /**
>>> -   * A thread's run method.
>>> -   *
>>> -   * The run method performs the
>>> -   * {@link org.apache.hama.bsp.BSPInterface#bsp(BSPPeer)}
>>> -   */
>>> -  public void runBSP() throws Exception {
>>> -    bsp(bspPeer);
>>> -  }
>>> -
>>> -  public void setPeer(BSPPeer bspServer) {
>>> -    this.bspPeer = bspServer;
>>> -  }
>>> +public abstract class BSP implements BSPInterface {
>>>  }
>>>
>>> Modified: incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPMessage.java
>>> URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPMessage.java?rev=1024485&r1=1024484&r2=1024485&view=diff
>>> ==============================================================================
>>> --- incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPMessage.java (original)
>>> +++ incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPMessage.java Wed Oct 20 01:48:36 2010
>>> @@ -30,6 +30,12 @@ public class BSPMessage implements Writa
>>>   public BSPMessage() {
>>>   }
>>>
>>> +  /**
>>> +   * Constructor
>>> +   *
>>> +   * @param tag of data
>>> +   * @param data of message
>>> +   */
>>>   public BSPMessage(byte[] tag, byte[] data) {
>>>     this.tag = new byte[tag.length];
>>>     this.data = new byte[data.length];
>>> @@ -37,11 +43,20 @@ public class BSPMessage implements Writa
>>>     System.arraycopy(data, 0, this.data, 0, data.length);
>>>   }
>>>
>>> +  /**
>>> +   * BSP messages are typically identified with tags. This allows to get the tag
>>> +   * of data.
>>> +   *
>>> +   * @return tag of data of BSP message
>>> +   */
>>>   public byte[] getTag() {
>>>     byte[] result = this.tag;
>>>     return result;
>>>   }
>>>
>>> +  /**
>>> +   * @return data of BSP message
>>> +   */
>>>   public byte[] getData() {
>>>     byte[] result = this.data;
>>>     return result;
>>>
>>> Modified: incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPTask.java
>>> URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPTask.java?rev=1024485&r1=1024484&r2=1024485&view=diff
>>> ==============================================================================
>>> --- incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPTask.java (original)
>>> +++ incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPTask.java Wed Oct 20 01:48:36 2010
>>> @@ -17,26 +17,21 @@
>>>  */
>>>  package org.apache.hama.bsp;
>>>
>>> -import org.apache.hadoop.conf.Configuration;
>>> -import org.apache.hadoop.util.ReflectionUtils;
>>> -
>>>  public class BSPTask extends Task {
>>> -  private BSP bsp;
>>> -  private Configuration conf;
>>> -
>>> -  public BSPTask(BSPJobID jobId, String jobFile, String taskid, int partition, Configuration conf) {
>>> +
>>> +  public BSPTask() {
>>> +  }
>>> +
>>> +  public BSPTask(BSPJobID jobId, String jobFile, String taskid, int partition) {
>>>     this.jobId = jobId;
>>>     this.jobFile = jobFile;
>>>     this.taskId = taskid;
>>>     this.partition = partition;
>>> -    this.conf = conf;
>>>   }
>>>
>>> -  public BSP getBSPClass() {
>>> -    bsp = (BSP) ReflectionUtils.newInstance(conf.getClass("bsp.work.class",
>>> -        BSP.class), conf);
>>> -
>>> -    return bsp;
>>> +  @Override
>>> +  public BSPTaskRunner createRunner(BSPPeer bspPeer, BSPJob conf) {
>>> +    return new BSPTaskRunner(this, bspPeer, conf);
>>>   }
>>>
>>>  }
>>>
>>> Added: incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPTaskRunner.java
>>> URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPTaskRunner.java?rev=1024485&view=auto
>>> ==============================================================================
>>> --- incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPTaskRunner.java (added)
>>> +++ incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPTaskRunner.java Wed Oct 20 01:48:36 2010
>>> @@ -0,0 +1,62 @@
>>> +/**
>>> + * Licensed to the Apache Software Foundation (ASF) under one
>>> + * or more contributor license agreements.  See the NOTICE file
>>> + * distributed with this work for additional information
>>> + * regarding copyright ownership.  The ASF licenses this file
>>> + * to you under the Apache License, Version 2.0 (the
>>> + * "License"); you may not use this file except in compliance
>>> + * with the License.  You may obtain a copy of the License at
>>> + *
>>> + *     http://www.apache.org/licenses/LICENSE-2.0
>>> + *
>>> + * Unless required by applicable law or agreed to in writing, software
>>> + * distributed under the License is distributed on an "AS IS" BASIS,
>>> + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
>>> + * See the License for the specific language governing permissions and
>>> + * limitations under the License.
>>> + */
>>> +package org.apache.hama.bsp;
>>> +
>>> +import java.io.IOException;
>>> +
>>> +import org.apache.commons.logging.Log;
>>> +import org.apache.commons.logging.LogFactory;
>>> +import org.apache.hadoop.util.ReflectionUtils;
>>> +import org.apache.zookeeper.KeeperException;
>>> +
>>> +public class BSPTaskRunner extends Thread {
>>> +
>>> +  public static final Log LOG = LogFactory.getLog(BSPTaskRunner.class);
>>> +  private Task task;
>>> +  private BSPJob conf;
>>> +  private BSPPeer bspPeer;
>>> +
>>> +  public BSPTaskRunner(BSPTask bspTask, BSPPeer bspPeer, BSPJob conf) {
>>> +    this.task = bspTask;
>>> +    this.conf = conf;
>>> +    this.bspPeer = bspPeer;
>>> +  }
>>> +
>>> +  public Task getTask() {
>>> +    return task;
>>> +  }
>>> +
>>> +  public void run() {
>>> +    BSP bsp = (BSP) ReflectionUtils.newInstance(conf.getConf().getClass(
>>> +        "bsp.work.class", BSP.class), conf.getConf());
>>> +
>>> +    try {
>>> +      bsp.bsp(bspPeer);
>>> +    } catch (IOException e) {
>>> +      // TODO Auto-generated catch block
>>> +      e.printStackTrace();
>>> +    } catch (KeeperException e) {
>>> +      // TODO Auto-generated catch block
>>> +      e.printStackTrace();
>>> +    } catch (InterruptedException e) {
>>> +      // TODO Auto-generated catch block
>>> +      e.printStackTrace();
>>> +    }
>>> +  }
>>> +
>>> +}
>>>
>>> Modified: incubator/hama/trunk/src/java/org/apache/hama/bsp/GroomServer.java
>>> URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/bsp/GroomServer.java?rev=1024485&r1=1024484&r2=1024485&view=diff
>>> ==============================================================================
>>> --- incubator/hama/trunk/src/java/org/apache/hama/bsp/GroomServer.java (original)
>>> +++ incubator/hama/trunk/src/java/org/apache/hama/bsp/GroomServer.java Wed Oct 20 01:48:36 2010
>>> @@ -23,9 +23,11 @@ import java.lang.reflect.Constructor;
>>>  import java.net.InetSocketAddress;
>>>  import java.util.ArrayList;
>>>  import java.util.HashMap;
>>> +import java.util.HashSet;
>>>  import java.util.LinkedHashMap;
>>>  import java.util.List;
>>>  import java.util.Map;
>>> +import java.util.Set;
>>>  import java.util.TreeMap;
>>>  import java.util.concurrent.BlockingQueue;
>>>  import java.util.concurrent.LinkedBlockingQueue;
>>> @@ -40,7 +42,7 @@ import org.apache.hadoop.ipc.RPC;
>>>  import org.apache.hadoop.ipc.RemoteException;
>>>  import org.apache.hadoop.net.DNS;
>>>  import org.apache.hadoop.util.DiskChecker;
>>> -import org.apache.hadoop.util.ReflectionUtils;
>>> +import org.apache.hadoop.util.RunJar;
>>>  import org.apache.hadoop.util.StringUtils;
>>>  import org.apache.hadoop.util.DiskChecker.DiskErrorException;
>>>  import org.apache.hama.Constants;
>>> @@ -50,6 +52,7 @@ import org.apache.hama.ipc.InterTrackerP
>>>  public class GroomServer implements Runnable {
>>>   public static final Log LOG = LogFactory.getLog(GroomServer.class);
>>>   private static BSPPeer bspPeer;
>>> +  static final String SUBDIR = "groomServer";
>>>
>>>   Configuration conf;
>>>
>>> @@ -281,9 +284,111 @@ public class GroomServer implements Runn
>>>     }
>>>
>>>     try {
>>> +      localizeJob(tip);
>>> +    } catch (Throwable e) {
>>> +      String msg = ("Error initializing " + tip.getTask().getTaskID() + ":\n" + StringUtils
>>> +          .stringifyException(e));
>>> +      LOG.warn(msg);
>>> +    }
>>> +  }
>>> +
>>> +  private void localizeJob(TaskInProgress tip) throws IOException {
>>> +    Task task = tip.getTask();
>>> +    conf.addResource(task.getJobFile());
>>> +    BSPJob defaultJobConf = new BSPJob((HamaConfiguration) conf);
>>> +
>>> +    Path localJobFile = defaultJobConf.getLocalPath(SUBDIR + "/"
>>> +        + task.getTaskID() + "/" + "job.xml");
>>> +
>>> +    RunningJob rjob = addTaskToJob(task.getJobID(), localJobFile, tip);
>>> +    BSPJob jobConf = null;
>>> +
>>> +    synchronized (rjob) {
>>> +      if (!rjob.localized) {
>>> +        Path localJarFile = defaultJobConf.getLocalPath(SUBDIR + "/"
>>> +            + task.getTaskID() + "/" + "job.jar");
>>> +        systemFS.copyToLocalFile(new Path(task.getJobFile()), localJobFile);
>>> +        Path jarFile = new Path(task.getJobFile().replace(".xml", ".jar"));
>>> +
>>> +        HamaConfiguration conf = new HamaConfiguration();
>>> +        conf.addResource(localJobFile);
>>> +        jobConf = new BSPJob(conf, task.getJobID().toString());
>>> +        jobConf.setJar(localJarFile.toString());
>>> +
>>> +        if (jarFile != null) {
>>> +          systemFS.copyToLocalFile(jarFile, localJarFile);
>>> +
>>> +          // also unjar the job.jar files in workdir
>>> +          File workDir = new File(
>>> +              new File(localJobFile.toString()).getParent(), "work");
>>> +          if (!workDir.mkdirs()) {
>>> +            if (!workDir.isDirectory()) {
>>> +              throw new IOException("Mkdirs failed to create "
>>> +                  + workDir.toString());
>>> +            }
>>> +          }
>>> +          RunJar.unJar(new File(localJarFile.toString()), workDir);
>>> +        }
>>> +        rjob.localized = true;
>>> +      }
>>> +    }
>>> +    launchTaskForJob(tip, jobConf);
>>> +  }
>>> +
>>> +  private void launchTaskForJob(TaskInProgress tip, BSPJob jobConf) {
>>> +    try {
>>> +      tip.setJobConf(jobConf);
>>>       tip.launchTask();
>>>     } catch (Throwable ie) {
>>> -      // TODO: when job failed.
>>> +      tip.taskStatus.setRunState(TaskStatus.State.FAILED);
>>> +      String error = StringUtils.stringifyException(ie);
>>> +      LOG.info(error);
>>> +    }
>>> +  }
>>> +
>>> +  private RunningJob addTaskToJob(BSPJobID jobId, Path localJobFile,
>>> +      TaskInProgress tip) {
>>> +    synchronized (runningJobs) {
>>> +      RunningJob rJob = null;
>>> +      if (!runningJobs.containsKey(jobId)) {
>>> +        rJob = new RunningJob(jobId, localJobFile);
>>> +        rJob.localized = false;
>>> +        rJob.tasks = new HashSet<TaskInProgress>();
>>> +        rJob.jobFile = localJobFile;
>>> +        runningJobs.put(jobId, rJob);
>>> +      } else {
>>> +        rJob = runningJobs.get(jobId);
>>> +      }
>>> +      rJob.tasks.add(tip);
>>> +      return rJob;
>>> +    }
>>> +  }
>>> +
>>> +  /**
>>> +   * The datastructure for initializing a job
>>> +   */
>>> +  static class RunningJob {
>>> +    private BSPJobID jobid;
>>> +    private Path jobFile;
>>> +    // keep this for later use
>>> +    Set<TaskInProgress> tasks;
>>> +    boolean localized;
>>> +    boolean keepJobFiles;
>>> +
>>> +    RunningJob(BSPJobID jobid, Path jobFile) {
>>> +      this.jobid = jobid;
>>> +      localized = false;
>>> +      tasks = new HashSet<TaskInProgress>();
>>> +      this.jobFile = jobFile;
>>> +      keepJobFiles = false;
>>> +    }
>>> +
>>> +    Path getJobFile() {
>>> +      return jobFile;
>>> +    }
>>> +
>>> +    BSPJobID getJobId() {
>>> +      return jobid;
>>>     }
>>>   }
>>>
>>> @@ -410,6 +515,8 @@ public class GroomServer implements Runn
>>>   // /////////////////////////////////////////////////////
>>>   class TaskInProgress {
>>>     Task task;
>>> +    BSPJob jobConf;
>>> +    private BSPTaskRunner runner;
>>>     volatile boolean done = false;
>>>     volatile boolean wasKilled = false;
>>>     private TaskStatus taskStatus;
>>> @@ -421,61 +528,29 @@ public class GroomServer implements Runn
>>>           TaskStatus.Phase.STARTING);
>>>     }
>>>
>>> -    static final String SUBDIR = "groomServer";
>>> +    public void setJobConf(BSPJob jobConf) {
>>> +      this.jobConf = jobConf;
>>> +    }
>>>
>>> -    public void launchTask() {
>>> +    public void launchTask() throws IOException {
>>>       taskStatus.setRunState(TaskStatus.State.RUNNING);
>>> +      this.runner = task.createRunner(bspPeer, this.jobConf);
>>> +      this.runner.start();
>>>
>>> -      try {
>>> -        // TODO: need to move this code to TaskRunner
>>> -
>>> -        task.getJobFile();
>>> -        conf.addResource(task.getJobFile());
>>> -        BSPJob defaultJobConf = new BSPJob((HamaConfiguration) conf);
>>> -
>>> -        Path localJobFile = defaultJobConf.getLocalPath(SUBDIR + "/"
>>> -            + task.getTaskID() + "/" + "job.xml");
>>> -        Path localJarFile = defaultJobConf.getLocalPath(SUBDIR + "/"
>>> -            + task.getTaskID() + "/" + "job.jar");
>>> -
>>> -        systemFS.copyToLocalFile(new Path(task.getJobFile()), localJobFile);
>>> -        systemFS.copyToLocalFile(new Path(task.getJobFile().replace(".xml",
>>> -            ".jar")), localJarFile);
>>> -
>>> -        HamaConfiguration conf = new HamaConfiguration();
>>> -        conf.addResource(localJobFile);
>>> -        BSPJob jobConf = new BSPJob(conf, task.getJobID().toString());
>>> -        jobConf.setJar(localJarFile.toString());
>>> -
>>> -        BSP bsp = (BSP) ReflectionUtils
>>> -            .newInstance(jobConf.getBspClass(), conf);
>>> -        bsp.setPeer(bspPeer);
>>> +      // Check state of Task
>>> +      while (true) {
>>>         try {
>>> -          bsp.runBSP();
>>> -        } catch (Exception e) {
>>> +          Thread.sleep(1000);
>>> +        } catch (InterruptedException e) {
>>>           e.printStackTrace();
>>> -          taskStatus.setRunState(TaskStatus.State.FAILED);
>>>         }
>>>
>>> -      } catch (IOException e) {
>>> -        // TODO Auto-generated catch block
>>> -        e.printStackTrace();
>>> -      } finally {
>>> -
>>> -        while (true) {
>>> -          try {
>>> -            Thread.sleep(1000);
>>> -          } catch (InterruptedException e) {
>>> -            e.printStackTrace();
>>> -          }
>>> -
>>> -          // If local/outgoing queues are empty, task is done.
>>> -          if (bspPeer.localQueue.size() == 0
>>> -              && bspPeer.outgoingQueues.size() == 0) {
>>> -            taskStatus.setRunState(TaskStatus.State.SUCCEEDED);
>>> -            acceptNewTasks = true;
>>> -            break;
>>> -          }
>>> +        // If local/outgoing queues are empty, task is done.
>>> +        if (bspPeer.localQueue.size() == 0
>>> +            && bspPeer.outgoingQueues.size() == 0) {
>>> +          taskStatus.setRunState(TaskStatus.State.SUCCEEDED);
>>> +          acceptNewTasks = true;
>>> +          break;
>>>         }
>>>       }
>>>
>>>
>>> Modified: incubator/hama/trunk/src/java/org/apache/hama/bsp/LaunchTaskAction.java
>>> URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/bsp/LaunchTaskAction.java?rev=1024485&r1=1024484&r2=1024485&view=diff
>>> ==============================================================================
>>> --- incubator/hama/trunk/src/java/org/apache/hama/bsp/LaunchTaskAction.java (original)
>>> +++ incubator/hama/trunk/src/java/org/apache/hama/bsp/LaunchTaskAction.java Wed Oct 20 01:48:36 2010
>>> @@ -47,7 +47,7 @@ class LaunchTaskAction extends GroomServ
>>>   }
>>>
>>>   public void readFields(DataInput in) throws IOException {
>>> -    task = new Task();
>>> +    task = new BSPTask();
>>>     task.readFields(in);
>>>   }
>>>
>>>
>>> Modified: incubator/hama/trunk/src/java/org/apache/hama/bsp/LocalJobRunner.java
>>> URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/bsp/LocalJobRunner.java?rev=1024485&r1=1024484&r2=1024485&view=diff
>>> ==============================================================================
>>> --- incubator/hama/trunk/src/java/org/apache/hama/bsp/LocalJobRunner.java (original)
>>> +++ incubator/hama/trunk/src/java/org/apache/hama/bsp/LocalJobRunner.java Wed Oct 20 01:48:36 2010
>>> @@ -172,7 +172,7 @@ public class LocalJobRunner implements J
>>>
>>>           try {
>>>             GroomServer servers = new GroomServer(conf);
>>> -            Task task = new BSPTask(job.getJobID(), jobFile, tID.toString(), i, this.conf);
>>> +            Task task = new BSPTask(job.getJobID(), jobFile, tID.toString(), i);
>>>
>>>             // TODO not yet implemented
>>>
>>>
>>> Modified: incubator/hama/trunk/src/java/org/apache/hama/bsp/Task.java
>>> URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/bsp/Task.java?rev=1024485&r1=1024484&r2=1024485&view=diff
>>> ==============================================================================
>>> --- incubator/hama/trunk/src/java/org/apache/hama/bsp/Task.java (original)
>>> +++ incubator/hama/trunk/src/java/org/apache/hama/bsp/Task.java Wed Oct 20 01:48:36 2010
>>> @@ -23,14 +23,12 @@ import java.io.IOException;
>>>
>>>  import org.apache.commons.logging.Log;
>>>  import org.apache.commons.logging.LogFactory;
>>> +import org.apache.hadoop.conf.Configuration;
>>>  import org.apache.hadoop.fs.LocalDirAllocator;
>>>  import org.apache.hadoop.io.Text;
>>>  import org.apache.hadoop.io.Writable;
>>>
>>> -/**
>>> - *
>>> - */
>>> -public class Task implements Writable {
>>> +public abstract class Task implements Writable {
>>>   public static final Log LOG = LogFactory.getLog(Task.class);
>>>   ////////////////////////////////////////////
>>>   // Fields
>>> @@ -109,5 +107,7 @@ public class Task implements Writable {
>>>     taskId = Text.readString(in);
>>>     partition = in.readInt();
>>>   }
>>> +
>>> +  public abstract BSPTaskRunner createRunner(BSPPeer bspPeer, BSPJob jobConf);
>>>
>>>  }
>>>
>>> Modified: incubator/hama/trunk/src/java/org/apache/hama/bsp/TaskInProgress.java
>>> URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/bsp/TaskInProgress.java?rev=1024485&r1=1024484&r2=1024485&view=diff
>>> ==============================================================================
>>> --- incubator/hama/trunk/src/java/org/apache/hama/bsp/TaskInProgress.java (original)
>>> +++ incubator/hama/trunk/src/java/org/apache/hama/bsp/TaskInProgress.java Wed Oct 20 01:48:36 2010
>>> @@ -101,7 +101,7 @@ class TaskInProgress {
>>>         return null;
>>>       }
>>>
>>> -      t = new BSPTask(jobId, jobFile, taskid, partition, this.conf);
>>> +      t = new BSPTask(jobId, jobFile, taskid, partition);
>>>       activeTasks.put(taskid, status.getGroomName());
>>>
>>>       // Ask JobTracker to note that the task exists
>>>
>>>
>>>
>>
>>
>>
>> --
>> Filipe David Manana,
>> fdmanana@gmail.com, fdmanana@apache.org
>>
>> "Reasonable men adapt themselves to the world.
>>  Unreasonable men adapt the world to themselves.
>>  That's why all progress depends on unreasonable men."
>>
>
>
>
> --
> Best Regards, Edward J. Yoon
> edwardyoon@apache.org
> http://blog.udanax.org
>



-- 
Filipe David Manana,
fdmanana@gmail.com, fdmanana@apache.org

"Reasonable men adapt themselves to the world.
 Unreasonable men adapt the world to themselves.
 That's why all progress depends on unreasonable men."

Re: svn commit: r1024485 - in /incubator/hama/trunk: ./ src/examples/org/apache/hama/examples/ src/java/org/apache/hama/bsp/

Posted by "Edward J. Yoon" <ed...@apache.org>.
Thanks.

BTW, getAddress() and getHostName() methods are somewhat duplicated.
If we remove one of the two, we have to change all code, related with
it.

https://issues.apache.org/jira/browse/HAMA-316

Could you please comment here?

On Wed, Oct 20, 2010 at 6:31 PM, Filipe David Manana
<fd...@apache.org> wrote:
> A big +1 on this one :)
>
> On Wed, Oct 20, 2010 at 2:48 AM,  <ed...@apache.org> wrote:
>> Author: edwardyoon
>> Date: Wed Oct 20 01:48:36 2010
>> New Revision: 1024485
>>
>> URL: http://svn.apache.org/viewvc?rev=1024485&view=rev
>> Log:
>> Refactoring launchTask() method in GroomServer
>>
>> Added:
>>    incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPTaskRunner.java
>> Modified:
>>    incubator/hama/trunk/CHANGES.txt
>>    incubator/hama/trunk/src/examples/org/apache/hama/examples/PiEstimator.java
>>    incubator/hama/trunk/src/java/org/apache/hama/bsp/BSP.java
>>    incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPMessage.java
>>    incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPTask.java
>>    incubator/hama/trunk/src/java/org/apache/hama/bsp/GroomServer.java
>>    incubator/hama/trunk/src/java/org/apache/hama/bsp/LaunchTaskAction.java
>>    incubator/hama/trunk/src/java/org/apache/hama/bsp/LocalJobRunner.java
>>    incubator/hama/trunk/src/java/org/apache/hama/bsp/Task.java
>>    incubator/hama/trunk/src/java/org/apache/hama/bsp/TaskInProgress.java
>>
>> Modified: incubator/hama/trunk/CHANGES.txt
>> URL: http://svn.apache.org/viewvc/incubator/hama/trunk/CHANGES.txt?rev=1024485&r1=1024484&r2=1024485&view=diff
>> ==============================================================================
>> --- incubator/hama/trunk/CHANGES.txt (original)
>> +++ incubator/hama/trunk/CHANGES.txt Wed Oct 20 01:48:36 2010
>> @@ -50,6 +50,7 @@ Trunk (unreleased changes)
>>
>>   IMPROVEMENTS
>>
>> +    HAMA-300: Refactoring launchTask() method in GroomServer (edwardyoon)
>>     HAMA-311: Add unit tests for IPC package (edwardyoon)
>>     HAMA-312: Add serialize printing to ExampleDriver (edwardyoon)
>>     HAMA-309: Add unit tests for Bytes utilities (edwardyoon)
>>
>> Modified: incubator/hama/trunk/src/examples/org/apache/hama/examples/PiEstimator.java
>> URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/examples/org/apache/hama/examples/PiEstimator.java?rev=1024485&r1=1024484&r2=1024485&view=diff
>> ==============================================================================
>> --- incubator/hama/trunk/src/examples/org/apache/hama/examples/PiEstimator.java (original)
>> +++ incubator/hama/trunk/src/examples/org/apache/hama/examples/PiEstimator.java Wed Oct 20 01:48:36 2010
>> @@ -54,7 +54,7 @@ public class PiEstimator {
>>         }
>>       }
>>
>> -      byte[] tagName = Bytes.toBytes(getName().toString());
>> +      byte[] tagName = Bytes.toBytes(bspPeer.getHostName());
>>       byte[] myData = Bytes.toBytes(4.0 * (double) in / (double) iterations);
>>       BSPMessage estimate = new BSPMessage(tagName, myData);
>>
>>
>> Modified: incubator/hama/trunk/src/java/org/apache/hama/bsp/BSP.java
>> URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/bsp/BSP.java?rev=1024485&r1=1024484&r2=1024485&view=diff
>> ==============================================================================
>> --- incubator/hama/trunk/src/java/org/apache/hama/bsp/BSP.java (original)
>> +++ incubator/hama/trunk/src/java/org/apache/hama/bsp/BSP.java Wed Oct 20 01:48:36 2010
>> @@ -20,20 +20,5 @@ package org.apache.hama.bsp;
>>  /**
>>  * This class provides an abstract implementation of the BSP interface
>>  */
>> -public abstract class BSP extends Thread implements BSPInterface {
>> -  private BSPPeer bspPeer;
>> -
>> -  /**
>> -   * A thread's run method.
>> -   *
>> -   * The run method performs the
>> -   * {@link org.apache.hama.bsp.BSPInterface#bsp(BSPPeer)}
>> -   */
>> -  public void runBSP() throws Exception {
>> -    bsp(bspPeer);
>> -  }
>> -
>> -  public void setPeer(BSPPeer bspServer) {
>> -    this.bspPeer = bspServer;
>> -  }
>> +public abstract class BSP implements BSPInterface {
>>  }
>>
>> Modified: incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPMessage.java
>> URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPMessage.java?rev=1024485&r1=1024484&r2=1024485&view=diff
>> ==============================================================================
>> --- incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPMessage.java (original)
>> +++ incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPMessage.java Wed Oct 20 01:48:36 2010
>> @@ -30,6 +30,12 @@ public class BSPMessage implements Writa
>>   public BSPMessage() {
>>   }
>>
>> +  /**
>> +   * Constructor
>> +   *
>> +   * @param tag of data
>> +   * @param data of message
>> +   */
>>   public BSPMessage(byte[] tag, byte[] data) {
>>     this.tag = new byte[tag.length];
>>     this.data = new byte[data.length];
>> @@ -37,11 +43,20 @@ public class BSPMessage implements Writa
>>     System.arraycopy(data, 0, this.data, 0, data.length);
>>   }
>>
>> +  /**
>> +   * BSP messages are typically identified with tags. This allows to get the tag
>> +   * of data.
>> +   *
>> +   * @return tag of data of BSP message
>> +   */
>>   public byte[] getTag() {
>>     byte[] result = this.tag;
>>     return result;
>>   }
>>
>> +  /**
>> +   * @return data of BSP message
>> +   */
>>   public byte[] getData() {
>>     byte[] result = this.data;
>>     return result;
>>
>> Modified: incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPTask.java
>> URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPTask.java?rev=1024485&r1=1024484&r2=1024485&view=diff
>> ==============================================================================
>> --- incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPTask.java (original)
>> +++ incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPTask.java Wed Oct 20 01:48:36 2010
>> @@ -17,26 +17,21 @@
>>  */
>>  package org.apache.hama.bsp;
>>
>> -import org.apache.hadoop.conf.Configuration;
>> -import org.apache.hadoop.util.ReflectionUtils;
>> -
>>  public class BSPTask extends Task {
>> -  private BSP bsp;
>> -  private Configuration conf;
>> -
>> -  public BSPTask(BSPJobID jobId, String jobFile, String taskid, int partition, Configuration conf) {
>> +
>> +  public BSPTask() {
>> +  }
>> +
>> +  public BSPTask(BSPJobID jobId, String jobFile, String taskid, int partition) {
>>     this.jobId = jobId;
>>     this.jobFile = jobFile;
>>     this.taskId = taskid;
>>     this.partition = partition;
>> -    this.conf = conf;
>>   }
>>
>> -  public BSP getBSPClass() {
>> -    bsp = (BSP) ReflectionUtils.newInstance(conf.getClass("bsp.work.class",
>> -        BSP.class), conf);
>> -
>> -    return bsp;
>> +  @Override
>> +  public BSPTaskRunner createRunner(BSPPeer bspPeer, BSPJob conf) {
>> +    return new BSPTaskRunner(this, bspPeer, conf);
>>   }
>>
>>  }
>>
>> Added: incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPTaskRunner.java
>> URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPTaskRunner.java?rev=1024485&view=auto
>> ==============================================================================
>> --- incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPTaskRunner.java (added)
>> +++ incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPTaskRunner.java Wed Oct 20 01:48:36 2010
>> @@ -0,0 +1,62 @@
>> +/**
>> + * Licensed to the Apache Software Foundation (ASF) under one
>> + * or more contributor license agreements.  See the NOTICE file
>> + * distributed with this work for additional information
>> + * regarding copyright ownership.  The ASF licenses this file
>> + * to you under the Apache License, Version 2.0 (the
>> + * "License"); you may not use this file except in compliance
>> + * with the License.  You may obtain a copy of the License at
>> + *
>> + *     http://www.apache.org/licenses/LICENSE-2.0
>> + *
>> + * Unless required by applicable law or agreed to in writing, software
>> + * distributed under the License is distributed on an "AS IS" BASIS,
>> + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
>> + * See the License for the specific language governing permissions and
>> + * limitations under the License.
>> + */
>> +package org.apache.hama.bsp;
>> +
>> +import java.io.IOException;
>> +
>> +import org.apache.commons.logging.Log;
>> +import org.apache.commons.logging.LogFactory;
>> +import org.apache.hadoop.util.ReflectionUtils;
>> +import org.apache.zookeeper.KeeperException;
>> +
>> +public class BSPTaskRunner extends Thread {
>> +
>> +  public static final Log LOG = LogFactory.getLog(BSPTaskRunner.class);
>> +  private Task task;
>> +  private BSPJob conf;
>> +  private BSPPeer bspPeer;
>> +
>> +  public BSPTaskRunner(BSPTask bspTask, BSPPeer bspPeer, BSPJob conf) {
>> +    this.task = bspTask;
>> +    this.conf = conf;
>> +    this.bspPeer = bspPeer;
>> +  }
>> +
>> +  public Task getTask() {
>> +    return task;
>> +  }
>> +
>> +  public void run() {
>> +    BSP bsp = (BSP) ReflectionUtils.newInstance(conf.getConf().getClass(
>> +        "bsp.work.class", BSP.class), conf.getConf());
>> +
>> +    try {
>> +      bsp.bsp(bspPeer);
>> +    } catch (IOException e) {
>> +      // TODO Auto-generated catch block
>> +      e.printStackTrace();
>> +    } catch (KeeperException e) {
>> +      // TODO Auto-generated catch block
>> +      e.printStackTrace();
>> +    } catch (InterruptedException e) {
>> +      // TODO Auto-generated catch block
>> +      e.printStackTrace();
>> +    }
>> +  }
>> +
>> +}
>>
>> Modified: incubator/hama/trunk/src/java/org/apache/hama/bsp/GroomServer.java
>> URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/bsp/GroomServer.java?rev=1024485&r1=1024484&r2=1024485&view=diff
>> ==============================================================================
>> --- incubator/hama/trunk/src/java/org/apache/hama/bsp/GroomServer.java (original)
>> +++ incubator/hama/trunk/src/java/org/apache/hama/bsp/GroomServer.java Wed Oct 20 01:48:36 2010
>> @@ -23,9 +23,11 @@ import java.lang.reflect.Constructor;
>>  import java.net.InetSocketAddress;
>>  import java.util.ArrayList;
>>  import java.util.HashMap;
>> +import java.util.HashSet;
>>  import java.util.LinkedHashMap;
>>  import java.util.List;
>>  import java.util.Map;
>> +import java.util.Set;
>>  import java.util.TreeMap;
>>  import java.util.concurrent.BlockingQueue;
>>  import java.util.concurrent.LinkedBlockingQueue;
>> @@ -40,7 +42,7 @@ import org.apache.hadoop.ipc.RPC;
>>  import org.apache.hadoop.ipc.RemoteException;
>>  import org.apache.hadoop.net.DNS;
>>  import org.apache.hadoop.util.DiskChecker;
>> -import org.apache.hadoop.util.ReflectionUtils;
>> +import org.apache.hadoop.util.RunJar;
>>  import org.apache.hadoop.util.StringUtils;
>>  import org.apache.hadoop.util.DiskChecker.DiskErrorException;
>>  import org.apache.hama.Constants;
>> @@ -50,6 +52,7 @@ import org.apache.hama.ipc.InterTrackerP
>>  public class GroomServer implements Runnable {
>>   public static final Log LOG = LogFactory.getLog(GroomServer.class);
>>   private static BSPPeer bspPeer;
>> +  static final String SUBDIR = "groomServer";
>>
>>   Configuration conf;
>>
>> @@ -281,9 +284,111 @@ public class GroomServer implements Runn
>>     }
>>
>>     try {
>> +      localizeJob(tip);
>> +    } catch (Throwable e) {
>> +      String msg = ("Error initializing " + tip.getTask().getTaskID() + ":\n" + StringUtils
>> +          .stringifyException(e));
>> +      LOG.warn(msg);
>> +    }
>> +  }
>> +
>> +  private void localizeJob(TaskInProgress tip) throws IOException {
>> +    Task task = tip.getTask();
>> +    conf.addResource(task.getJobFile());
>> +    BSPJob defaultJobConf = new BSPJob((HamaConfiguration) conf);
>> +
>> +    Path localJobFile = defaultJobConf.getLocalPath(SUBDIR + "/"
>> +        + task.getTaskID() + "/" + "job.xml");
>> +
>> +    RunningJob rjob = addTaskToJob(task.getJobID(), localJobFile, tip);
>> +    BSPJob jobConf = null;
>> +
>> +    synchronized (rjob) {
>> +      if (!rjob.localized) {
>> +        Path localJarFile = defaultJobConf.getLocalPath(SUBDIR + "/"
>> +            + task.getTaskID() + "/" + "job.jar");
>> +        systemFS.copyToLocalFile(new Path(task.getJobFile()), localJobFile);
>> +        Path jarFile = new Path(task.getJobFile().replace(".xml", ".jar"));
>> +
>> +        HamaConfiguration conf = new HamaConfiguration();
>> +        conf.addResource(localJobFile);
>> +        jobConf = new BSPJob(conf, task.getJobID().toString());
>> +        jobConf.setJar(localJarFile.toString());
>> +
>> +        if (jarFile != null) {
>> +          systemFS.copyToLocalFile(jarFile, localJarFile);
>> +
>> +          // also unjar the job.jar files in workdir
>> +          File workDir = new File(
>> +              new File(localJobFile.toString()).getParent(), "work");
>> +          if (!workDir.mkdirs()) {
>> +            if (!workDir.isDirectory()) {
>> +              throw new IOException("Mkdirs failed to create "
>> +                  + workDir.toString());
>> +            }
>> +          }
>> +          RunJar.unJar(new File(localJarFile.toString()), workDir);
>> +        }
>> +        rjob.localized = true;
>> +      }
>> +    }
>> +    launchTaskForJob(tip, jobConf);
>> +  }
>> +
>> +  private void launchTaskForJob(TaskInProgress tip, BSPJob jobConf) {
>> +    try {
>> +      tip.setJobConf(jobConf);
>>       tip.launchTask();
>>     } catch (Throwable ie) {
>> -      // TODO: when job failed.
>> +      tip.taskStatus.setRunState(TaskStatus.State.FAILED);
>> +      String error = StringUtils.stringifyException(ie);
>> +      LOG.info(error);
>> +    }
>> +  }
>> +
>> +  private RunningJob addTaskToJob(BSPJobID jobId, Path localJobFile,
>> +      TaskInProgress tip) {
>> +    synchronized (runningJobs) {
>> +      RunningJob rJob = null;
>> +      if (!runningJobs.containsKey(jobId)) {
>> +        rJob = new RunningJob(jobId, localJobFile);
>> +        rJob.localized = false;
>> +        rJob.tasks = new HashSet<TaskInProgress>();
>> +        rJob.jobFile = localJobFile;
>> +        runningJobs.put(jobId, rJob);
>> +      } else {
>> +        rJob = runningJobs.get(jobId);
>> +      }
>> +      rJob.tasks.add(tip);
>> +      return rJob;
>> +    }
>> +  }
>> +
>> +  /**
>> +   * The datastructure for initializing a job
>> +   */
>> +  static class RunningJob {
>> +    private BSPJobID jobid;
>> +    private Path jobFile;
>> +    // keep this for later use
>> +    Set<TaskInProgress> tasks;
>> +    boolean localized;
>> +    boolean keepJobFiles;
>> +
>> +    RunningJob(BSPJobID jobid, Path jobFile) {
>> +      this.jobid = jobid;
>> +      localized = false;
>> +      tasks = new HashSet<TaskInProgress>();
>> +      this.jobFile = jobFile;
>> +      keepJobFiles = false;
>> +    }
>> +
>> +    Path getJobFile() {
>> +      return jobFile;
>> +    }
>> +
>> +    BSPJobID getJobId() {
>> +      return jobid;
>>     }
>>   }
>>
>> @@ -410,6 +515,8 @@ public class GroomServer implements Runn
>>   // /////////////////////////////////////////////////////
>>   class TaskInProgress {
>>     Task task;
>> +    BSPJob jobConf;
>> +    private BSPTaskRunner runner;
>>     volatile boolean done = false;
>>     volatile boolean wasKilled = false;
>>     private TaskStatus taskStatus;
>> @@ -421,61 +528,29 @@ public class GroomServer implements Runn
>>           TaskStatus.Phase.STARTING);
>>     }
>>
>> -    static final String SUBDIR = "groomServer";
>> +    public void setJobConf(BSPJob jobConf) {
>> +      this.jobConf = jobConf;
>> +    }
>>
>> -    public void launchTask() {
>> +    public void launchTask() throws IOException {
>>       taskStatus.setRunState(TaskStatus.State.RUNNING);
>> +      this.runner = task.createRunner(bspPeer, this.jobConf);
>> +      this.runner.start();
>>
>> -      try {
>> -        // TODO: need to move this code to TaskRunner
>> -
>> -        task.getJobFile();
>> -        conf.addResource(task.getJobFile());
>> -        BSPJob defaultJobConf = new BSPJob((HamaConfiguration) conf);
>> -
>> -        Path localJobFile = defaultJobConf.getLocalPath(SUBDIR + "/"
>> -            + task.getTaskID() + "/" + "job.xml");
>> -        Path localJarFile = defaultJobConf.getLocalPath(SUBDIR + "/"
>> -            + task.getTaskID() + "/" + "job.jar");
>> -
>> -        systemFS.copyToLocalFile(new Path(task.getJobFile()), localJobFile);
>> -        systemFS.copyToLocalFile(new Path(task.getJobFile().replace(".xml",
>> -            ".jar")), localJarFile);
>> -
>> -        HamaConfiguration conf = new HamaConfiguration();
>> -        conf.addResource(localJobFile);
>> -        BSPJob jobConf = new BSPJob(conf, task.getJobID().toString());
>> -        jobConf.setJar(localJarFile.toString());
>> -
>> -        BSP bsp = (BSP) ReflectionUtils
>> -            .newInstance(jobConf.getBspClass(), conf);
>> -        bsp.setPeer(bspPeer);
>> +      // Check state of Task
>> +      while (true) {
>>         try {
>> -          bsp.runBSP();
>> -        } catch (Exception e) {
>> +          Thread.sleep(1000);
>> +        } catch (InterruptedException e) {
>>           e.printStackTrace();
>> -          taskStatus.setRunState(TaskStatus.State.FAILED);
>>         }
>>
>> -      } catch (IOException e) {
>> -        // TODO Auto-generated catch block
>> -        e.printStackTrace();
>> -      } finally {
>> -
>> -        while (true) {
>> -          try {
>> -            Thread.sleep(1000);
>> -          } catch (InterruptedException e) {
>> -            e.printStackTrace();
>> -          }
>> -
>> -          // If local/outgoing queues are empty, task is done.
>> -          if (bspPeer.localQueue.size() == 0
>> -              && bspPeer.outgoingQueues.size() == 0) {
>> -            taskStatus.setRunState(TaskStatus.State.SUCCEEDED);
>> -            acceptNewTasks = true;
>> -            break;
>> -          }
>> +        // If local/outgoing queues are empty, task is done.
>> +        if (bspPeer.localQueue.size() == 0
>> +            && bspPeer.outgoingQueues.size() == 0) {
>> +          taskStatus.setRunState(TaskStatus.State.SUCCEEDED);
>> +          acceptNewTasks = true;
>> +          break;
>>         }
>>       }
>>
>>
>> Modified: incubator/hama/trunk/src/java/org/apache/hama/bsp/LaunchTaskAction.java
>> URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/bsp/LaunchTaskAction.java?rev=1024485&r1=1024484&r2=1024485&view=diff
>> ==============================================================================
>> --- incubator/hama/trunk/src/java/org/apache/hama/bsp/LaunchTaskAction.java (original)
>> +++ incubator/hama/trunk/src/java/org/apache/hama/bsp/LaunchTaskAction.java Wed Oct 20 01:48:36 2010
>> @@ -47,7 +47,7 @@ class LaunchTaskAction extends GroomServ
>>   }
>>
>>   public void readFields(DataInput in) throws IOException {
>> -    task = new Task();
>> +    task = new BSPTask();
>>     task.readFields(in);
>>   }
>>
>>
>> Modified: incubator/hama/trunk/src/java/org/apache/hama/bsp/LocalJobRunner.java
>> URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/bsp/LocalJobRunner.java?rev=1024485&r1=1024484&r2=1024485&view=diff
>> ==============================================================================
>> --- incubator/hama/trunk/src/java/org/apache/hama/bsp/LocalJobRunner.java (original)
>> +++ incubator/hama/trunk/src/java/org/apache/hama/bsp/LocalJobRunner.java Wed Oct 20 01:48:36 2010
>> @@ -172,7 +172,7 @@ public class LocalJobRunner implements J
>>
>>           try {
>>             GroomServer servers = new GroomServer(conf);
>> -            Task task = new BSPTask(job.getJobID(), jobFile, tID.toString(), i, this.conf);
>> +            Task task = new BSPTask(job.getJobID(), jobFile, tID.toString(), i);
>>
>>             // TODO not yet implemented
>>
>>
>> Modified: incubator/hama/trunk/src/java/org/apache/hama/bsp/Task.java
>> URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/bsp/Task.java?rev=1024485&r1=1024484&r2=1024485&view=diff
>> ==============================================================================
>> --- incubator/hama/trunk/src/java/org/apache/hama/bsp/Task.java (original)
>> +++ incubator/hama/trunk/src/java/org/apache/hama/bsp/Task.java Wed Oct 20 01:48:36 2010
>> @@ -23,14 +23,12 @@ import java.io.IOException;
>>
>>  import org.apache.commons.logging.Log;
>>  import org.apache.commons.logging.LogFactory;
>> +import org.apache.hadoop.conf.Configuration;
>>  import org.apache.hadoop.fs.LocalDirAllocator;
>>  import org.apache.hadoop.io.Text;
>>  import org.apache.hadoop.io.Writable;
>>
>> -/**
>> - *
>> - */
>> -public class Task implements Writable {
>> +public abstract class Task implements Writable {
>>   public static final Log LOG = LogFactory.getLog(Task.class);
>>   ////////////////////////////////////////////
>>   // Fields
>> @@ -109,5 +107,7 @@ public class Task implements Writable {
>>     taskId = Text.readString(in);
>>     partition = in.readInt();
>>   }
>> +
>> +  public abstract BSPTaskRunner createRunner(BSPPeer bspPeer, BSPJob jobConf);
>>
>>  }
>>
>> Modified: incubator/hama/trunk/src/java/org/apache/hama/bsp/TaskInProgress.java
>> URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/bsp/TaskInProgress.java?rev=1024485&r1=1024484&r2=1024485&view=diff
>> ==============================================================================
>> --- incubator/hama/trunk/src/java/org/apache/hama/bsp/TaskInProgress.java (original)
>> +++ incubator/hama/trunk/src/java/org/apache/hama/bsp/TaskInProgress.java Wed Oct 20 01:48:36 2010
>> @@ -101,7 +101,7 @@ class TaskInProgress {
>>         return null;
>>       }
>>
>> -      t = new BSPTask(jobId, jobFile, taskid, partition, this.conf);
>> +      t = new BSPTask(jobId, jobFile, taskid, partition);
>>       activeTasks.put(taskid, status.getGroomName());
>>
>>       // Ask JobTracker to note that the task exists
>>
>>
>>
>
>
>
> --
> Filipe David Manana,
> fdmanana@gmail.com, fdmanana@apache.org
>
> "Reasonable men adapt themselves to the world.
>  Unreasonable men adapt the world to themselves.
>  That's why all progress depends on unreasonable men."
>



-- 
Best Regards, Edward J. Yoon
edwardyoon@apache.org
http://blog.udanax.org

Re: svn commit: r1024485 - in /incubator/hama/trunk: ./ src/examples/org/apache/hama/examples/ src/java/org/apache/hama/bsp/

Posted by Filipe David Manana <fd...@apache.org>.
A big +1 on this one :)

On Wed, Oct 20, 2010 at 2:48 AM,  <ed...@apache.org> wrote:
> Author: edwardyoon
> Date: Wed Oct 20 01:48:36 2010
> New Revision: 1024485
>
> URL: http://svn.apache.org/viewvc?rev=1024485&view=rev
> Log:
> Refactoring launchTask() method in GroomServer
>
> Added:
>    incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPTaskRunner.java
> Modified:
>    incubator/hama/trunk/CHANGES.txt
>    incubator/hama/trunk/src/examples/org/apache/hama/examples/PiEstimator.java
>    incubator/hama/trunk/src/java/org/apache/hama/bsp/BSP.java
>    incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPMessage.java
>    incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPTask.java
>    incubator/hama/trunk/src/java/org/apache/hama/bsp/GroomServer.java
>    incubator/hama/trunk/src/java/org/apache/hama/bsp/LaunchTaskAction.java
>    incubator/hama/trunk/src/java/org/apache/hama/bsp/LocalJobRunner.java
>    incubator/hama/trunk/src/java/org/apache/hama/bsp/Task.java
>    incubator/hama/trunk/src/java/org/apache/hama/bsp/TaskInProgress.java
>
> Modified: incubator/hama/trunk/CHANGES.txt
> URL: http://svn.apache.org/viewvc/incubator/hama/trunk/CHANGES.txt?rev=1024485&r1=1024484&r2=1024485&view=diff
> ==============================================================================
> --- incubator/hama/trunk/CHANGES.txt (original)
> +++ incubator/hama/trunk/CHANGES.txt Wed Oct 20 01:48:36 2010
> @@ -50,6 +50,7 @@ Trunk (unreleased changes)
>
>   IMPROVEMENTS
>
> +    HAMA-300: Refactoring launchTask() method in GroomServer (edwardyoon)
>     HAMA-311: Add unit tests for IPC package (edwardyoon)
>     HAMA-312: Add serialize printing to ExampleDriver (edwardyoon)
>     HAMA-309: Add unit tests for Bytes utilities (edwardyoon)
>
> Modified: incubator/hama/trunk/src/examples/org/apache/hama/examples/PiEstimator.java
> URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/examples/org/apache/hama/examples/PiEstimator.java?rev=1024485&r1=1024484&r2=1024485&view=diff
> ==============================================================================
> --- incubator/hama/trunk/src/examples/org/apache/hama/examples/PiEstimator.java (original)
> +++ incubator/hama/trunk/src/examples/org/apache/hama/examples/PiEstimator.java Wed Oct 20 01:48:36 2010
> @@ -54,7 +54,7 @@ public class PiEstimator {
>         }
>       }
>
> -      byte[] tagName = Bytes.toBytes(getName().toString());
> +      byte[] tagName = Bytes.toBytes(bspPeer.getHostName());
>       byte[] myData = Bytes.toBytes(4.0 * (double) in / (double) iterations);
>       BSPMessage estimate = new BSPMessage(tagName, myData);
>
>
> Modified: incubator/hama/trunk/src/java/org/apache/hama/bsp/BSP.java
> URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/bsp/BSP.java?rev=1024485&r1=1024484&r2=1024485&view=diff
> ==============================================================================
> --- incubator/hama/trunk/src/java/org/apache/hama/bsp/BSP.java (original)
> +++ incubator/hama/trunk/src/java/org/apache/hama/bsp/BSP.java Wed Oct 20 01:48:36 2010
> @@ -20,20 +20,5 @@ package org.apache.hama.bsp;
>  /**
>  * This class provides an abstract implementation of the BSP interface
>  */
> -public abstract class BSP extends Thread implements BSPInterface {
> -  private BSPPeer bspPeer;
> -
> -  /**
> -   * A thread's run method.
> -   *
> -   * The run method performs the
> -   * {@link org.apache.hama.bsp.BSPInterface#bsp(BSPPeer)}
> -   */
> -  public void runBSP() throws Exception {
> -    bsp(bspPeer);
> -  }
> -
> -  public void setPeer(BSPPeer bspServer) {
> -    this.bspPeer = bspServer;
> -  }
> +public abstract class BSP implements BSPInterface {
>  }
>
> Modified: incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPMessage.java
> URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPMessage.java?rev=1024485&r1=1024484&r2=1024485&view=diff
> ==============================================================================
> --- incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPMessage.java (original)
> +++ incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPMessage.java Wed Oct 20 01:48:36 2010
> @@ -30,6 +30,12 @@ public class BSPMessage implements Writa
>   public BSPMessage() {
>   }
>
> +  /**
> +   * Constructor
> +   *
> +   * @param tag of data
> +   * @param data of message
> +   */
>   public BSPMessage(byte[] tag, byte[] data) {
>     this.tag = new byte[tag.length];
>     this.data = new byte[data.length];
> @@ -37,11 +43,20 @@ public class BSPMessage implements Writa
>     System.arraycopy(data, 0, this.data, 0, data.length);
>   }
>
> +  /**
> +   * BSP messages are typically identified with tags. This allows to get the tag
> +   * of data.
> +   *
> +   * @return tag of data of BSP message
> +   */
>   public byte[] getTag() {
>     byte[] result = this.tag;
>     return result;
>   }
>
> +  /**
> +   * @return data of BSP message
> +   */
>   public byte[] getData() {
>     byte[] result = this.data;
>     return result;
>
> Modified: incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPTask.java
> URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPTask.java?rev=1024485&r1=1024484&r2=1024485&view=diff
> ==============================================================================
> --- incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPTask.java (original)
> +++ incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPTask.java Wed Oct 20 01:48:36 2010
> @@ -17,26 +17,21 @@
>  */
>  package org.apache.hama.bsp;
>
> -import org.apache.hadoop.conf.Configuration;
> -import org.apache.hadoop.util.ReflectionUtils;
> -
>  public class BSPTask extends Task {
> -  private BSP bsp;
> -  private Configuration conf;
> -
> -  public BSPTask(BSPJobID jobId, String jobFile, String taskid, int partition, Configuration conf) {
> +
> +  public BSPTask() {
> +  }
> +
> +  public BSPTask(BSPJobID jobId, String jobFile, String taskid, int partition) {
>     this.jobId = jobId;
>     this.jobFile = jobFile;
>     this.taskId = taskid;
>     this.partition = partition;
> -    this.conf = conf;
>   }
>
> -  public BSP getBSPClass() {
> -    bsp = (BSP) ReflectionUtils.newInstance(conf.getClass("bsp.work.class",
> -        BSP.class), conf);
> -
> -    return bsp;
> +  @Override
> +  public BSPTaskRunner createRunner(BSPPeer bspPeer, BSPJob conf) {
> +    return new BSPTaskRunner(this, bspPeer, conf);
>   }
>
>  }
>
> Added: incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPTaskRunner.java
> URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPTaskRunner.java?rev=1024485&view=auto
> ==============================================================================
> --- incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPTaskRunner.java (added)
> +++ incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPTaskRunner.java Wed Oct 20 01:48:36 2010
> @@ -0,0 +1,62 @@
> +/**
> + * Licensed to the Apache Software Foundation (ASF) under one
> + * or more contributor license agreements.  See the NOTICE file
> + * distributed with this work for additional information
> + * regarding copyright ownership.  The ASF licenses this file
> + * to you under the Apache License, Version 2.0 (the
> + * "License"); you may not use this file except in compliance
> + * with the License.  You may obtain a copy of the License at
> + *
> + *     http://www.apache.org/licenses/LICENSE-2.0
> + *
> + * Unless required by applicable law or agreed to in writing, software
> + * distributed under the License is distributed on an "AS IS" BASIS,
> + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
> + * See the License for the specific language governing permissions and
> + * limitations under the License.
> + */
> +package org.apache.hama.bsp;
> +
> +import java.io.IOException;
> +
> +import org.apache.commons.logging.Log;
> +import org.apache.commons.logging.LogFactory;
> +import org.apache.hadoop.util.ReflectionUtils;
> +import org.apache.zookeeper.KeeperException;
> +
> +public class BSPTaskRunner extends Thread {
> +
> +  public static final Log LOG = LogFactory.getLog(BSPTaskRunner.class);
> +  private Task task;
> +  private BSPJob conf;
> +  private BSPPeer bspPeer;
> +
> +  public BSPTaskRunner(BSPTask bspTask, BSPPeer bspPeer, BSPJob conf) {
> +    this.task = bspTask;
> +    this.conf = conf;
> +    this.bspPeer = bspPeer;
> +  }
> +
> +  public Task getTask() {
> +    return task;
> +  }
> +
> +  public void run() {
> +    BSP bsp = (BSP) ReflectionUtils.newInstance(conf.getConf().getClass(
> +        "bsp.work.class", BSP.class), conf.getConf());
> +
> +    try {
> +      bsp.bsp(bspPeer);
> +    } catch (IOException e) {
> +      // TODO Auto-generated catch block
> +      e.printStackTrace();
> +    } catch (KeeperException e) {
> +      // TODO Auto-generated catch block
> +      e.printStackTrace();
> +    } catch (InterruptedException e) {
> +      // TODO Auto-generated catch block
> +      e.printStackTrace();
> +    }
> +  }
> +
> +}
>
> Modified: incubator/hama/trunk/src/java/org/apache/hama/bsp/GroomServer.java
> URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/bsp/GroomServer.java?rev=1024485&r1=1024484&r2=1024485&view=diff
> ==============================================================================
> --- incubator/hama/trunk/src/java/org/apache/hama/bsp/GroomServer.java (original)
> +++ incubator/hama/trunk/src/java/org/apache/hama/bsp/GroomServer.java Wed Oct 20 01:48:36 2010
> @@ -23,9 +23,11 @@ import java.lang.reflect.Constructor;
>  import java.net.InetSocketAddress;
>  import java.util.ArrayList;
>  import java.util.HashMap;
> +import java.util.HashSet;
>  import java.util.LinkedHashMap;
>  import java.util.List;
>  import java.util.Map;
> +import java.util.Set;
>  import java.util.TreeMap;
>  import java.util.concurrent.BlockingQueue;
>  import java.util.concurrent.LinkedBlockingQueue;
> @@ -40,7 +42,7 @@ import org.apache.hadoop.ipc.RPC;
>  import org.apache.hadoop.ipc.RemoteException;
>  import org.apache.hadoop.net.DNS;
>  import org.apache.hadoop.util.DiskChecker;
> -import org.apache.hadoop.util.ReflectionUtils;
> +import org.apache.hadoop.util.RunJar;
>  import org.apache.hadoop.util.StringUtils;
>  import org.apache.hadoop.util.DiskChecker.DiskErrorException;
>  import org.apache.hama.Constants;
> @@ -50,6 +52,7 @@ import org.apache.hama.ipc.InterTrackerP
>  public class GroomServer implements Runnable {
>   public static final Log LOG = LogFactory.getLog(GroomServer.class);
>   private static BSPPeer bspPeer;
> +  static final String SUBDIR = "groomServer";
>
>   Configuration conf;
>
> @@ -281,9 +284,111 @@ public class GroomServer implements Runn
>     }
>
>     try {
> +      localizeJob(tip);
> +    } catch (Throwable e) {
> +      String msg = ("Error initializing " + tip.getTask().getTaskID() + ":\n" + StringUtils
> +          .stringifyException(e));
> +      LOG.warn(msg);
> +    }
> +  }
> +
> +  private void localizeJob(TaskInProgress tip) throws IOException {
> +    Task task = tip.getTask();
> +    conf.addResource(task.getJobFile());
> +    BSPJob defaultJobConf = new BSPJob((HamaConfiguration) conf);
> +
> +    Path localJobFile = defaultJobConf.getLocalPath(SUBDIR + "/"
> +        + task.getTaskID() + "/" + "job.xml");
> +
> +    RunningJob rjob = addTaskToJob(task.getJobID(), localJobFile, tip);
> +    BSPJob jobConf = null;
> +
> +    synchronized (rjob) {
> +      if (!rjob.localized) {
> +        Path localJarFile = defaultJobConf.getLocalPath(SUBDIR + "/"
> +            + task.getTaskID() + "/" + "job.jar");
> +        systemFS.copyToLocalFile(new Path(task.getJobFile()), localJobFile);
> +        Path jarFile = new Path(task.getJobFile().replace(".xml", ".jar"));
> +
> +        HamaConfiguration conf = new HamaConfiguration();
> +        conf.addResource(localJobFile);
> +        jobConf = new BSPJob(conf, task.getJobID().toString());
> +        jobConf.setJar(localJarFile.toString());
> +
> +        if (jarFile != null) {
> +          systemFS.copyToLocalFile(jarFile, localJarFile);
> +
> +          // also unjar the job.jar files in workdir
> +          File workDir = new File(
> +              new File(localJobFile.toString()).getParent(), "work");
> +          if (!workDir.mkdirs()) {
> +            if (!workDir.isDirectory()) {
> +              throw new IOException("Mkdirs failed to create "
> +                  + workDir.toString());
> +            }
> +          }
> +          RunJar.unJar(new File(localJarFile.toString()), workDir);
> +        }
> +        rjob.localized = true;
> +      }
> +    }
> +    launchTaskForJob(tip, jobConf);
> +  }
> +
> +  private void launchTaskForJob(TaskInProgress tip, BSPJob jobConf) {
> +    try {
> +      tip.setJobConf(jobConf);
>       tip.launchTask();
>     } catch (Throwable ie) {
> -      // TODO: when job failed.
> +      tip.taskStatus.setRunState(TaskStatus.State.FAILED);
> +      String error = StringUtils.stringifyException(ie);
> +      LOG.info(error);
> +    }
> +  }
> +
> +  private RunningJob addTaskToJob(BSPJobID jobId, Path localJobFile,
> +      TaskInProgress tip) {
> +    synchronized (runningJobs) {
> +      RunningJob rJob = null;
> +      if (!runningJobs.containsKey(jobId)) {
> +        rJob = new RunningJob(jobId, localJobFile);
> +        rJob.localized = false;
> +        rJob.tasks = new HashSet<TaskInProgress>();
> +        rJob.jobFile = localJobFile;
> +        runningJobs.put(jobId, rJob);
> +      } else {
> +        rJob = runningJobs.get(jobId);
> +      }
> +      rJob.tasks.add(tip);
> +      return rJob;
> +    }
> +  }
> +
> +  /**
> +   * The datastructure for initializing a job
> +   */
> +  static class RunningJob {
> +    private BSPJobID jobid;
> +    private Path jobFile;
> +    // keep this for later use
> +    Set<TaskInProgress> tasks;
> +    boolean localized;
> +    boolean keepJobFiles;
> +
> +    RunningJob(BSPJobID jobid, Path jobFile) {
> +      this.jobid = jobid;
> +      localized = false;
> +      tasks = new HashSet<TaskInProgress>();
> +      this.jobFile = jobFile;
> +      keepJobFiles = false;
> +    }
> +
> +    Path getJobFile() {
> +      return jobFile;
> +    }
> +
> +    BSPJobID getJobId() {
> +      return jobid;
>     }
>   }
>
> @@ -410,6 +515,8 @@ public class GroomServer implements Runn
>   // /////////////////////////////////////////////////////
>   class TaskInProgress {
>     Task task;
> +    BSPJob jobConf;
> +    private BSPTaskRunner runner;
>     volatile boolean done = false;
>     volatile boolean wasKilled = false;
>     private TaskStatus taskStatus;
> @@ -421,61 +528,29 @@ public class GroomServer implements Runn
>           TaskStatus.Phase.STARTING);
>     }
>
> -    static final String SUBDIR = "groomServer";
> +    public void setJobConf(BSPJob jobConf) {
> +      this.jobConf = jobConf;
> +    }
>
> -    public void launchTask() {
> +    public void launchTask() throws IOException {
>       taskStatus.setRunState(TaskStatus.State.RUNNING);
> +      this.runner = task.createRunner(bspPeer, this.jobConf);
> +      this.runner.start();
>
> -      try {
> -        // TODO: need to move this code to TaskRunner
> -
> -        task.getJobFile();
> -        conf.addResource(task.getJobFile());
> -        BSPJob defaultJobConf = new BSPJob((HamaConfiguration) conf);
> -
> -        Path localJobFile = defaultJobConf.getLocalPath(SUBDIR + "/"
> -            + task.getTaskID() + "/" + "job.xml");
> -        Path localJarFile = defaultJobConf.getLocalPath(SUBDIR + "/"
> -            + task.getTaskID() + "/" + "job.jar");
> -
> -        systemFS.copyToLocalFile(new Path(task.getJobFile()), localJobFile);
> -        systemFS.copyToLocalFile(new Path(task.getJobFile().replace(".xml",
> -            ".jar")), localJarFile);
> -
> -        HamaConfiguration conf = new HamaConfiguration();
> -        conf.addResource(localJobFile);
> -        BSPJob jobConf = new BSPJob(conf, task.getJobID().toString());
> -        jobConf.setJar(localJarFile.toString());
> -
> -        BSP bsp = (BSP) ReflectionUtils
> -            .newInstance(jobConf.getBspClass(), conf);
> -        bsp.setPeer(bspPeer);
> +      // Check state of Task
> +      while (true) {
>         try {
> -          bsp.runBSP();
> -        } catch (Exception e) {
> +          Thread.sleep(1000);
> +        } catch (InterruptedException e) {
>           e.printStackTrace();
> -          taskStatus.setRunState(TaskStatus.State.FAILED);
>         }
>
> -      } catch (IOException e) {
> -        // TODO Auto-generated catch block
> -        e.printStackTrace();
> -      } finally {
> -
> -        while (true) {
> -          try {
> -            Thread.sleep(1000);
> -          } catch (InterruptedException e) {
> -            e.printStackTrace();
> -          }
> -
> -          // If local/outgoing queues are empty, task is done.
> -          if (bspPeer.localQueue.size() == 0
> -              && bspPeer.outgoingQueues.size() == 0) {
> -            taskStatus.setRunState(TaskStatus.State.SUCCEEDED);
> -            acceptNewTasks = true;
> -            break;
> -          }
> +        // If local/outgoing queues are empty, task is done.
> +        if (bspPeer.localQueue.size() == 0
> +            && bspPeer.outgoingQueues.size() == 0) {
> +          taskStatus.setRunState(TaskStatus.State.SUCCEEDED);
> +          acceptNewTasks = true;
> +          break;
>         }
>       }
>
>
> Modified: incubator/hama/trunk/src/java/org/apache/hama/bsp/LaunchTaskAction.java
> URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/bsp/LaunchTaskAction.java?rev=1024485&r1=1024484&r2=1024485&view=diff
> ==============================================================================
> --- incubator/hama/trunk/src/java/org/apache/hama/bsp/LaunchTaskAction.java (original)
> +++ incubator/hama/trunk/src/java/org/apache/hama/bsp/LaunchTaskAction.java Wed Oct 20 01:48:36 2010
> @@ -47,7 +47,7 @@ class LaunchTaskAction extends GroomServ
>   }
>
>   public void readFields(DataInput in) throws IOException {
> -    task = new Task();
> +    task = new BSPTask();
>     task.readFields(in);
>   }
>
>
> Modified: incubator/hama/trunk/src/java/org/apache/hama/bsp/LocalJobRunner.java
> URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/bsp/LocalJobRunner.java?rev=1024485&r1=1024484&r2=1024485&view=diff
> ==============================================================================
> --- incubator/hama/trunk/src/java/org/apache/hama/bsp/LocalJobRunner.java (original)
> +++ incubator/hama/trunk/src/java/org/apache/hama/bsp/LocalJobRunner.java Wed Oct 20 01:48:36 2010
> @@ -172,7 +172,7 @@ public class LocalJobRunner implements J
>
>           try {
>             GroomServer servers = new GroomServer(conf);
> -            Task task = new BSPTask(job.getJobID(), jobFile, tID.toString(), i, this.conf);
> +            Task task = new BSPTask(job.getJobID(), jobFile, tID.toString(), i);
>
>             // TODO not yet implemented
>
>
> Modified: incubator/hama/trunk/src/java/org/apache/hama/bsp/Task.java
> URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/bsp/Task.java?rev=1024485&r1=1024484&r2=1024485&view=diff
> ==============================================================================
> --- incubator/hama/trunk/src/java/org/apache/hama/bsp/Task.java (original)
> +++ incubator/hama/trunk/src/java/org/apache/hama/bsp/Task.java Wed Oct 20 01:48:36 2010
> @@ -23,14 +23,12 @@ import java.io.IOException;
>
>  import org.apache.commons.logging.Log;
>  import org.apache.commons.logging.LogFactory;
> +import org.apache.hadoop.conf.Configuration;
>  import org.apache.hadoop.fs.LocalDirAllocator;
>  import org.apache.hadoop.io.Text;
>  import org.apache.hadoop.io.Writable;
>
> -/**
> - *
> - */
> -public class Task implements Writable {
> +public abstract class Task implements Writable {
>   public static final Log LOG = LogFactory.getLog(Task.class);
>   ////////////////////////////////////////////
>   // Fields
> @@ -109,5 +107,7 @@ public class Task implements Writable {
>     taskId = Text.readString(in);
>     partition = in.readInt();
>   }
> +
> +  public abstract BSPTaskRunner createRunner(BSPPeer bspPeer, BSPJob jobConf);
>
>  }
>
> Modified: incubator/hama/trunk/src/java/org/apache/hama/bsp/TaskInProgress.java
> URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/bsp/TaskInProgress.java?rev=1024485&r1=1024484&r2=1024485&view=diff
> ==============================================================================
> --- incubator/hama/trunk/src/java/org/apache/hama/bsp/TaskInProgress.java (original)
> +++ incubator/hama/trunk/src/java/org/apache/hama/bsp/TaskInProgress.java Wed Oct 20 01:48:36 2010
> @@ -101,7 +101,7 @@ class TaskInProgress {
>         return null;
>       }
>
> -      t = new BSPTask(jobId, jobFile, taskid, partition, this.conf);
> +      t = new BSPTask(jobId, jobFile, taskid, partition);
>       activeTasks.put(taskid, status.getGroomName());
>
>       // Ask JobTracker to note that the task exists
>
>
>



-- 
Filipe David Manana,
fdmanana@gmail.com, fdmanana@apache.org

"Reasonable men adapt themselves to the world.
 Unreasonable men adapt the world to themselves.
 That's why all progress depends on unreasonable men."