You are viewing a plain text version of this content. The link to the canonical version was not preserved in this plain-text copy.
Posted to commits@hama.apache.org by ed...@apache.org on 2010/10/12 03:27:29 UTC
svn commit: r1021571 - in /incubator/hama/trunk: ./ src/examples/org/apache/hama/examples/ src/java/org/apache/hama/bsp/
Author: edwardyoon
Date: Tue Oct 12 01:27:28 2010
New Revision: 1021571
URL: http://svn.apache.org/viewvc?rev=1021571&view=rev
Log:
Allow BSP peers to get a list of all other peers executing tasks of the same job
Modified:
incubator/hama/trunk/CHANGES.txt
incubator/hama/trunk/src/examples/org/apache/hama/examples/PiEstimator.java
incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPMaster.java
incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPPeer.java
incubator/hama/trunk/src/java/org/apache/hama/bsp/GroomServer.java
incubator/hama/trunk/src/java/org/apache/hama/bsp/GroomServerStatus.java
incubator/hama/trunk/src/java/org/apache/hama/bsp/HeartbeatResponse.java
Modified: incubator/hama/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/CHANGES.txt?rev=1021571&r1=1021570&r2=1021571&view=diff
==============================================================================
--- incubator/hama/trunk/CHANGES.txt (original)
+++ incubator/hama/trunk/CHANGES.txt Tue Oct 12 01:27:28 2010
@@ -4,7 +4,10 @@ Trunk (unreleased changes)
NEW FEATURES
- HAMA-267: Add command-line interface for job management (hyunsik via edwardyoon)
+ HAMA-304: Allow BSP peers to get a list of all other peers
+ executing tasks of the same job (Filipe Manana via edwardyoon)
+ HAMA-267: Add command-line interface for job management
+ (hyunsik via edwardyoon)
HAMA-279: Add "serialize printing" to examples (edwardyoon)
HAMA-272: Hama/Zookeeper Integration (edwardyoon)
HAMA-265: Add example Pi estimatior based on BSP (edwardyoon)
@@ -58,8 +61,8 @@ Trunk (unreleased changes)
HAMA-293: Add starter classes for ZooKeeper, BSPMaster
and GroomServer (Filipe Manana via edwardyoon)
HAMA-286: Task progress should be monitored (eddieyoon)
- HAMA-287: BSPMaster should use the bsp.master.port config property
- when creating its InetSocketAddr instance (Filipe Manana via edwardyoon)
+ HAMA-287: BSPMaster should use the bsp.master.port property when creating
+ its InetSocketAddr instance (Filipe Manana via edwardyoon)
HAMA-283: Removing duplicate code (Filipe Manana via edwardyoon)
HAMA-277: Add default number of bsp task (edwardyoon)
HAMA-273: Implement killJob() method for local job (edwardyoon)
@@ -81,7 +84,7 @@ Trunk (unreleased changes)
HAMA-217: Replacement of Block Multiplication Map/Reduce (edwardyoon)
HAMA-205: Replacement of NormMap/Reduce (edwardyoon)
HAMA-207: Replacement of Mat-Mat addition Map/Reduce (edwardyoon)
- HAMA-208: Replacement of vector-matrix multiplication Map/Reduce (edwardyoon)
+ HAMA-208: Replacement of vector-matrix multiplication MapReduce (edwardyoon)
HAMA-216: Removing JobManager in util package (edwardyoon)
HAMA-215: Removing hama shell from version 0.2 (edwardyoon)
HAMA-204: Replacement of TransposeMap/Reduce (edwardyoon)
@@ -174,10 +177,14 @@ Trunk (unreleased changes)
to groom servers (Filipe Manana via edwardyoon)
HAMA-280: Fix warnings (Filipe Manana via edwardyoon)
HAMA-270: wrong sequence of readFields() of ClusterStatus (hyunsik)
- HAMA-260: Current version command of script is linked unknown class (edwardyoon)
- HAMA-250: Add --no-check-certificate option to 'wget' command lines (edwardyoon)
- HAMA-242: Wrong assignment of default setting inside BSPPeer constructor (hyunsik)
- HAMA-246: Current shell scripts for bsp daemon have several problems. (hyunsik)
+ HAMA-260: Current version command of script is linked unknown class
+ (edwardyoon)
+ HAMA-250: Add --no-check-certificate option to 'wget' command lines
+ (edwardyoon)
+ HAMA-242: Wrong assignment of default setting inside BSPPeer constructor
+ (hyunsik)
+ HAMA-246: Current shell scripts for bsp daemon have several problems.
+ (hyunsik)
HAMA-238: Example fail when performing sparse matrices addition (edwardyoon)
HAMA-225: Jacobi iteration is in a infinite loop (edwardyoon)
HAMA-214: Can't run the examples (edwardyoon)
Modified: incubator/hama/trunk/src/examples/org/apache/hama/examples/PiEstimator.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/examples/org/apache/hama/examples/PiEstimator.java?rev=1021571&r1=1021570&r2=1021571&view=diff
==============================================================================
--- incubator/hama/trunk/src/examples/org/apache/hama/examples/PiEstimator.java (original)
+++ incubator/hama/trunk/src/examples/org/apache/hama/examples/PiEstimator.java Tue Oct 12 01:27:28 2010
@@ -18,7 +18,6 @@
package org.apache.hama.examples;
import java.io.IOException;
-import java.net.InetSocketAddress;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -29,15 +28,19 @@ import org.apache.hama.bsp.BSPJob;
import org.apache.hama.bsp.BSPJobClient;
import org.apache.hama.bsp.BSPMessage;
import org.apache.hama.bsp.BSPPeer;
+import org.apache.hama.bsp.ClusterStatus;
import org.apache.hama.util.Bytes;
import org.apache.zookeeper.KeeperException;
public class PiEstimator {
+ private static String MASTER_TASK = "master.task.";
+
public static class MyEstimator extends BSP {
public static final Log LOG = LogFactory.getLog(MyEstimator.class);
private Configuration conf;
+ private String masterTask;
private static final int iterations = 10000;
-
+
@Override
public void bsp(BSPPeer bspPeer) throws IOException, KeeperException,
InterruptedException {
@@ -55,7 +58,7 @@ public class PiEstimator {
byte[] myData = Bytes.toBytes(4.0 * (double) in / (double) iterations);
BSPMessage estimate = new BSPMessage(tagName, myData);
- bspPeer.send(new InetSocketAddress("localhost", 61000), estimate);
+ bspPeer.send(bspPeer.getAddress(masterTask), estimate);
bspPeer.sync();
double pi = 0.0;
@@ -78,8 +81,9 @@ public class PiEstimator {
@Override
public void setConf(Configuration conf) {
this.conf = conf;
+ this.masterTask = conf.get(MASTER_TASK);
}
-
+
}
public static void main(String[] args) throws InterruptedException,
@@ -87,13 +91,21 @@ public class PiEstimator {
// BSP job configuration
HamaConfiguration conf = new HamaConfiguration();
// Execute locally
- //conf.set("bsp.master.address", "local");
+ // conf.set("bsp.master.address", "local");
BSPJob bsp = new BSPJob(conf, PiEstimator.class);
// Set the job name
bsp.setJobName("pi estimation example");
bsp.setBspClass(MyEstimator.class);
+ BSPJobClient jobClient = new BSPJobClient(conf);
+ ClusterStatus cluster = jobClient.getClusterStatus(true);
+ // Choose one as a master
+ for (String name : cluster.getActiveGroomNames()) {
+ conf.set(MASTER_TASK, name);
+ break;
+ }
+
BSPJobClient.runJob(bsp);
}
}
Modified: incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPMaster.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPMaster.java?rev=1021571&r1=1021570&r2=1021571&view=diff
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPMaster.java (original)
+++ incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPMaster.java Tue Oct 12 01:27:28 2010
@@ -23,6 +23,7 @@ import java.net.InetSocketAddress;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Collection;
+import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
import java.util.Iterator;
@@ -87,6 +88,8 @@ public class BSPMaster implements JobSub
// (groom name --> last sent HeartBeatResponse)
Map<String, HeartbeatResponse> groomToHeartbeatResponseMap = new TreeMap<String, HeartbeatResponse>();
private HashMap<String, GroomServerStatus> groomServers = new HashMap<String, GroomServerStatus>();
+ // maps groom server names to hosts (hostname:port)
+ private HashMap<String, String> groomServerHosts = new HashMap<String, String>();
// Jobs' Meta Data
private int nextJobId = 1;
@@ -359,10 +362,11 @@ public class BSPMaster implements JobSub
groomToHeartbeatResponseMap.remove(groomName);
}
return new HeartbeatResponse(newResponseId,
- new GroomServerAction[] { new ReinitTrackerAction() });
+ new GroomServerAction[] { new ReinitTrackerAction() },
+ Collections.<String, String>emptyMap());
}
- HeartbeatResponse response = new HeartbeatResponse(newResponseId, null);
+ HeartbeatResponse response = new HeartbeatResponse(newResponseId, null, groomServerHosts);
List<GroomServerAction> actions = new ArrayList<GroomServerAction>();
updateTaskStatuses(status);
@@ -509,6 +513,10 @@ public class BSPMaster implements JobSub
}
}
+ if (initialContact) {
+ groomServerHosts.put(groomStatus.getGroomName(), groomStatus.getHost());
+ }
+
return true;
}
Modified: incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPPeer.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPPeer.java?rev=1021571&r1=1021570&r2=1021571&view=diff
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPPeer.java (original)
+++ incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPPeer.java Tue Oct 12 01:27:28 2010
@@ -19,6 +19,7 @@ package org.apache.hama.bsp;
import java.io.IOException;
import java.net.InetSocketAddress;
+import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
@@ -49,7 +50,7 @@ public class BSPPeer implements Watcher,
protected Server server = null;
protected ZooKeeper zk = null;
protected volatile Integer mutex = 0;
-
+
protected final String serverName;
protected final String bindAddress;
protected final int bindPort;
@@ -60,6 +61,8 @@ public class BSPPeer implements Watcher,
protected final Map<InetSocketAddress, ConcurrentLinkedQueue<BSPMessage>> outgoingQueues = new ConcurrentHashMap<InetSocketAddress, ConcurrentLinkedQueue<BSPMessage>>();
protected final ConcurrentLinkedQueue<BSPMessage> localQueue = new ConcurrentLinkedQueue<BSPMessage>();
protected int id;
+ protected Map<String, String> allPeers = new HashMap<String, String>();
+ protected InetSocketAddress peerAddress;
/**
*
@@ -78,6 +81,7 @@ public class BSPPeer implements Watcher,
+ ":"
+ conf.getInt(Constants.ZOOKEPER_CLIENT_PORT,
Constants.DEFAULT_ZOOKEPER_CLIENT_PORT);
+ peerAddress = new InetSocketAddress(bindAddress, bindPort);
reinitialize();
}
@@ -96,7 +100,7 @@ public class BSPPeer implements Watcher,
} catch (IOException e) {
e.printStackTrace();
}
-
+
Stat s = null;
if (zk != null) {
try {
@@ -115,8 +119,8 @@ public class BSPPeer implements Watcher,
LOG.error(e);
}
}
- }
-
+ }
+
}
@Override
@@ -170,13 +174,13 @@ public class BSPPeer implements Watcher,
peer.put(messages.next());
}
}
-
+
// Should we clearing outgoingQueues?
this.outgoingQueues.clear();
-
+
enterBarrier();
Thread.sleep(Constants.ATLEAST_WAIT_TIME); // TODO - This is temporary work
- // because
+ // because
// it can be affected by network condition,
// the number of peers, and the load of zookeeper.
// It should fixed to some flawless way.
@@ -198,7 +202,7 @@ public class BSPPeer implements Watcher,
synchronized (mutex) {
List<String> list = zk.getChildren(bspRoot, true);
- // TODO it must be same with the number of slave nodes, at this time.
+ // TODO it must be same with the number of slave nodes, at this time.
if (list.size() < conf.getInt("bsp.peers.num", 0)) {
mutex.wait();
} else {
@@ -279,4 +283,35 @@ public class BSPPeer implements Watcher,
return this.id;
}
+ /**
+ * @return The address of this peer.
+ */
+ public InetSocketAddress getAddress() {
+ return peerAddress;
+ }
+
+ public InetSocketAddress getAddress(String hostname) {
+ String peerAddr = allPeers.get(hostname);
+ String[] peerAddrParts = peerAddr.split(":");
+ return new InetSocketAddress(peerAddrParts[0], Integer
+ .parseInt(peerAddrParts[1]));
+ }
+
+ /**
+ * @return all the other peers executing tasks from the same job.
+ */
+ public Map<String, String> getAllPeers() {
+ return allPeers;
+ }
+
+ /**
+ * To be invoked by the Groom Server with a list of peers received from an
+ * heartbeat response (BSPMaster).
+ *
+ * @param allPeers
+ */
+ void setPeers(Map<String, String> allPeers) {
+ this.allPeers = allPeers;
+ }
+
}
Modified: incubator/hama/trunk/src/java/org/apache/hama/bsp/GroomServer.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/bsp/GroomServer.java?rev=1021571&r1=1021570&r2=1021571&view=diff
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/bsp/GroomServer.java (original)
+++ incubator/hama/trunk/src/java/org/apache/hama/bsp/GroomServer.java Tue Oct 12 01:27:28 2010
@@ -122,7 +122,10 @@ public class GroomServer implements Runn
this.runningTasks = new LinkedHashMap<String, TaskInProgress>();
this.acceptNewTasks = true;
- this.groomServerName = "groomd_" + localHostname;
+ this.conf.set(Constants.PEER_HOST, localHostname);
+ bspPeer = new BSPPeer(conf);
+
+ this.groomServerName = "groomd_" + bspPeer.getServerName().replace(':', '_');
LOG.info("Starting groom: " + this.groomServerName);
DistributedCache.purgeCache(this.conf);
@@ -131,9 +134,6 @@ public class GroomServer implements Runn
InterTrackerProtocol.class, InterTrackerProtocol.versionID,
bspMasterAddr, conf);
this.running = true;
-
- this.conf.set(Constants.PEER_HOST, localHostname);
- bspPeer = new BSPPeer(conf);
}
private static void checkLocalDirs(String[] localDirs)
@@ -214,6 +214,14 @@ public class GroomServer implements Runn
// Send the heartbeat and process the bspmaster's directives
HeartbeatResponse heartbeatResponse = transmitHeartBeat(now);
+ if (acceptNewTasks) {
+ bspPeer.setPeers(heartbeatResponse.getGroomServers());
+ }
+
+ for (Map.Entry<String, String> peer : bspPeer.getAllPeers().entrySet()) {
+ LOG.debug("Remote peer, host:port is " + peer.getKey());
+ }
+
GroomServerAction[] actions = heartbeatResponse.getActions();
LOG.debug("Got heartbeatResponse from BSPMaster with responseId: "
+ heartbeatResponse.getResponseId() + " and "
@@ -285,7 +293,7 @@ public class GroomServer implements Runn
//
if (status == null) {
synchronized (this) {
- status = new GroomServerStatus(groomServerName, localHostname,
+ status = new GroomServerStatus(groomServerName, bspPeer.getServerName(),
cloneAndResetRunningTaskStatuses(), failures, maxCurrentTasks);
}
} else {
Modified: incubator/hama/trunk/src/java/org/apache/hama/bsp/GroomServerStatus.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/bsp/GroomServerStatus.java?rev=1021571&r1=1021570&r2=1021571&view=diff
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/bsp/GroomServerStatus.java (original)
+++ incubator/hama/trunk/src/java/org/apache/hama/bsp/GroomServerStatus.java Tue Oct 12 01:27:28 2010
@@ -70,6 +70,11 @@ public class GroomServerStatus implement
return groomName;
}
+ /**
+ * The host (and port) from where the groom server can be reached.
+ *
+ * @return The groom server address in the format hostname:port
+ */
public String getHost() {
return host;
}
Modified: incubator/hama/trunk/src/java/org/apache/hama/bsp/HeartbeatResponse.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/bsp/HeartbeatResponse.java?rev=1021571&r1=1021570&r2=1021571&view=diff
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/bsp/HeartbeatResponse.java (original)
+++ incubator/hama/trunk/src/java/org/apache/hama/bsp/HeartbeatResponse.java Tue Oct 12 01:27:28 2010
@@ -20,6 +20,8 @@ package org.apache.hama.bsp;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.conf.Configuration;
@@ -30,12 +32,15 @@ public class HeartbeatResponse implement
private Configuration conf;
short responseId;
private GroomServerAction [] actions;
+ private Map<String, String> groomServers;
public HeartbeatResponse() {}
- public HeartbeatResponse(short responseId, GroomServerAction [] actions) {
+ public HeartbeatResponse(short responseId, GroomServerAction [] actions,
+ Map<String, String> groomServers) {
this.responseId = responseId;
this.actions = actions;
+ this.groomServers = groomServers;
}
public void setResponseId(short responseId) {
@@ -54,6 +59,14 @@ public class HeartbeatResponse implement
return actions;
}
+ public void setGroomServers(Map<String, String> groomServers) {
+ this.groomServers = groomServers;
+ }
+
+ public Map<String, String> getGroomServers() {
+ return groomServers;
+ }
+
@Override
public void readFields(DataInput in) throws IOException {
this.responseId = in.readShort();
@@ -69,6 +82,14 @@ public class HeartbeatResponse implement
} else {
actions = null;
}
+
+ String[] groomServerNames = WritableUtils.readCompressedStringArray(in);
+ String[] groomServerAddresses = WritableUtils.readCompressedStringArray(in);
+ groomServers = new HashMap<String, String>(groomServerNames.length);
+
+ for (int i = 0; i < groomServerNames.length; i++) {
+ groomServers.put(groomServerNames[i], groomServerAddresses[i]);
+ }
}
@Override
@@ -83,6 +104,10 @@ public class HeartbeatResponse implement
action.write(out);
}
}
+ String[] groomServerNames = groomServers.keySet().toArray(new String[0]);
+ WritableUtils.writeCompressedStringArray(out, groomServerNames);
+ String[] groomServerAddresses = groomServers.values().toArray(new String[0]);
+ WritableUtils.writeCompressedStringArray(out, groomServerAddresses);
}
@Override