You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hama.apache.org by ed...@apache.org on 2010/08/02 13:18:20 UTC

svn commit: r981457 - in /incubator/hama/trunk/src: examples/org/apache/hama/examples/ java/org/apache/hama/bsp/

Author: edwardyoon
Date: Mon Aug  2 11:18:20 2010
New Revision: 981457

URL: http://svn.apache.org/viewvc?rev=981457&view=rev
Log:
I just committed HAMA-276_v04.patch

Modified:
    incubator/hama/trunk/src/examples/org/apache/hama/examples/PiEstimator.java
    incubator/hama/trunk/src/java/org/apache/hama/bsp/BSP.java
    incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPPeer.java
    incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPRunner.java
    incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPTask.java
    incubator/hama/trunk/src/java/org/apache/hama/bsp/GroomServer.java
    incubator/hama/trunk/src/java/org/apache/hama/bsp/JobInProgress.java
    incubator/hama/trunk/src/java/org/apache/hama/bsp/LocalJobRunner.java
    incubator/hama/trunk/src/java/org/apache/hama/bsp/SimpleTaskScheduler.java
    incubator/hama/trunk/src/java/org/apache/hama/bsp/Task.java

Modified: incubator/hama/trunk/src/examples/org/apache/hama/examples/PiEstimator.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/examples/org/apache/hama/examples/PiEstimator.java?rev=981457&r1=981456&r2=981457&view=diff
==============================================================================
--- incubator/hama/trunk/src/examples/org/apache/hama/examples/PiEstimator.java (original)
+++ incubator/hama/trunk/src/examples/org/apache/hama/examples/PiEstimator.java Mon Aug  2 11:18:20 2010
@@ -20,6 +20,8 @@ package org.apache.hama.examples;
 import java.io.IOException;
 import java.net.InetSocketAddress;
 
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hama.HamaConfiguration;
@@ -33,6 +35,7 @@ import org.apache.zookeeper.KeeperExcept
 public class PiEstimator {
 
   public static class MyEstimator extends BSP {
+    public static final Log LOG = LogFactory.getLog(MyEstimator.class);
     private Configuration conf;
     private static final int iterations = 10000;
     
@@ -53,18 +56,22 @@ public class PiEstimator {
       byte[] myData = Bytes.toBytes(4.0 * (double) in / (double) iterations);
       BSPMessage estimate = new BSPMessage(tagName, myData);
 
+      LOG.info("Sent message to localhost:30000: " + Bytes.toDouble(myData));
       bspPeer.send(new InetSocketAddress("localhost", 30000), estimate);
+      LOG.info("Enter the barrier");
       bspPeer.sync();
 
       double pi = 0.0;
       BSPMessage received;
       while ((received = bspPeer.getCurrentMessage()) != null) {
+        LOG.info("Receives messages:" + Bytes.toDouble(received.getData()));
         pi = (pi + Bytes.toDouble(received.getData())) / 2;
       }
 
-      if (pi != 0.0)
+      if (pi != 0.0) {
+        LOG.info("\nEstimated value of PI is " + pi);
         System.out.println("\nEstimated value of PI is " + pi);
-
+      }
     }
 
     @Override
@@ -84,14 +91,15 @@ public class PiEstimator {
     // BSP job configuration
     HamaConfiguration conf = new HamaConfiguration();
     // Execute locally
-    conf.set("bsp.master.address", "local");
+    //conf.set("bsp.master.address", "local");
+    conf.set("bsp.master.address", "localhost:40000");
     
     BSPJob bsp = new BSPJob(conf, PiEstimator.class);
     // Set the job name
     bsp.setJobName("pi estimation example");
     bsp.setBspClass(MyEstimator.class);
 
-    bsp.setNumBspTask(10);
+    bsp.setNumBspTask(1);
     BSPJobClient.runJob(bsp);
   }
 }

Modified: incubator/hama/trunk/src/java/org/apache/hama/bsp/BSP.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/bsp/BSP.java?rev=981457&r1=981456&r2=981457&view=diff
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/bsp/BSP.java (original)
+++ incubator/hama/trunk/src/java/org/apache/hama/bsp/BSP.java Mon Aug  2 11:18:20 2010
@@ -25,6 +25,7 @@ import org.apache.commons.logging.LogFac
  */
 public abstract class BSP extends Thread implements BSPInterface {
   private static final Log LOG = LogFactory.getLog(BSP.class);
+  private BSPPeer bspPeer;
   
   /**
    * A thread's run method.
@@ -34,9 +35,13 @@ public abstract class BSP extends Thread
    */
   public void run() {
     try {
-      bsp(new BSPPeer(this.getConf()));
+      bsp(bspPeer);
     } catch (Exception e) {
       LOG.error(e);
     }
   }
+
+  public void setPeer(BSPPeer bspServer) {
+    this.bspPeer = bspServer;
+  }
 }

Modified: incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPPeer.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPPeer.java?rev=981457&r1=981456&r2=981457&view=diff
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPPeer.java (original)
+++ incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPPeer.java Mon Aug  2 11:18:20 2010
@@ -156,7 +156,7 @@ public class BSPPeer implements Watcher,
   }
 
   protected boolean enterBarrier() throws KeeperException, InterruptedException {
-    LOG.debug("[" + serverName + "] enter the enterbarrier");
+    LOG.info("[" + serverName + "] enter the enterbarrier");
     try {
       zk.create(bspRoot + "/" + serverName, new byte[0], Ids.OPEN_ACL_UNSAFE,
           CreateMode.EPHEMERAL);
@@ -165,13 +165,17 @@ public class BSPPeer implements Watcher,
     } catch (InterruptedException e) {
       e.printStackTrace();
     }
-
+    LOG.info("enterbarrier done 0");
     while (true) {
       synchronized (mutex) {
         List<String> list = zk.getChildren(bspRoot, true);
+        LOG.info(list.size() + ", " + conf.getInt("bsp.peers.num", 0));
+        
         if (list.size() < conf.getInt("bsp.peers.num", 0)) {
+          LOG.info("enterbarrier done 1");
           mutex.wait();
         } else {
+          LOG.info("enterbarrier done 2");
           return true;
         }
       }

Modified: incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPRunner.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPRunner.java?rev=981457&r1=981456&r2=981457&view=diff
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPRunner.java (original)
+++ incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPRunner.java Mon Aug  2 11:18:20 2010
@@ -1,7 +1,5 @@
 package org.apache.hama.bsp;
 
-import java.io.IOException;
-
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configurable;
@@ -10,12 +8,11 @@ import org.apache.hadoop.util.Reflection
 
 public class BSPRunner extends Thread implements Configurable {
   private static final Log LOG = LogFactory.getLog(BSPRunner.class);
-  private BSPPeer bspPeer;
   private Configuration conf;
   private BSP bsp;
   private boolean isDone;
   
-  public void run() {
+  public void run(BSPPeer bspPeer) {
     try {
       bsp.bsp(bspPeer);
     } catch (Exception e) {
@@ -31,11 +28,6 @@ public class BSPRunner extends Thread im
   @Override
   public void setConf(Configuration conf) {
     this.conf = conf;
-    try {
-      this.bspPeer = new BSPPeer(conf);
-    } catch (IOException e) {
-      e.printStackTrace();
-    }
 
     bsp = (BSP) ReflectionUtils.newInstance(conf.getClass("bsp.work.class",
         BSP.class), conf);

Modified: incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPTask.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPTask.java?rev=981457&r1=981456&r2=981457&view=diff
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPTask.java (original)
+++ incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPTask.java Mon Aug  2 11:18:20 2010
@@ -4,14 +4,22 @@ import org.apache.hadoop.conf.Configurat
 import org.apache.hadoop.util.ReflectionUtils;
 
 public class BSPTask extends Task {
+  private BSP bsp;
+  private Configuration conf;
   
   public BSPTask(String jobId, String jobFile, String taskid, int partition, Configuration conf) {
     this.jobId = jobId;
     this.jobFile = jobFile;
     this.taskId = taskid;
     this.partition = partition;
-    this.runner = (BSPRunner) ReflectionUtils.newInstance(
-        BSPRunner.class, conf);
+    this.conf = conf;
+  }
+
+  public BSP getBSPClass() {
+    bsp = (BSP) ReflectionUtils.newInstance(conf.getClass("bsp.work.class",
+        BSP.class), conf);
+    
+    return bsp;
   }
 
 }

Modified: incubator/hama/trunk/src/java/org/apache/hama/bsp/GroomServer.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/bsp/GroomServer.java?rev=981457&r1=981456&r2=981457&view=diff
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/bsp/GroomServer.java (original)
+++ incubator/hama/trunk/src/java/org/apache/hama/bsp/GroomServer.java Mon Aug  2 11:18:20 2010
@@ -40,13 +40,17 @@ import org.apache.hadoop.ipc.RPC;
 import org.apache.hadoop.ipc.RemoteException;
 import org.apache.hadoop.net.DNS;
 import org.apache.hadoop.util.DiskChecker;
+import org.apache.hadoop.util.ReflectionUtils;
 import org.apache.hadoop.util.StringUtils;
 import org.apache.hadoop.util.DiskChecker.DiskErrorException;
+import org.apache.hama.Constants;
 import org.apache.hama.HamaConfiguration;
 import org.apache.hama.ipc.InterTrackerProtocol;
 
 public class GroomServer implements Runnable {
   public static final Log LOG = LogFactory.getLog(GroomServer.class);
+  private BSPPeer bspPeer;
+  private Task task;
   
   static {
     Configuration.addDefaultResource("hama-default.xml");
@@ -92,9 +96,14 @@ public class GroomServer implements Runn
     new LinkedBlockingQueue<GroomServerAction>();
   
   public GroomServer(Configuration conf) throws IOException {
+    LOG.info("groom start");
     this.conf = conf;
-    bspMasterAddr = BSPMaster.getAddress(conf);
-
+    String mode = conf.get("bsp.master.address");
+    if(!mode.equals("local")) {
+      bspMasterAddr = BSPMaster.getAddress(conf);
+    }
+    bspPeer = new BSPPeer(conf);
+    
     //FileSystem local = FileSystem.getLocal(conf);
     //this.localDirAllocator = new LocalDirAllocator("bsp.local.dir");
   }
@@ -214,7 +223,6 @@ public class GroomServer implements Runn
             heartbeatResponse.getResponseId() + " and " + 
             ((actions != null) ? actions.length : 0) + " actions");
 
-        
         if (actions != null){ 
           for(GroomServerAction action: actions) {
             if (action instanceof LaunchTaskAction) {
@@ -260,9 +268,10 @@ public class GroomServer implements Runn
 
   private void startNewTask(LaunchTaskAction action) {
     // TODO Auto-generated method stub
-    Task t = action.getTask();
-    LOG.info("GroomServer: " + t.getJobID() + ", " + t.getJobFile() + ", " + t.getId() + ", " + t.getPartition());
-    LOG.info(t.runner);
+    task = action.getTask();
+    this.launchTask();
+    //LOG.info("GroomServer: " + t.getJobID() + ", " + t.getJobFile() + ", " + t.getId() + ", " + t.getPartition());
+    //LOG.info(t.runner);
     //t.runner.start();
     // TODO: execute task
     
@@ -352,7 +361,7 @@ public class GroomServer implements Runn
 
   public synchronized void close() throws IOException {
     this.running = false;
-
+    bspPeer.close();
     cleanupStorage();
 
     // shutdown RPC connections
@@ -450,6 +459,7 @@ public class GroomServer implements Runn
       conf.addResource(new Path("/home/edward/workspace/hama-trunk/conf/hama-default.xml"));
       conf.addResource(new Path("/home/edward/workspace/hama-trunk/conf/hama-site.xml"));
       
+      conf.set(Constants.PEER_PORT, String.valueOf(30000));
       GroomServer groom = GroomServer.constructGroomServer(GroomServer.class, conf);
       startGroomServer(groom);
     } catch (Throwable e) {
@@ -457,5 +467,30 @@ public class GroomServer implements Runn
       System.exit(-1);
     }
   }
-  
+
+  public void assignTask(Task task) {
+    this.task = task;
+  }
+
+  public void launchTask() {
+    Configuration jobConf = new Configuration();
+    jobConf.addResource(new Path(task.getJobFile().replace("file:", "")));
+    BSP bsp = (BSP) ReflectionUtils.newInstance(jobConf.getClass("bsp.work.class",
+        BSP.class), conf);
+    bsp.setPeer(bspPeer);
+    bsp.start();
+    while(!bsp.isAlive()) {
+      LOG.info("i'm done. ");
+      try {
+        Thread.sleep(1000);
+      } catch (InterruptedException e) {
+        // TODO Auto-generated catch block
+        e.printStackTrace();
+      }
+    }
+  }
+
+  public String getServerName() {
+    return bspPeer.getServerName();
+  }
 }

Modified: incubator/hama/trunk/src/java/org/apache/hama/bsp/JobInProgress.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/bsp/JobInProgress.java?rev=981457&r1=981456&r2=981457&view=diff
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/bsp/JobInProgress.java (original)
+++ incubator/hama/trunk/src/java/org/apache/hama/bsp/JobInProgress.java Mon Aug  2 11:18:20 2010
@@ -141,7 +141,7 @@ class JobInProgress {
     try {
       result = new TaskInProgress(getJobID(), this.jobFile.toString(), this.master, null, this,
           numUniqueHosts).getTaskToRun(status);
-      LOG.info("JobInProgress: " + result.getJobID() + ", " + result.getJobFile() + ", " + result.getId() + ", " + result.getPartition());
+      //LOG.info("JobInProgress: " + result.getJobID() + ", " + result.getJobFile() + ", " + result.getId() + ", " + result.getPartition());
       
     } catch (IOException e) {
       e.printStackTrace();

Modified: incubator/hama/trunk/src/java/org/apache/hama/bsp/LocalJobRunner.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/bsp/LocalJobRunner.java?rev=981457&r1=981456&r2=981457&view=diff
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/bsp/LocalJobRunner.java (original)
+++ incubator/hama/trunk/src/java/org/apache/hama/bsp/LocalJobRunner.java Mon Aug  2 11:18:20 2010
@@ -2,7 +2,6 @@ package org.apache.hama.bsp;
 
 import java.io.IOException;
 import java.util.HashMap;
-import java.util.Map;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -12,8 +11,13 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hama.Constants;
 import org.apache.hama.ipc.InterTrackerProtocol;
 import org.apache.hama.ipc.JobSubmissionProtocol;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.KeeperException;
 import org.apache.zookeeper.WatchedEvent;
 import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.ZooDefs.Ids;
+import org.apache.zookeeper.data.Stat;
 
 public class LocalJobRunner implements JobSubmissionProtocol {
   private static final Log LOG = LogFactory.getLog(BSPJobClient.class);
@@ -106,7 +110,6 @@ public class LocalJobRunner implements J
     private BSPJob job;
     private String jobFile;
     private boolean threadDone = false;
-    private HashMap<String, Task> tasks = new HashMap<String, Task>();
 
     public Job(BSPJobID jobID, String jobFile, Configuration conf)
         throws IOException {
@@ -118,6 +121,26 @@ public class LocalJobRunner implements J
       LOG.info("Number of BSP tasks: " + NUM_PEER);
       jobs.put(jobID.toString(), this);
 
+      ZooKeeper zk = new ZooKeeper("localhost:21810", 3000, this);
+      Stat s = null;
+      if (zk != null) {
+        try {
+          s = zk.exists(Constants.DEFAULT_ZOOKEEPER_ROOT, false);
+        } catch (Exception e) {
+          LOG.error(s);
+        }
+
+        if (s == null) {
+          try {
+            zk.create(Constants.DEFAULT_ZOOKEEPER_ROOT, new byte[0],
+                Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+          } catch (KeeperException e) {
+            LOG.error(e);
+          } catch (InterruptedException e) {
+            LOG.error(e);
+          }
+        }
+      }      
       this.start();
     }
 
@@ -125,31 +148,23 @@ public class LocalJobRunner implements J
       while (!threadDone) {
         TaskID tID;
         for (int i = 0; i < NUM_PEER; i++) {
-          // TODO: this code should be automatically settled by BSP system  
           this.conf.set(Constants.PEER_PORT, String.valueOf(30000 + i));
           this.conf.setInt(Constants.PEER_ID, i);
           // Task ID is an integer that ranges from 0 to NUM_PEER - 1.
           tID = new TaskID(job.getJobID(), false, i);
 
-          // bspRunner should be Runnable.
-          Task bspRunner = new BSPTask(job.getJobID().getJtIdentifier(), jobFile, tID.toString(), i, this.conf);
-          LOG.info("Adding task '" + tID.toString() + "' for '" + bspRunner.getName() + "'");
-          tasks.put(tID.toString(), bspRunner);
-        }
-
-        // Launching tasks
-        for (Map.Entry<String, Task> e : tasks.entrySet()) {
-          e.getValue().runner.start();
-        }
-
-        // Slave Join
-        for (Map.Entry<String, Task> e : tasks.entrySet()) {
           try {
-            e.getValue().join();
-          } catch (InterruptedException e1) {
-            e1.printStackTrace();
+            GroomServer servers = new GroomServer(conf);
+            Task task = new BSPTask(job.getJobID().getJtIdentifier(), jobFile, tID.toString(), i, this.conf);
+            servers.assignTask(task);
+            LOG.info("Adding task '" + tID.toString() + "' for '" + servers.getServerName() + "'");
+
+            servers.launchTask();
+          } catch (IOException e) {
+            e.printStackTrace();
           }
         }
+
         done();
       }
     }

Modified: incubator/hama/trunk/src/java/org/apache/hama/bsp/SimpleTaskScheduler.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/bsp/SimpleTaskScheduler.java?rev=981457&r1=981456&r2=981457&view=diff
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/bsp/SimpleTaskScheduler.java (original)
+++ incubator/hama/trunk/src/java/org/apache/hama/bsp/SimpleTaskScheduler.java Mon Aug  2 11:18:20 2010
@@ -82,7 +82,7 @@ class SimpleTaskScheduler extends TaskSc
           t = job.obtainNewTask(groomStatus, numGroomServers,
               groomServerManager.getNumberOfUniqueHosts());
           
-          LOG.info("SimpleTaskScheduler: " + t.getJobID() + ", " + t.getJobFile() + ", " + t.getId() + ", " + t.getPartition());
+          //LOG.info("SimpleTaskScheduler: " + t.getJobID() + ", " + t.getJobFile() + ", " + t.getId() + ", " + t.getPartition());
           
           if (t != null) {
             assignedTasks.add(t);

Modified: incubator/hama/trunk/src/java/org/apache/hama/bsp/Task.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/bsp/Task.java?rev=981457&r1=981456&r2=981457&view=diff
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/bsp/Task.java (original)
+++ incubator/hama/trunk/src/java/org/apache/hama/bsp/Task.java Mon Aug  2 11:18:20 2010
@@ -30,7 +30,7 @@ import org.apache.hadoop.io.Writable;
 /**
  *
  */
-public class Task extends Thread implements Writable {
+public class Task implements Writable {
   public static final Log LOG = LogFactory.getLog(Task.class);
   ////////////////////////////////////////////
   // Fields
@@ -41,11 +41,8 @@ public class Task extends Thread impleme
   protected String taskId;
   protected int partition;
   
-  protected BSPRunner runner;
   protected LocalDirAllocator lDirAlloc;
-  /**
-   * 
-   */
+
   public Task() {
     taskId = new String();
   }
@@ -54,7 +51,6 @@ public class Task extends Thread impleme
     this.jobId = jobId;
     this.jobFile = jobFile;
     this.taskId = taskId;
-     
     this.partition = partition;
   }