You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hama.apache.org by ed...@apache.org on 2011/02/21 07:18:17 UTC

svn commit: r1072853 - in /incubator/hama/trunk: ./ src/examples/org/apache/hama/examples/ src/java/org/apache/hama/ src/java/org/apache/hama/bsp/

Author: edwardyoon
Date: Mon Feb 21 06:18:17 2011
New Revision: 1072853

URL: http://svn.apache.org/viewvc?rev=1072853&view=rev
Log:
Add "random communication benchmark" tool

Added:
    incubator/hama/trunk/src/examples/org/apache/hama/examples/RandBench.java
Modified:
    incubator/hama/trunk/CHANGES.txt
    incubator/hama/trunk/src/examples/org/apache/hama/examples/ExampleDriver.java
    incubator/hama/trunk/src/java/org/apache/hama/Constants.java
    incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPJob.java
    incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPJobClient.java
    incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPMaster.java
    incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPPeer.java
    incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPPeerInterface.java
    incubator/hama/trunk/src/java/org/apache/hama/bsp/GroomServer.java
    incubator/hama/trunk/src/java/org/apache/hama/bsp/JobInProgress.java
    incubator/hama/trunk/src/java/org/apache/hama/bsp/JobStatus.java
    incubator/hama/trunk/src/java/org/apache/hama/bsp/RunningJob.java

Modified: incubator/hama/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/CHANGES.txt?rev=1072853&r1=1072852&r2=1072853&view=diff
==============================================================================
--- incubator/hama/trunk/CHANGES.txt (original)
+++ incubator/hama/trunk/CHANGES.txt Mon Feb 21 06:18:17 2011
@@ -51,6 +51,7 @@ Trunk (unreleased changes)
 
   IMPROVEMENTS
  
+    HAMA-353: Add "random communication benchmark" tool (edwardyoon)
     HAMA-351: Improvement of lack of info- about the output of examples (edwardyoon)
     HAMA-348: Remove hard-coded javaOpts (edwardyoon)
     HAMA-347: Add implementation of umbilical interface (edwardyoon)

