You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hama.apache.org by ed...@apache.org on 2011/10/12 04:03:58 UTC

svn commit: r1182172 - in /incubator/hama/trunk: ./ core/src/main/java/org/apache/hama/bsp/ examples/src/main/java/org/apache/hama/examples/ examples/src/main/java/org/apache/hama/examples/graph/

Author: edwardyoon
Date: Wed Oct 12 02:03:57 2011
New Revision: 1182172

URL: http://svn.apache.org/viewvc?rev=1182172&view=rev
Log:
Restructure BSP API

Modified:
    incubator/hama/trunk/CHANGES.txt
    incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSP.java
    incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPPeer.java
    incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPTask.java
    incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/PiEstimator.java
    incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/RandBench.java
    incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/SerializePrinting.java
    incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/graph/PageRank.java
    incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/graph/ShortestPaths.java

Modified: incubator/hama/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/CHANGES.txt?rev=1182172&r1=1182171&r2=1182172&view=diff
==============================================================================
--- incubator/hama/trunk/CHANGES.txt (original)
+++ incubator/hama/trunk/CHANGES.txt Wed Oct 12 02:03:57 2011
@@ -17,7 +17,8 @@ Release 0.4 - Unreleased
     HAMA-421: Maven build issues using proxy (Joe Crobak via edwardyoon)
 
   IMPROVEMENTS
-
+   
+    HAMA-448: Restructure BSP API (Thomas Jungblut via edwardyoon)
     HAMA-441: Logging tasks to distinct files (Thomas Jungblut)
     HAMA-423: Improve and Refactor Partitioning in the Examples (Thomas Jungblut via edwardyoon)
     HAMA-422: Update HttpServer to use QueuedThreadPool (edwardyoon)

Modified: incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSP.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSP.java?rev=1182172&r1=1182171&r2=1182172&view=diff
==============================================================================
--- incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSP.java (original)
+++ incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSP.java Wed Oct 12 02:03:57 2011
@@ -17,8 +17,72 @@
  */
 package org.apache.hama.bsp;
 
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.zookeeper.KeeperException;
+
 /**
  * This class provides an abstract implementation of the BSP interface.
  */
 public abstract class BSP implements BSPInterface {
+
+  protected Configuration conf;
+
+  /**
+   * This method is called before the BSP method. It can be used for setup
+   * purposes.
+   * 
+   * @param peer Your BSPPeer instance.
+   */
+  public void setup(BSPPeer peer) {
+
+  }
+
+  /**
+   * This method is called after the BSP method. It can be used for cleanup
+   * purposes. Cleanup is guranteed to be called after the BSP runs, even in
+   * case of exceptions.
+   * 
+   * @param peer Your BSPPeer instance.
+   */
+  public void cleanup(BSPPeer peer) {
+
+  }
+
+  /**
+   * This method is your computation method, the main work of your BSP should be
+   * done here.
+   * 
+   * @param peer Your BSPPeer instance.
+   */
+  @Override
+  public void bsp(BSPPeer bspPeer) throws IOException, KeeperException,
+      InterruptedException {
+
+  }
+
+  /**
+   * Returns the configuration of this BSP Job.
+   * 
+   * @deprecated Use BSPPeer.getConfiguration() instead. Will be removed in
+   *             0.5.0.
+   */
+  @Deprecated
+  @Override
+  public Configuration getConf() {
+    return conf;
+  }
+
+  /**
+   * Sets the configuration of this BSP Job.
+   * 
+   * @deprecated Won't be used anymore.
+   */
+  @Deprecated
+  @Override
+  public void setConf(Configuration conf) {
+    this.conf = conf;
+  }
+
 }

