You are viewing a plain-text version of this content. The hyperlink to its canonical version was not preserved in this copy.
Posted to commits@hama.apache.org by ed...@apache.org on 2011/10/12 04:03:58 UTC
svn commit: r1182172 - in /incubator/hama/trunk: ./
core/src/main/java/org/apache/hama/bsp/
examples/src/main/java/org/apache/hama/examples/
examples/src/main/java/org/apache/hama/examples/graph/
Author: edwardyoon
Date: Wed Oct 12 02:03:57 2011
New Revision: 1182172
URL: http://svn.apache.org/viewvc?rev=1182172&view=rev
Log:
Restructure BSP API
Modified:
incubator/hama/trunk/CHANGES.txt
incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSP.java
incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPPeer.java
incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPTask.java
incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/PiEstimator.java
incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/RandBench.java
incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/SerializePrinting.java
incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/graph/PageRank.java
incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/graph/ShortestPaths.java
Modified: incubator/hama/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/CHANGES.txt?rev=1182172&r1=1182171&r2=1182172&view=diff
==============================================================================
--- incubator/hama/trunk/CHANGES.txt (original)
+++ incubator/hama/trunk/CHANGES.txt Wed Oct 12 02:03:57 2011
@@ -17,7 +17,8 @@ Release 0.4 - Unreleased
HAMA-421: Maven build issues using proxy (Joe Crobak via edwardyoon)
IMPROVEMENTS
-
+
+ HAMA-448: Restructure BSP API (Thomas Jungblut via edwardyoon)
HAMA-441: Logging tasks to distinct files (Thomas Jungblut)
HAMA-423: Improve and Refactor Partitioning in the Examples (Thomas Jungblut via edwardyoon)
HAMA-422: Update HttpServer to use QueuedThreadPool (edwardyoon)
Modified: incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSP.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSP.java?rev=1182172&r1=1182171&r2=1182172&view=diff
==============================================================================
--- incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSP.java (original)
+++ incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSP.java Wed Oct 12 02:03:57 2011
@@ -17,8 +17,72 @@
*/
package org.apache.hama.bsp;
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.zookeeper.KeeperException;
+
/**
* This class provides an abstract implementation of the BSP interface.
*/
public abstract class BSP implements BSPInterface {
+
+ protected Configuration conf; // set by the deprecated setConf(); prefer BSPPeer.getConfiguration()
+
+ /**
+ * This method is called before the BSP method. It can be used for setup
+ * purposes. The default implementation does nothing.
+ *
+ * @param peer Your BSPPeer instance.
+ */
+ public void setup(BSPPeer peer) {
+
+ }
+
+ /**
+ * This method is called after the BSP method. It can be used for cleanup
+ * purposes. Cleanup is guaranteed to run after bsp(), even if bsp() throws;
+ * it is not called, however, if setup() itself throws.
+ *
+ * @param peer Your BSPPeer instance.
+ */
+ public void cleanup(BSPPeer peer) {
+
+ }
+
+ /**
+ * The main computation of your BSP; override this method to do the work.
+ * The default implementation does nothing.
+ *
+ * @param bspPeer Your BSPPeer instance.
+ */
+ @Override
+ public void bsp(BSPPeer bspPeer) throws IOException, KeeperException,
+ InterruptedException {
+
+ }
+
+ /**
+ * Returns the configuration of this BSP Job, as last passed to setConf().
+ *
+ * @deprecated Use BSPPeer.getConfiguration() instead. Will be removed in
+ * 0.5.0.
+ */
+ @Deprecated
+ @Override
+ public Configuration getConf() {
+ return conf;
+ }
+
+ /**
+ * Sets the configuration of this BSP Job. NOTE(review): several example
+ * setup() methods still read the conf field stored here - confirm before removal.
+ * @deprecated Won't be used anymore; use BSPPeer.getConfiguration() instead.
+ */
+ @Deprecated
+ @Override
+ public void setConf(Configuration conf) {
+ this.conf = conf;
+ }
+
}
Modified: incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPPeer.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPPeer.java?rev=1182172&r1=1182171&r2=1182172&view=diff
==============================================================================
--- incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPPeer.java (original)
+++ incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPPeer.java Wed Oct 12 02:03:57 2011
@@ -696,6 +696,15 @@ public class BSPPeer implements Watcher,
}
/**
+ * Gets the job configuration. Replaces the deprecated BSP#getConf().
+ *
+ * @return the configuration held by this peer
+ */
+ public Configuration getConfiguration() {
+ return conf;
+ }
+
+ /**
* Clears local queue
*/
public void clearLocalQueue() {
Modified: incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPTask.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPTask.java?rev=1182172&r1=1182171&r2=1182172&view=diff
==============================================================================
--- incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPTask.java (original)
+++ incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPTask.java Wed Oct 12 02:03:57 2011
@@ -26,18 +26,19 @@ import org.apache.hama.ipc.BSPPeerProtoc
import org.apache.zookeeper.KeeperException;
/**
- * Base class for tasks.
+ * Base class for tasks.
*/
public class BSPTask extends Task {
-
+
public static final Log LOG = LogFactory.getLog(BSPTask.class);
-
+
private BSPJob conf;
-
+
public BSPTask() {
}
- public BSPTask(BSPJobID jobId, String jobFile, TaskAttemptID taskid, int partition) {
+ public BSPTask(BSPJobID jobId, String jobFile, TaskAttemptID taskid,
+ int partition) {
this.jobId = jobId;
this.jobFile = jobFile;
this.taskId = taskid;
@@ -52,9 +53,11 @@ public class BSPTask extends Task {
@Override
public void run(BSPJob job, BSPPeer bspPeer, BSPPeerProtocol umbilical)
throws IOException {
-
- BSP bsp = (BSP) ReflectionUtils.newInstance(job.getConf().getClass(
- "bsp.work.class", BSP.class), job.getConf());
+
+ BSP bsp = (BSP) ReflectionUtils.newInstance(
+ job.getConf().getClass("bsp.work.class", BSP.class), job.getConf());
+
+ bsp.setup(bspPeer);
try {
bsp.bsp(bspPeer);
@@ -64,17 +67,19 @@ public class BSPTask extends Task {
LOG.error("Exception during BSP execution!", e);
} catch (InterruptedException e) {
LOG.error("Exception during BSP execution!", e);
+ } finally {
+ bsp.cleanup(bspPeer);
}
done(umbilical);
}
-
+
public BSPJob getConf() {
- return conf;
- }
-
- public void setConf(BSPJob conf) {
- this.conf = conf;
- }
+ return conf;
+ }
+
+ public void setConf(BSPJob conf) {
+ this.conf = conf;
+ }
}
Modified: incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/PiEstimator.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/PiEstimator.java?rev=1182172&r1=1182171&r2=1182172&view=diff
==============================================================================
--- incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/PiEstimator.java (original)
+++ incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/PiEstimator.java Wed Oct 12 02:03:57 2011
@@ -21,7 +21,6 @@ import java.io.IOException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DoubleWritable;
@@ -42,13 +41,18 @@ public class PiEstimator {
public static class MyEstimator extends BSP {
public static final Log LOG = LogFactory.getLog(MyEstimator.class);
- private Configuration conf;
private String masterTask;
private static final int iterations = 10000;
- public void bsp(BSPPeer bspPeer) throws IOException,
- KeeperException, InterruptedException {
-
+ @Override
+ public void setup(BSPPeer peer) { // caches the MASTER_TASK value from the job configuration
+ this.masterTask = conf.get(MASTER_TASK); // NOTE(review): reads BSP.conf (set only by the deprecated setConf()); peer.getConfiguration() is the documented replacement
+ }
+
+ @Override
+ public void bsp(BSPPeer bspPeer) throws IOException, KeeperException,
+ InterruptedException {
+
int in = 0, out = 0;
for (int i = 0; i < iterations; i++) {
double x = 2.0 * Math.random() - 1.0, y = 2.0 * Math.random() - 1.0;
@@ -87,16 +91,6 @@ public class PiEstimator {
writer.append(new DoubleWritable(pi), new DoubleWritable(0));
writer.close();
}
-
- public Configuration getConf() {
- return conf;
- }
-
- public void setConf(Configuration conf) {
- this.conf = conf;
- this.masterTask = conf.get(MASTER_TASK);
- }
-
}
private static void initTempDir(FileSystem fileSys) throws IOException {
@@ -139,7 +133,7 @@ public class PiEstimator {
// Choose one as a master
for (String hostName : cluster.getActiveGroomNames().keySet()) {
- conf.set(MASTER_TASK,hostName);
+ conf.set(MASTER_TASK, hostName);
break;
}
Modified: incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/RandBench.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/RandBench.java?rev=1182172&r1=1182171&r2=1182172&view=diff
==============================================================================
--- incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/RandBench.java (original)
+++ incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/RandBench.java Wed Oct 12 02:03:57 2011
@@ -22,7 +22,6 @@ import java.util.Random;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
import org.apache.hama.HamaConfiguration;
import org.apache.hama.bsp.BSP;
import org.apache.hama.bsp.BSPJob;
@@ -41,15 +40,21 @@ public class RandBench {
public static class RandBSP extends BSP {
public static final Log LOG = LogFactory.getLog(RandBSP.class);
- private Configuration conf;
private Random r = new Random();
private int sizeOfMsg;
private int nCommunications;
private int nSupersteps;
@Override
- public void bsp(BSPPeer bspPeer) throws IOException,
- KeeperException, InterruptedException {
+ public void setup(BSPPeer peer) { // reads message size and communication/superstep counts (each defaults to 1)
+ this.sizeOfMsg = conf.getInt(SIZEOFMSG, 1); // NOTE(review): reads BSP.conf (set only by the deprecated setConf()); prefer peer.getConfiguration()
+ this.nCommunications = conf.getInt(N_COMMUNICATIONS, 1);
+ this.nSupersteps = conf.getInt(N_SUPERSTEPS, 1);
+ }
+
+ @Override
+ public void bsp(BSPPeer bspPeer) throws IOException, KeeperException,
+ InterruptedException {
byte[] dummyData = new byte[sizeOfMsg];
BSPMessage msg = null;
String[] peers = bspPeer.getAllPeerNames();
@@ -74,20 +79,6 @@ public class RandBench {
}
}
-
- @Override
- public void setConf(Configuration conf) {
- this.conf = conf;
- this.sizeOfMsg = conf.getInt(SIZEOFMSG, 1);
- this.nCommunications = conf.getInt(N_COMMUNICATIONS, 1);
- this.nSupersteps = conf.getInt(N_SUPERSTEPS, 1);
- }
-
- @Override
- public Configuration getConf() {
- return conf;
- }
-
}
public static void main(String[] args) throws Exception {
Modified: incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/SerializePrinting.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/SerializePrinting.java?rev=1182172&r1=1182171&r2=1182172&view=diff
==============================================================================
--- incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/SerializePrinting.java (original)
+++ incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/SerializePrinting.java Wed Oct 12 02:03:57 2011
@@ -22,7 +22,6 @@ import java.util.Date;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
@@ -42,11 +41,21 @@ public class SerializePrinting {
public static class HelloBSP extends BSP {
public static final Log LOG = LogFactory.getLog(HelloBSP.class);
- private Configuration conf;
private final static int PRINT_INTERVAL = 1000;
private FileSystem fileSys;
private int num;
+ @Override
+ public void setup(BSPPeer peer) { // reads the peer count and obtains the FileSystem for the job configuration
+ num = Integer.parseInt(conf.get("bsp.peers.num")); // NOTE(review): NumberFormatException if the key is unset (get presumably returns null); also reads BSP.conf, set only by the deprecated setConf()
+ try {
+ fileSys = FileSystem.get(conf);
+ } catch (IOException e) {
+ throw new Error("Filesystem could not be initialized! ", e); // NOTE(review): BSPTask.run calls setup() outside its try/finally, so this skips cleanup() and done(umbilical); an unchecked RuntimeException is the more conventional wrapper than java.lang.Error
+ }
+ }
+
+ @Override
public void bsp(BSPPeer bspPeer) throws IOException, KeeperException,
InterruptedException {
@@ -72,21 +81,6 @@ public class SerializePrinting {
"Hello BSP from " + (i + 1) + " of " + num + ": " + string));
writer.close();
}
-
- public Configuration getConf() {
- return conf;
- }
-
- public void setConf(Configuration conf) {
- this.conf = conf;
- num = Integer.parseInt(conf.get("bsp.peers.num"));
- try {
- fileSys = FileSystem.get(conf);
- } catch (IOException e) {
- e.printStackTrace();
- }
- }
-
}
private static void printOutput(FileSystem fileSys, ClusterStatus cluster,
Modified: incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/graph/PageRank.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/graph/PageRank.java?rev=1182172&r1=1182171&r2=1182172&view=diff
==============================================================================
--- incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/graph/PageRank.java (original)
+++ incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/graph/PageRank.java Wed Oct 12 02:03:57 2011
@@ -39,8 +39,6 @@ import org.apache.zookeeper.KeeperExcept
public class PageRank extends PageRankBase {
public static final Log LOG = LogFactory.getLog(PageRank.class);
- private Configuration conf;
-
private final HashMap<Vertex, List<Vertex>> adjacencyList = new HashMap<Vertex, List<Vertex>>();
private final HashMap<String, Vertex> lookupMap = new HashMap<String, Vertex>();
private final HashMap<Vertex, Double> tentativePagerank = new HashMap<Vertex, Double>();
@@ -49,11 +47,22 @@ public class PageRank extends PageRankBa
private String[] peerNames;
@Override
+ public void setup(BSPPeer peer) { // parses the PageRank parameters and the peer list from the job configuration
+ Configuration conf = peer.getConfiguration();
+ numOfVertices = Integer.parseInt(conf.get("num.vertices"));
+ DAMPING_FACTOR = Double.parseDouble(conf.get("damping.factor"));
+ ALPHA = (1 - DAMPING_FACTOR) / (double) numOfVertices; // presumably the random-jump term (1 - d) / N; NOTE(review): non-finite if num.vertices is 0
+ EPSILON = Double.parseDouble(conf.get("epsilon.error"));
+ MAX_ITERATIONS = Integer.parseInt(conf.get("max.iterations"));
+ peerNames = conf.get(ShortestPaths.BSP_PEERS).split(";"); // BSP_PEERS holds ';'-separated peer names
+ }
+
+ @Override
public void bsp(BSPPeer peer) throws IOException, KeeperException,
InterruptedException {
- String master = conf.get(MASTER_TASK);
+ String master = peer.getConfiguration().get(MASTER_TASK);
// setup the datasets
- PageRankBase.mapAdjacencyList(getConf(), peer, adjacencyList,
+ PageRankBase.mapAdjacencyList(peer.getConfiguration(), peer, adjacencyList,
tentativePagerank, lookupMap);
// while the error not converges against epsilon do the pagerank stuff
@@ -103,7 +112,8 @@ public class PageRank extends PageRankBa
// Clears all queues entries.
peer.clear();
// finally save the chunk of pageranks
- PageRankBase.savePageRankMap(peer, conf, lastTentativePagerank);
+ PageRankBase.savePageRankMap(peer, peer.getConfiguration(),
+ lastTentativePagerank);
}
private double broadcastError(BSPPeer peer, String master, double error)
@@ -157,22 +167,6 @@ public class PageRank extends PageRankBa
}
}
- @Override
- public void setConf(Configuration conf) {
- this.conf = conf;
- numOfVertices = Integer.parseInt(conf.get("num.vertices"));
- DAMPING_FACTOR = Double.parseDouble(conf.get("damping.factor"));
- ALPHA = (1 - DAMPING_FACTOR) / (double) numOfVertices;
- EPSILON = Double.parseDouble(conf.get("epsilon.error"));
- MAX_ITERATIONS = Integer.parseInt(conf.get("max.iterations"));
- peerNames = conf.get(ShortestPaths.BSP_PEERS).split(";");
- }
-
- @Override
- public Configuration getConf() {
- return conf;
- }
-
public static void printUsage() {
System.out.println("PageRank Example:");
System.out
Modified: incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/graph/ShortestPaths.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/graph/ShortestPaths.java?rev=1182172&r1=1182171&r2=1182172&view=diff
==============================================================================
--- incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/graph/ShortestPaths.java (original)
+++ incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/graph/ShortestPaths.java Wed Oct 12 02:03:57 2011
@@ -41,9 +41,9 @@ import org.apache.hama.examples.RandBenc
import org.apache.zookeeper.KeeperException;
public class ShortestPaths extends ShortestPathsBase {
+
public static final Log LOG = LogFactory.getLog(ShortestPaths.class);
- private Configuration conf;
private final HashMap<ShortestPathVertex, List<ShortestPathVertex>> adjacencyList = new HashMap<ShortestPathVertex, List<ShortestPathVertex>>();
private final HashMap<String, ShortestPathVertex> vertexLookupMap = new HashMap<String, ShortestPathVertex>();
private String[] peerNames;
@@ -52,15 +52,16 @@ public class ShortestPaths extends Short
public void bsp(BSPPeer peer) throws IOException, KeeperException,
InterruptedException {
// map our input into ram
- mapAdjacencyList(conf, peer, adjacencyList, vertexLookupMap);
+ mapAdjacencyList(peer.getConfiguration(), peer, adjacencyList,
+ vertexLookupMap);
// parse the configuration to get the peerNames
- parsePeerNames(conf);
+ parsePeerNames(peer.getConfiguration());
// get our master groom
- String master = conf.get(MASTER_TASK);
+ String master = peer.getConfiguration().get(MASTER_TASK);
// initial message bypass
- ShortestPathVertex v = vertexLookupMap.get(conf
- .get(SHORTEST_PATHS_START_VERTEX_ID));
+ ShortestPathVertex v = vertexLookupMap.get(peer.getConfiguration().get(
+ SHORTEST_PATHS_START_VERTEX_ID));
if (v != null) {
v.setCost(0);
sendMessageToNeighbors(peer, v);
@@ -90,7 +91,7 @@ public class ShortestPaths extends Short
}
}
// finished, finally save our map to DFS.
- saveVertexMap(conf, peer, adjacencyList);
+ saveVertexMap(peer.getConfiguration(), peer, adjacencyList);
}
/**
@@ -161,16 +162,6 @@ public class ShortestPaths extends Short
}
}
- @Override
- public void setConf(Configuration conf) {
- this.conf = conf;
- }
-
- @Override
- public Configuration getConf() {
- return conf;
- }
-
public static void printUsage() {
System.out.println("Single Source Shortest Path Example:");
System.out