Modified: incubator/hama/trunk/src/examples/org/apache/hama/examples/ExampleDriver.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/examples/org/apache/hama/examples/ExampleDriver.java?rev=1072853&r1=1072852&r2=1072853&view=diff
==============================================================================
--- incubator/hama/trunk/src/examples/org/apache/hama/examples/ExampleDriver.java (original)
+++ incubator/hama/trunk/src/examples/org/apache/hama/examples/ExampleDriver.java Mon Feb 21 06:18:17 2011
@@ -27,6 +27,7 @@ public class ExampleDriver {
     ProgramDriver pgd = new ProgramDriver();
     try {
       pgd.addClass("pi", PiEstimator.class, "Pi Estimator");
+      pgd.addClass("bench", RandBench.class, "Random Communication Benchmark");
       pgd.addClass("test", SerializePrinting.class, "Serialize Printing Test");
       
       pgd.driver(args);

Added: incubator/hama/trunk/src/examples/org/apache/hama/examples/RandBench.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/examples/org/apache/hama/examples/RandBench.java?rev=1072853&view=auto
==============================================================================
--- incubator/hama/trunk/src/examples/org/apache/hama/examples/RandBench.java (added)
+++ incubator/hama/trunk/src/examples/org/apache/hama/examples/RandBench.java Mon Feb 21 06:18:17 2011
@@ -0,0 +1,115 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.examples;
+
+import java.io.IOException;
+import java.util.Random;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hama.HamaConfiguration;
+import org.apache.hama.bsp.BSP;
+import org.apache.hama.bsp.BSPJob;
+import org.apache.hama.bsp.BSPJobClient;
+import org.apache.hama.bsp.BSPMessage;
+import org.apache.hama.bsp.BSPPeerProtocol;
+import org.apache.hama.bsp.ClusterStatus;
+import org.apache.hama.util.Bytes;
+import org.apache.zookeeper.KeeperException;
+
+public class RandBench {
+  private static final String SIZEOFMSG = "msg.size";
+  private static final String N_COMMUNICATIONS = "communications.num";
+  private static final String N_SUPERSTEPS = "supersteps.num";
+
+  public static class RandBSP extends BSP {
+    private Configuration conf;
+    private Random r = new Random();
+    private int sizeOfMsg;
+    private int nCommunications;
+    private int nSupersteps;
+
+    @Override
+    public void bsp(BSPPeerProtocol bspPeer) throws IOException,
+        KeeperException, InterruptedException {
+      byte[] dummyData = new byte[sizeOfMsg];
+      BSPMessage msg = null;
+      String[] peers = bspPeer.getAllPeerNames();
+      String peerName = bspPeer.getPeerName();
+
+      for (int i = 0; i < nSupersteps; i++) {
+
+        for (int j = 0; j < nCommunications; j++) {
+          String tPeer = peers[r.nextInt(peers.length)];
+          String tag = peerName + " to " + tPeer;
+          msg = new BSPMessage(Bytes.toBytes(tag), dummyData);
+          bspPeer.send(tPeer, msg);
+        }
+
+        bspPeer.sync();
+
+        if ((nSupersteps - 1) == i) {
+          bspPeer.clear();
+        }
+      }
+    }
+
+    @Override
+    public void setConf(Configuration conf) {
+      this.conf = conf;
+      this.sizeOfMsg = conf.getInt(SIZEOFMSG, 1);
+      this.nCommunications = conf.getInt(N_COMMUNICATIONS, 1);
+      this.nSupersteps = conf.getInt(N_SUPERSTEPS, 1);
+    }
+
+    @Override
+    public Configuration getConf() {
+      return conf;
+    }
+
+  }
+
+  public static void main(String[] args) throws Exception {
+    if (args.length < 3) {
+      System.out.println("Usage: <sizeOfMsg> <nCommunications> <nSupersteps>");
+      System.exit(-1);
+    }
+
+    // BSP job configuration
+    HamaConfiguration conf = new HamaConfiguration();
+
+    conf.setInt(SIZEOFMSG, Integer.parseInt(args[0]));
+    conf.setInt(N_COMMUNICATIONS, Integer.parseInt(args[1]));
+    conf.setInt(N_SUPERSTEPS, Integer.parseInt(args[2]));
+
+    BSPJob bsp = new BSPJob(conf, RandBench.class);
+    // Set the job name
+    bsp.setJobName("Random Communication Benchmark");
+    bsp.setBspClass(RandBSP.class);
+
+    // Set the task size as a number of GroomServer
+    BSPJobClient jobClient = new BSPJobClient(conf);
+    ClusterStatus cluster = jobClient.getClusterStatus(false);
+    bsp.setNumBspTask(cluster.getGroomServers());
+
+    long startTime = System.currentTimeMillis();
+    bsp.waitForCompletion(true);
+    System.out.println("Job Finished in "
+        + (double) (System.currentTimeMillis() - startTime) / 1000.0
+        + " seconds");
+  }
+}

Modified: incubator/hama/trunk/src/java/org/apache/hama/Constants.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/Constants.java?rev=1072853&r1=1072852&r2=1072853&view=diff
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/Constants.java (original)
+++ incubator/hama/trunk/src/java/org/apache/hama/Constants.java Mon Feb 21 06:18:17 2011
@@ -46,7 +46,7 @@ public interface Constants {
   /** Default port region server listens on. */
   public static final int DEFAULT_PEER_PORT = 61000;
 
-  public static final long ATLEAST_WAIT_TIME = 100;
+  public static final long ATLEAST_WAIT_TIME = 1000;
   public static final String PEER_ID = "bsp.peer.id";
   
   /** Parameter name for what groom server implementation to use. */

Modified: incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPJob.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPJob.java?rev=1072853&r1=1072852&r2=1072853&view=diff
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPJob.java (original)
+++ incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPJob.java Mon Feb 21 06:18:17 2011
@@ -140,7 +140,7 @@ public class BSPJob extends BSPJobContex
   // /////////////////////////////////////
   // Methods for Job Control
   // /////////////////////////////////////
-  public float progress() throws IOException {
+  public long progress() throws IOException {
     ensureState(JobState.RUNNING);
     return info.progress();
   }

Modified: incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPJobClient.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPJobClient.java?rev=1072853&r1=1072852&r2=1072853&view=diff
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPJobClient.java (original)
+++ incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPJobClient.java Mon Feb 21 06:18:17 2011
@@ -35,7 +35,6 @@ import org.apache.hadoop.fs.permission.F
 import org.apache.hadoop.ipc.RPC;
 import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.security.UnixUserGroupInformation;
-import org.apache.hadoop.util.StringUtils;
 import org.apache.hadoop.util.Tool;
 import org.apache.hadoop.util.ToolRunner;
 import org.apache.hama.HamaConfiguration;
@@ -109,7 +108,7 @@ public class BSPJobClient extends Config
     }
 
     @Override
-    public float progress() throws IOException {
+    public long progress() throws IOException {
       ensureFreshStatus();
       return status.progress();
     }
@@ -358,10 +357,11 @@ public class BSPJobClient extends Config
 
     while (!job.isComplete()) {
       Thread.sleep(1000);
-      String report = "bsp: " + StringUtils.formatPercent(job.progress(), 0);
+      long step = job.progress();
+      String report = "Current supersteps number: " + step;
 
       if (!report.equals(lastReport)) {
-        LOG.debug(report);
+        LOG.info(report);
         lastReport = report;
       }
     }

Modified: incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPMaster.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPMaster.java?rev=1072853&r1=1072852&r2=1072853&view=diff
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPMaster.java (original)
+++ incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPMaster.java Mon Feb 21 06:18:17 2011
@@ -188,9 +188,7 @@ public class BSPMaster implements JobSub
           WorkerProtocol.class, WorkerProtocol.versionID,
           resolveWorkerAddress(status.getRpcServer()), this.conf);
       if (null == wc) {
-        LOG
-            .warn("Fail to create Worker client at host "
-                + status.getPeerName());
+        LOG.warn("Fail to create Worker client at host " + status.getPeerName());
         return false;
       }
       // TODO: need to check if peer name has changed
@@ -237,6 +235,7 @@ public class BSPMaster implements JobSub
     }
     // update GroomServerStatus hold in groomServers cache.
     GroomServerStatus fstus = directive.getStatus();
+
     // groomServers cache contains groom server status reported back
     if (groomServers.containsKey(fstus)) {
       GroomServerStatus ustus = null;
@@ -247,6 +246,7 @@ public class BSPMaster implements JobSub
           break;
         }
       }// for
+
       if (null != ustus) {
         List<TaskStatus> tlist = ustus.getTaskReports();
         for (TaskStatus ts : tlist) {
@@ -254,9 +254,13 @@ public class BSPMaster implements JobSub
 
           TaskInProgress tip = jip.findTaskInProgress(((TaskAttemptID) ts
               .getTaskId()).getTaskID());
-          jip.completedTask(tip, ts);
-          LOG.info("JobInProgress id:" + jip.getJobID() + " status:"
-              + jip.getStatus());
+          
+          if(ts.getRunState() == TaskStatus.State.SUCCEEDED) {
+            jip.completedTask(tip, ts);
+          } else if (ts.getRunState() == TaskStatus.State.RUNNING) {
+            // do nothing
+          }
+          
           if (jip.getStatus().getRunState() == JobStatus.SUCCEEDED) {
             for (JobInProgressListener listener : jobInProgressListeners) {
               try {
@@ -265,6 +269,9 @@ public class BSPMaster implements JobSub
                 LOG.error("Fail to alter scheduler a job is moved.", ioe);
               }
             }
+
+          } else if (jip.getStatus().getRunState() == JobStatus.RUNNING) {
+            jip.getStatus().setprogress(ts.getSuperstepCount());
           }
         }
       } else {

Modified: incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPPeer.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPPeer.java?rev=1072853&r1=1072852&r2=1072853&view=diff
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPPeer.java (original)
+++ incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPPeer.java Mon Feb 21 06:18:17 2011
@@ -238,6 +238,11 @@ public class BSPPeer implements Watcher,
     }
   }
 