Modified: incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPPeer.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPPeer.java?rev=1182172&r1=1182171&r2=1182172&view=diff
==============================================================================
--- incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPPeer.java (original)
+++ incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPPeer.java Wed Oct 12 02:03:57 2011
@@ -696,6 +696,15 @@ public class BSPPeer implements Watcher,
   }
 
   /**
+   * Gets the job configuration.
+   * 
+   * @return the conf
+   */
+  public Configuration getConfiguration() {
+    return conf;
+  }
+
+  /**
    * Clears local queue
    */
   public void clearLocalQueue() {

Modified: incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPTask.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPTask.java?rev=1182172&r1=1182171&r2=1182172&view=diff
==============================================================================
--- incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPTask.java (original)
+++ incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPTask.java Wed Oct 12 02:03:57 2011
@@ -26,18 +26,19 @@ import org.apache.hama.ipc.BSPPeerProtoc
 import org.apache.zookeeper.KeeperException;
 
 /**
- * Base class for tasks. 
+ * Base class for tasks.
  */
 public class BSPTask extends Task {
-  
+
   public static final Log LOG = LogFactory.getLog(BSPTask.class);
-  
+
   private BSPJob conf;
-  
+
   public BSPTask() {
   }
 
-  public BSPTask(BSPJobID jobId, String jobFile, TaskAttemptID taskid, int partition) {
+  public BSPTask(BSPJobID jobId, String jobFile, TaskAttemptID taskid,
+      int partition) {
     this.jobId = jobId;
     this.jobFile = jobFile;
     this.taskId = taskid;
@@ -52,9 +53,11 @@ public class BSPTask extends Task {
   @Override
   public void run(BSPJob job, BSPPeer bspPeer, BSPPeerProtocol umbilical)
       throws IOException {
-    
-    BSP bsp = (BSP) ReflectionUtils.newInstance(job.getConf().getClass(
-        "bsp.work.class", BSP.class), job.getConf());
+
+    BSP bsp = (BSP) ReflectionUtils.newInstance(
+        job.getConf().getClass("bsp.work.class", BSP.class), job.getConf());
+
+    bsp.setup(bspPeer);
 
     try {
       bsp.bsp(bspPeer);
@@ -64,17 +67,19 @@ public class BSPTask extends Task {
       LOG.error("Exception during BSP execution!", e);
     } catch (InterruptedException e) {
       LOG.error("Exception during BSP execution!", e);
+    } finally {
+      bsp.cleanup(bspPeer);
     }
 
     done(umbilical);
   }
-  
+
   public BSPJob getConf() {
-      return conf;
-    }
-  
-    public void setConf(BSPJob conf) {
-      this.conf = conf;
-    }
+    return conf;
+  }
+
+  public void setConf(BSPJob conf) {
+    this.conf = conf;
+  }
 
 }

Modified: incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/PiEstimator.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/PiEstimator.java?rev=1182172&r1=1182171&r2=1182172&view=diff
==============================================================================
--- incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/PiEstimator.java (original)
+++ incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/PiEstimator.java Wed Oct 12 02:03:57 2011
@@ -21,7 +21,6 @@ import java.io.IOException;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.DoubleWritable;
@@ -42,13 +41,18 @@ public class PiEstimator {
 
   public static class MyEstimator extends BSP {
     public static final Log LOG = LogFactory.getLog(MyEstimator.class);
-    private Configuration conf;
     private String masterTask;
     private static final int iterations = 10000;
 
-    public void bsp(BSPPeer bspPeer) throws IOException,
-        KeeperException, InterruptedException {
-      
+    @Override
+    public void setup(BSPPeer peer) {
+      this.masterTask = conf.get(MASTER_TASK);
+    }
+
+    @Override
+    public void bsp(BSPPeer bspPeer) throws IOException, KeeperException,
+        InterruptedException {
+
       int in = 0, out = 0;
       for (int i = 0; i < iterations; i++) {
         double x = 2.0 * Math.random() - 1.0, y = 2.0 * Math.random() - 1.0;
@@ -87,16 +91,6 @@ public class PiEstimator {
       writer.append(new DoubleWritable(pi), new DoubleWritable(0));
       writer.close();
     }
-
-    public Configuration getConf() {
-      return conf;
-    }
-
-    public void setConf(Configuration conf) {
-      this.conf = conf;
-      this.masterTask = conf.get(MASTER_TASK);
-    }
-
   }
 
   private static void initTempDir(FileSystem fileSys) throws IOException {
@@ -139,7 +133,7 @@ public class PiEstimator {
 
     // Choose one as a master
     for (String hostName : cluster.getActiveGroomNames().keySet()) {
-      conf.set(MASTER_TASK,hostName);
+      conf.set(MASTER_TASK, hostName);
       break;
     }
 

Modified: incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/RandBench.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/RandBench.java?rev=1182172&r1=1182171&r2=1182172&view=diff
==============================================================================
--- incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/RandBench.java (original)
+++ incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/RandBench.java Wed Oct 12 02:03:57 2011
@@ -22,7 +22,6 @@ import java.util.Random;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
 import org.apache.hama.HamaConfiguration;
 import org.apache.hama.bsp.BSP;
 import org.apache.hama.bsp.BSPJob;
@@ -41,15 +40,21 @@ public class RandBench {
 
   public static class RandBSP extends BSP {
     public static final Log LOG = LogFactory.getLog(RandBSP.class);
-    private Configuration conf;
     private Random r = new Random();
     private int sizeOfMsg;
     private int nCommunications;
     private int nSupersteps;
 
     @Override
-    public void bsp(BSPPeer bspPeer) throws IOException,
-        KeeperException, InterruptedException {
+    public void setup(BSPPeer peer) {
+      this.sizeOfMsg = conf.getInt(SIZEOFMSG, 1);
+      this.nCommunications = conf.getInt(N_COMMUNICATIONS, 1);
+      this.nSupersteps = conf.getInt(N_SUPERSTEPS, 1);
+    }
+
+    @Override
+    public void bsp(BSPPeer bspPeer) throws IOException, KeeperException,
+        InterruptedException {
       byte[] dummyData = new byte[sizeOfMsg];
       BSPMessage msg = null;
       String[] peers = bspPeer.getAllPeerNames();
@@ -74,20 +79,6 @@ public class RandBench {
 
       }
     }
-
-    @Override
-    public void setConf(Configuration conf) {
-      this.conf = conf;
-      this.sizeOfMsg = conf.getInt(SIZEOFMSG, 1);
-      this.nCommunications = conf.getInt(N_COMMUNICATIONS, 1);
-      this.nSupersteps = conf.getInt(N_SUPERSTEPS, 1);
-    }
-
-    @Override
-    public Configuration getConf() {
-      return conf;
-    }
-
   }
 
   public static void main(String[] args) throws Exception {

Modified: incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/SerializePrinting.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/SerializePrinting.java?rev=1182172&r1=1182171&r2=1182172&view=diff
==============================================================================
--- incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/SerializePrinting.java (original)
+++ incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/SerializePrinting.java Wed Oct 12 02:03:57 2011
@@ -22,7 +22,6 @@ import java.util.Date;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.LongWritable;
@@ -42,11 +41,21 @@ public class SerializePrinting {
 
   public static class HelloBSP extends BSP {
     public static final Log LOG = LogFactory.getLog(HelloBSP.class);
-    private Configuration conf;
     private final static int PRINT_INTERVAL = 1000;
     private FileSystem fileSys;
     private int num;
 
+    @Override
+    public void setup(BSPPeer peer) {
+      num = Integer.parseInt(conf.get("bsp.peers.num"));
+      try {
+        fileSys = FileSystem.get(conf);
+      } catch (IOException e) {
+        throw new Error("Filesystem could not be initialized! ", e);
+      }
+    }
+
+    @Override
     public void bsp(BSPPeer bspPeer) throws IOException, KeeperException,
         InterruptedException {
 
@@ -72,21 +81,6 @@ public class SerializePrinting {
           "Hello BSP from " + (i + 1) + " of " + num + ": " + string));
       writer.close();
     }
-
-    public Configuration getConf() {
-      return conf;
-    }
-
-    public void setConf(Configuration conf) {
-      this.conf = conf;
-      num = Integer.parseInt(conf.get("bsp.peers.num"));
-      try {
-        fileSys = FileSystem.get(conf);
-      } catch (IOException e) {
-        e.printStackTrace();
-      }
-    }
-
   }
 
   private static void printOutput(FileSystem fileSys, ClusterStatus cluster,

Modified: incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/graph/PageRank.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/graph/PageRank.java?rev=1182172&r1=1182171&r2=1182172&view=diff
==============================================================================
--- incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/graph/PageRank.java (original)
+++ incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/graph/PageRank.java Wed Oct 12 02:03:57 2011
@@ -39,8 +39,6 @@ import org.apache.zookeeper.KeeperExcept
 public class PageRank extends PageRankBase {
   public static final Log LOG = LogFactory.getLog(PageRank.class);
 
-  private Configuration conf;
-
   private final HashMap<Vertex, List<Vertex>> adjacencyList = new HashMap<Vertex, List<Vertex>>();
   private final HashMap<String, Vertex> lookupMap = new HashMap<String, Vertex>();
   private final HashMap<Vertex, Double> tentativePagerank = new HashMap<Vertex, Double>();
@@ -49,11 +47,22 @@ public class PageRank extends PageRankBa
   private String[] peerNames;
 
   @Override
+  public void setup(BSPPeer peer) {
+    Configuration conf = peer.getConfiguration();
+    numOfVertices = Integer.parseInt(conf.get("num.vertices"));
+    DAMPING_FACTOR = Double.parseDouble(conf.get("damping.factor"));
+    ALPHA = (1 - DAMPING_FACTOR) / (double) numOfVertices;
+    EPSILON = Double.parseDouble(conf.get("epsilon.error"));
+    MAX_ITERATIONS = Integer.parseInt(conf.get("max.iterations"));
+    peerNames = conf.get(ShortestPaths.BSP_PEERS).split(";");
+  }
+
+  @Override
   public void bsp(BSPPeer peer) throws IOException, KeeperException,
       InterruptedException {
-    String master = conf.get(MASTER_TASK);
+    String master = peer.getConfiguration().get(MASTER_TASK);
     // setup the datasets
-    PageRankBase.mapAdjacencyList(getConf(), peer, adjacencyList,
+    PageRankBase.mapAdjacencyList(peer.getConfiguration(), peer, adjacencyList,
         tentativePagerank, lookupMap);
 
     // while the error not converges against epsilon do the pagerank stuff
@@ -103,7 +112,8 @@ public class PageRank extends PageRankBa
     // Clears all queues entries.
     peer.clear();
     // finally save the chunk of pageranks
-    PageRankBase.savePageRankMap(peer, conf, lastTentativePagerank);
+    PageRankBase.savePageRankMap(peer, peer.getConfiguration(),
+        lastTentativePagerank);
   }
 
   private double broadcastError(BSPPeer peer, String master, double error)
@@ -157,22 +167,6 @@ public class PageRank extends PageRankBa
     }
   }
 
-  @Override
-  public void setConf(Configuration conf) {
-    this.conf = conf;
-    numOfVertices = Integer.parseInt(conf.get("num.vertices"));
-    DAMPING_FACTOR = Double.parseDouble(conf.get("damping.factor"));
-    ALPHA = (1 - DAMPING_FACTOR) / (double) numOfVertices;
-    EPSILON = Double.parseDouble(conf.get("epsilon.error"));
-    MAX_ITERATIONS = Integer.parseInt(conf.get("max.iterations"));
-    peerNames = conf.get(ShortestPaths.BSP_PEERS).split(";");
-  }
-
-  @Override
-  public Configuration getConf() {
-    return conf;
-  }
-
   public static void printUsage() {
     System.out.println("PageRank Example:");
     System.out

Modified: incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/graph/ShortestPaths.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/graph/ShortestPaths.java?rev=1182172&r1=1182171&r2=1182172&view=diff
==============================================================================
--- incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/graph/ShortestPaths.java (original)
+++ incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/graph/ShortestPaths.java Wed Oct 12 02:03:57 2011
@@ -41,9 +41,9 @@ import org.apache.hama.examples.RandBenc
 import org.apache.zookeeper.KeeperException;
 
 public class ShortestPaths extends ShortestPathsBase {
+
   public static final Log LOG = LogFactory.getLog(ShortestPaths.class);
 
-  private Configuration conf;
   private final HashMap<ShortestPathVertex, List<ShortestPathVertex>> adjacencyList = new HashMap<ShortestPathVertex, List<ShortestPathVertex>>();
   private final HashMap<String, ShortestPathVertex> vertexLookupMap = new HashMap<String, ShortestPathVertex>();
   private String[] peerNames;
@@ -52,15 +52,16 @@ public class ShortestPaths extends Short
   public void bsp(BSPPeer peer) throws IOException, KeeperException,
       InterruptedException {
     // map our input into ram
-    mapAdjacencyList(conf, peer, adjacencyList, vertexLookupMap);
+    mapAdjacencyList(peer.getConfiguration(), peer, adjacencyList,
+        vertexLookupMap);
     // parse the configuration to get the peerNames
-    parsePeerNames(conf);
+    parsePeerNames(peer.getConfiguration());
     // get our master groom
-    String master = conf.get(MASTER_TASK);
+    String master = peer.getConfiguration().get(MASTER_TASK);
 
     // initial message bypass
-    ShortestPathVertex v = vertexLookupMap.get(conf
-        .get(SHORTEST_PATHS_START_VERTEX_ID));
+    ShortestPathVertex v = vertexLookupMap.get(peer.getConfiguration().get(
+        SHORTEST_PATHS_START_VERTEX_ID));
     if (v != null) {
       v.setCost(0);
       sendMessageToNeighbors(peer, v);
@@ -90,7 +91,7 @@ public class ShortestPaths extends Short
       }
     }
     // finished, finally save our map to DFS.
-    saveVertexMap(conf, peer, adjacencyList);
+    saveVertexMap(peer.getConfiguration(), peer, adjacencyList);
   }
 
   /**
@@ -161,16 +162,6 @@ public class ShortestPaths extends Short
     }
   }
 
-  @Override
-  public void setConf(Configuration conf) {
-    this.conf = conf;
-  }
-
-  @Override
-  public Configuration getConf() {
-    return conf;
-  }
-
   public static void printUsage() {
     System.out.println("Single Source Shortest Path Example:");
     System.out