+  public void clear() {
+    this.localQueue.clear();
+    this.outgoingQueues.clear();
+  }
+  
   @Override
   public void close() throws IOException {
     server.stop();

Modified: incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPPeerInterface.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPPeerInterface.java?rev=1072853&r1=1072852&r2=1072853&view=diff
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPPeerInterface.java (original)
+++ incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPPeerInterface.java Mon Feb 21 06:18:17 2011
@@ -72,4 +72,9 @@ public interface BSPPeerInterface extend
    * @return The names of all the peers executing tasks from the same job (including this peer).
    */
   public String[] getAllPeerNames();
+  
+  /**
+   * Clears all queues entries.
+   */
+  public void clear();
 }

Modified: incubator/hama/trunk/src/java/org/apache/hama/bsp/GroomServer.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/bsp/GroomServer.java?rev=1072853&r1=1072852&r2=1072853&view=diff
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/bsp/GroomServer.java (original)
+++ incubator/hama/trunk/src/java/org/apache/hama/bsp/GroomServer.java Mon Feb 21 06:18:17 2011
@@ -65,7 +65,7 @@ public class GroomServer implements Runn
   private BSPPeer bspPeer;
   static final String SUBDIR = "groomServer";
 
-  private volatile static int REPORT_INTERVAL = 60 * 1000;
+  private volatile static int REPORT_INTERVAL = 1 * 1000;
 
   Configuration conf;
 
@@ -102,7 +102,8 @@ public class GroomServer implements Runn
 
   // new nexus between GroomServer and BSPMaster
   // holds/ manage all tasks
-  //List<TaskInProgress> tasksList = new CopyOnWriteArrayList<TaskInProgress>();
+  // List<TaskInProgress> tasksList = new
+  // CopyOnWriteArrayList<TaskInProgress>();
 
   private String rpcServer;
   private Server workerServer;
@@ -132,8 +133,9 @@ public class GroomServer implements Runn
     }
 
     if (localHostname == null) {
-      this.localHostname = DNS.getDefaultHost(conf.get("bsp.dns.interface",
-          "default"), conf.get("bsp.dns.nameserver", "default"));
+      this.localHostname = DNS.getDefaultHost(
+          conf.get("bsp.dns.interface", "default"),
+          conf.get("bsp.dns.nameserver", "default"));
     }
     // check local disk
     checkLocalDirs(conf.getStrings("bsp.local.dir"));
@@ -165,6 +167,7 @@ public class GroomServer implements Runn
       LOG.info("Worker rpc server --> " + rpcServer);
     }
 
+    @SuppressWarnings("deprecation")
     String address = NetUtils.getServerAddress(conf,
         "bsp.groom.report.bindAddress", "bsp.groom.report.port",
         "bsp.groom.report.address");
@@ -293,9 +296,13 @@ public class GroomServer implements Runn
   }
 
   public State offerService() throws Exception {
-
     while (running && !shuttingDown) {
       try {
+        Thread.sleep(REPORT_INTERVAL);
+      } catch (InterruptedException ie) {
+      }
+      
+      try {
         if (justInited) {
           String dir = masterClient.getSystemDir();
           if (dir == null) {
@@ -363,11 +370,11 @@ public class GroomServer implements Runn
         Path localJarFile = defaultJobConf.getLocalPath(SUBDIR + "/"
             + task.getTaskID() + "/" + "job.jar");
         systemFS.copyToLocalFile(new Path(task.getJobFile()), localJobFile);
-        
+
         HamaConfiguration conf = new HamaConfiguration();
         conf.addResource(localJobFile);
         jobConf = new BSPJob(conf, task.getJobID().toString());
-        
+
         Path jarFile = new Path(jobConf.getJar());
         jobConf.setJar(localJarFile.toString());
 
@@ -599,7 +606,9 @@ public class GroomServer implements Runn
       // Check state of a Task
       while (true) {
         try {
-          Thread.sleep(1000);
+          taskStatus.setProgress(bspPeer.getSuperstepCount());
+          doReport(this.taskStatus);
+          Thread.sleep(REPORT_INTERVAL);
         } catch (InterruptedException e) {
           e.printStackTrace();
         }
@@ -607,6 +616,7 @@ public class GroomServer implements Runn
         if (bspPeer.getLocalQueueSize() == 0
             && bspPeer.getOutgoingQueueSize() == 0 && !runner.isAlive()) {
           taskStatus.setRunState(TaskStatus.State.SUCCEEDED);
+          taskStatus.setPhase(TaskStatus.Phase.CLEANUP);
           doReport(this.taskStatus);
           break;
         }
@@ -618,9 +628,9 @@ public class GroomServer implements Runn
      * Update and report refresh status back to BSPMaster.
      */
     private void doReport(TaskStatus taskStatus) {
-      GroomServerStatus gss = new GroomServerStatus(groomServerName, bspPeer
-          .getPeerName(), updateTaskStatus(taskStatus), failures, maxCurrentTasks,
-          rpcServer);
+      GroomServerStatus gss = new GroomServerStatus(groomServerName,
+          bspPeer.getPeerName(), updateTaskStatus(taskStatus), failures,
+          maxCurrentTasks, rpcServer);
       try {
         boolean ret = masterClient.report(new Directive(gss));
         if (!ret) {
@@ -635,15 +645,18 @@ public class GroomServer implements Runn
 
     private List<TaskStatus> updateTaskStatus(TaskStatus taskStatus) {
       List<TaskStatus> tlist = new ArrayList<TaskStatus>();
-      synchronized(runningTasks){
-        synchronized(finishedTasks){
-          TaskInProgress tip = runningTasks.remove(taskStatus.getTaskId());
-          taskStatus.setProgress(1f);
-          taskStatus.setRunState(TaskStatus.State.SUCCEEDED);
-          taskStatus.setPhase(TaskStatus.Phase.CLEANUP);
+      synchronized (runningTasks) {
+
+        if (taskStatus.getRunState() == TaskStatus.State.SUCCEEDED) {
+          synchronized (finishedTasks) {
+            TaskInProgress tip = runningTasks.remove(taskStatus.getTaskId());
+            tlist.add((TaskStatus) taskStatus.clone());
+            finishedTasks.put(taskStatus.getTaskId(), tip);
+          }
+        } else if (taskStatus.getRunState() == TaskStatus.State.RUNNING) {
           tlist.add((TaskStatus) taskStatus.clone());
-          finishedTasks.put(taskStatus.getTaskId(), tip);
         }
+
       }
       return tlist;
     }
@@ -850,4 +863,9 @@ public class GroomServer implements Runn
   public String[] getAllPeerNames() {
     return bspPeer.getAllPeerNames();
   }
+
+  @Override
+  public void clear() {
+    bspPeer.clear();
+  }
 }

Modified: incubator/hama/trunk/src/java/org/apache/hama/bsp/JobInProgress.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/bsp/JobInProgress.java?rev=1072853&r1=1072852&r2=1072853&view=diff
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/bsp/JobInProgress.java (original)
+++ incubator/hama/trunk/src/java/org/apache/hama/bsp/JobInProgress.java Mon Feb 21 06:18:17 2011
@@ -77,7 +77,7 @@ class JobInProgress {
     this.localFs = FileSystem.getLocal(conf);
     this.jobFile = jobFile;
     this.master = master;
-    this.status = new JobStatus(jobId, null, 0.0f, 0.0f, JobStatus.State.PREP.value());
+    this.status = new JobStatus(jobId, null, 0L, 0L, JobStatus.State.PREP.value());
     this.startTime = System.currentTimeMillis();
     this.superstepCounter = 0;
     this.restartCount = 0;
@@ -183,7 +183,7 @@ class JobInProgress {
 
     // Update job status
     this.status = new JobStatus(this.status.getJobID(), this.profile.getUser(),
-        1.0f, 1.0f, JobStatus.RUNNING);
+        0L, 0L, JobStatus.RUNNING);
 
     tasksInited = true;
     LOG.debug("Job is initialized.");
@@ -233,7 +233,7 @@ class JobInProgress {
 
     if (allDone) {
       this.status = new JobStatus(this.status.getJobID(), this.profile
-          .getUser(), 1.0f, 1.0f, 1.0f, JobStatus.SUCCEEDED, superstepCounter);
+          .getUser(), superstepCounter, superstepCounter, superstepCounter, JobStatus.SUCCEEDED, superstepCounter);
       this.finishTime = System.currentTimeMillis();
       this.status.setFinishTime(this.finishTime);
       
@@ -258,7 +258,7 @@ class JobInProgress {
     LOG.debug(">> JobInProgress.kill() step.");
     if (status.getRunState() != JobStatus.FAILED) {
       this.status = new JobStatus(status.getJobID(), this.profile.getUser(),
-          1.0f, 1.0f, 1.0f, JobStatus.FAILED);
+          0L, 0L, 0L, JobStatus.FAILED);
       this.finishTime = System.currentTimeMillis();
       this.status.setFinishTime(this.finishTime);
       //

Modified: incubator/hama/trunk/src/java/org/apache/hama/bsp/JobStatus.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/bsp/JobStatus.java?rev=1072853&r1=1072852&r2=1072853&view=diff
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/bsp/JobStatus.java (original)
+++ incubator/hama/trunk/src/java/org/apache/hama/bsp/JobStatus.java Mon Feb 21 06:18:17 2011
@@ -21,13 +21,16 @@ import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.IOException;
 
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableFactories;
 import org.apache.hadoop.io.WritableFactory;
 
 public class JobStatus implements Writable, Cloneable {
-
+  public static final Log LOG = LogFactory.getLog(JobStatus.class);
+  
   static {
     WritableFactories.setFactory(JobStatus.class, new WritableFactory() {
       public Writable newInstance() {
@@ -58,9 +61,9 @@ public class JobStatus implements Writab
   public static final int KILLED = 5;
 
   private BSPJobID jobid;
-  private float progress;
-  private float cleanupProgress;
-  private float setupProgress;
+  private long progress;
+  private long cleanupProgress;
+  private long setupProgress;
   private volatile State state;// runState in enum
   private int runState;
   private long startTime;
@@ -73,22 +76,22 @@ public class JobStatus implements Writab
   public JobStatus() {
   }
 
-  public JobStatus(BSPJobID jobid, String user, float progress, int runState) {
-    this(jobid, user, progress, 0.0f, runState);
+  public JobStatus(BSPJobID jobid, String user, long progress, int runState) {
+    this(jobid, user, progress, 0, runState);
   }
 
-  public JobStatus(BSPJobID jobid, String user, float progress, float cleanupProgress,
+  public JobStatus(BSPJobID jobid, String user, long progress, long cleanupProgress,
       int runState) {
-    this(jobid, user, 0.0f, progress, cleanupProgress, runState);
+    this(jobid, user, 0, progress, cleanupProgress, runState);
   }
 
-  public JobStatus(BSPJobID jobid, String user, float setupProgress, float progress,
-      float cleanupProgress, int runState) {
-    this(jobid, user, 0.0f, progress, cleanupProgress, runState, 0);
+  public JobStatus(BSPJobID jobid, String user, long setupProgress, long progress,
+      long cleanupProgress, int runState) {
+    this(jobid, user, 0, progress, cleanupProgress, runState, 0);
   }
 
-  public JobStatus(BSPJobID jobid, String user, float setupProgress, float progress,
-      float cleanupProgress, int runState, long superstepCount) {
+  public JobStatus(BSPJobID jobid, String user, long setupProgress, long progress,
+      long cleanupProgress, int runState, long superstepCount) {
     this.jobid = jobid;
     this.setupProgress = setupProgress;
     this.progress = progress;
@@ -103,28 +106,28 @@ public class JobStatus implements Writab
     return jobid;
   }
 
-  public synchronized float progress() {
+  public synchronized long progress() {
     return progress;
   }
 
-  synchronized void setprogress(float p) {
-    this.progress = (float) Math.min(1.0, Math.max(0.0, p));
+  synchronized void setprogress(long p) {
+    this.progress = p;
   }
 
-  public synchronized float cleanupProgress() {
+  public synchronized long cleanupProgress() {
     return cleanupProgress;
   }
 
-  synchronized void setCleanupProgress(float p) {
-    this.cleanupProgress = (float) Math.min(1.0, Math.max(0.0, p));
+  synchronized void setCleanupProgress(int p) {
+    this.cleanupProgress = p;
   }
 
-  public synchronized float setupProgress() {
+  public synchronized long setupProgress() {
     return setupProgress;
   }
 
-  synchronized void setSetupProgress(float p) {
-    this.setupProgress = (float) Math.min(1.0, Math.max(0.0, p));
+  synchronized void setSetupProgress(long p) {
+    this.setupProgress = p;
   }
 
   public JobStatus.State getState(){
@@ -203,9 +206,9 @@ public class JobStatus implements Writab
 
   public synchronized void write(DataOutput out) throws IOException {
     jobid.write(out);
-    out.writeFloat(setupProgress);
-    out.writeFloat(progress);
-    out.writeFloat(cleanupProgress);
+    out.writeLong(setupProgress);
+    out.writeLong(progress);
+    out.writeLong(cleanupProgress);
     out.writeInt(runState);
     out.writeLong(startTime);
     out.writeLong(finishTime);
@@ -217,9 +220,9 @@ public class JobStatus implements Writab
   public synchronized void readFields(DataInput in) throws IOException {
     this.jobid = new BSPJobID();
     jobid.readFields(in);
-    this.setupProgress = in.readFloat();
-    this.progress = in.readFloat();
-    this.cleanupProgress = in.readFloat();
+    this.setupProgress = in.readLong();
+    this.progress = in.readLong();
+    this.cleanupProgress = in.readLong();
     this.runState = in.readInt();
     this.startTime = in.readLong();
     this.finishTime = in.readLong();

Modified: incubator/hama/trunk/src/java/org/apache/hama/bsp/RunningJob.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/bsp/RunningJob.java?rev=1072853&r1=1072852&r2=1072853&view=diff
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/bsp/RunningJob.java (original)
+++ incubator/hama/trunk/src/java/org/apache/hama/bsp/RunningJob.java Mon Feb 21 06:18:17 2011
@@ -60,7 +60,7 @@ public interface RunningJob {
    * @return the progress of the job's tasks.
    * @throws IOException
    */
-  public float progress() throws IOException;
+  public long progress() throws IOException;
 
   /**
    * Check if the job is finished or not. This is a non-blocking call.