You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hama.apache.org by tj...@apache.org on 2011/09/29 14:00:46 UTC
svn commit: r1177275 - in /incubator/hama/branches/HamaV2: api/
api/src/main/java/org/apache/hama/bsp/ client/ core/
core/src/main/java/org/apache/hama/bsp/
core/src/main/java/org/apache/hama/checkpoint/
core/src/test/java/org/apache/hama/bsp/ core/src...
Author: tjungblut
Date: Thu Sep 29 12:00:45 2011
New Revision: 1177275
URL: http://svn.apache.org/viewvc?rev=1177275&view=rev
Log:
Patching for directory changes.
Added:
incubator/hama/branches/HamaV2/core/src/main/java/org/apache/hama/bsp/BSPPeerImpl.java (with props)
Modified:
incubator/hama/branches/HamaV2/api/pom.xml
incubator/hama/branches/HamaV2/api/src/main/java/org/apache/hama/bsp/BSPPeer.java
incubator/hama/branches/HamaV2/client/pom.xml
incubator/hama/branches/HamaV2/core/pom.xml
incubator/hama/branches/HamaV2/core/src/main/java/org/apache/hama/bsp/GroomServer.java
incubator/hama/branches/HamaV2/core/src/main/java/org/apache/hama/bsp/LocalBSPRunner.java
incubator/hama/branches/HamaV2/core/src/main/java/org/apache/hama/checkpoint/CheckpointRunner.java
incubator/hama/branches/HamaV2/core/src/main/java/org/apache/hama/checkpoint/Checkpointer.java
incubator/hama/branches/HamaV2/core/src/test/java/org/apache/hama/bsp/BSPSerializerWrapper.java
incubator/hama/branches/HamaV2/core/src/test/java/org/apache/hama/checkpoint/TestCheckpoint.java
incubator/hama/branches/HamaV2/core/src/test/java/testjar/ClassSerializePrinting.java
incubator/hama/branches/HamaV2/examples/.classpath
incubator/hama/branches/HamaV2/examples/.project
incubator/hama/branches/HamaV2/examples/pom.xml
incubator/hama/branches/HamaV2/examples/src/main/java/org/apache/hama/examples/PiEstimator.java
incubator/hama/branches/HamaV2/examples/src/main/java/org/apache/hama/examples/SerializePrinting.java
incubator/hama/branches/HamaV2/graph/.classpath
incubator/hama/branches/HamaV2/graph/.project
Modified: incubator/hama/branches/HamaV2/api/pom.xml
URL: http://svn.apache.org/viewvc/incubator/hama/branches/HamaV2/api/pom.xml?rev=1177275&r1=1177274&r2=1177275&view=diff
==============================================================================
--- incubator/hama/branches/HamaV2/api/pom.xml (original)
+++ incubator/hama/branches/HamaV2/api/pom.xml Thu Sep 29 12:00:45 2011
@@ -41,70 +41,10 @@
<dependencies>
<dependency>
- <groupId>commons-logging</groupId>
- <artifactId>commons-logging</artifactId>
- </dependency>
- <dependency>
- <groupId>commons-cli</groupId>
- <artifactId>commons-cli</artifactId>
- </dependency>
- <dependency>
- <groupId>org.mortbay.jetty</groupId>
- <artifactId>jetty</artifactId>
- </dependency>
- <dependency>
- <groupId>org.apache.tomcat</groupId>
- <artifactId>servlet-api</artifactId>
- </dependency>
- <dependency>
- <groupId>org.mortbay.jetty</groupId>
- <artifactId>jetty-annotations</artifactId>
- </dependency>
- <dependency>
- <groupId>org.mortbay.jetty</groupId>
- <artifactId>jetty-util</artifactId>
- </dependency>
- <dependency>
- <groupId>org.mortbay.jetty</groupId>
- <artifactId>jsp-api-2.1</artifactId>
- </dependency>
- <dependency>
- <groupId>org.mortbay.jetty</groupId>
- <artifactId>jsp-2.1</artifactId>
- </dependency>
- <dependency>
- <groupId>org.apache.ant</groupId>
- <artifactId>ant</artifactId>
- <version>${ant.version}</version>
- </dependency>
- <dependency>
- <groupId>org.apache.ant</groupId>
- <artifactId>ant-launcher</artifactId>
- <version>${ant.version}</version>
- </dependency>
- <dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-core</artifactId>
</dependency>
<dependency>
- <groupId>org.apache.hadoop</groupId>
- <artifactId>hadoop-test</artifactId>
- </dependency>
- <dependency>
- <groupId>junit</groupId>
- <artifactId>junit</artifactId>
- <scope>test</scope>
- </dependency>
- <dependency>
- <groupId>org.slf4j</groupId>
- <artifactId>slf4j-log4j12</artifactId>
- </dependency>
- <dependency>
- <groupId>log4j</groupId>
- <artifactId>log4j</artifactId>
- <scope>provided</scope>
- </dependency>
- <dependency>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
</dependency>
Modified: incubator/hama/branches/HamaV2/api/src/main/java/org/apache/hama/bsp/BSPPeer.java
URL: http://svn.apache.org/viewvc/incubator/hama/branches/HamaV2/api/src/main/java/org/apache/hama/bsp/BSPPeer.java?rev=1177275&r1=1177274&r2=1177275&view=diff
==============================================================================
--- incubator/hama/branches/HamaV2/api/src/main/java/org/apache/hama/bsp/BSPPeer.java (original)
+++ incubator/hama/branches/HamaV2/api/src/main/java/org/apache/hama/bsp/BSPPeer.java Thu Sep 29 12:00:45 2011
@@ -27,7 +27,7 @@ import org.apache.zookeeper.KeeperExcept
/**
* BSP communication interface.
*/
-public interface BSPPeerInterface extends HamaRPCProtocolVersion, Closeable,
+public interface BSPPeer extends HamaRPCProtocolVersion, Closeable,
Constants {
/**
Modified: incubator/hama/branches/HamaV2/client/pom.xml
URL: http://svn.apache.org/viewvc/incubator/hama/branches/HamaV2/client/pom.xml?rev=1177275&r1=1177274&r2=1177275&view=diff
==============================================================================
--- incubator/hama/branches/HamaV2/client/pom.xml (original)
+++ incubator/hama/branches/HamaV2/client/pom.xml Thu Sep 29 12:00:45 2011
@@ -41,72 +41,14 @@
<dependencies>
<dependency>
- <groupId>commons-logging</groupId>
- <artifactId>commons-logging</artifactId>
+ <groupId>org.apache.hama</groupId>
+ <artifactId>hama-core</artifactId>
+ <version>${project.version}</version>
</dependency>
<dependency>
- <groupId>commons-cli</groupId>
- <artifactId>commons-cli</artifactId>
- </dependency>
- <dependency>
- <groupId>org.mortbay.jetty</groupId>
- <artifactId>jetty</artifactId>
- </dependency>
- <dependency>
- <groupId>org.apache.tomcat</groupId>
- <artifactId>servlet-api</artifactId>
- </dependency>
- <dependency>
- <groupId>org.mortbay.jetty</groupId>
- <artifactId>jetty-annotations</artifactId>
- </dependency>
- <dependency>
- <groupId>org.mortbay.jetty</groupId>
- <artifactId>jetty-util</artifactId>
- </dependency>
- <dependency>
- <groupId>org.mortbay.jetty</groupId>
- <artifactId>jsp-api-2.1</artifactId>
- </dependency>
- <dependency>
- <groupId>org.mortbay.jetty</groupId>
- <artifactId>jsp-2.1</artifactId>
- </dependency>
- <dependency>
- <groupId>org.apache.ant</groupId>
- <artifactId>ant</artifactId>
- <version>${ant.version}</version>
- </dependency>
- <dependency>
- <groupId>org.apache.ant</groupId>
- <artifactId>ant-launcher</artifactId>
- <version>${ant.version}</version>
- </dependency>
- <dependency>
- <groupId>org.apache.hadoop</groupId>
- <artifactId>hadoop-core</artifactId>
- </dependency>
- <dependency>
- <groupId>org.apache.hadoop</groupId>
- <artifactId>hadoop-test</artifactId>
- </dependency>
- <dependency>
- <groupId>junit</groupId>
- <artifactId>junit</artifactId>
- <scope>test</scope>
- </dependency>
- <dependency>
- <groupId>org.slf4j</groupId>
- <artifactId>slf4j-log4j12</artifactId>
- </dependency>
- <dependency>
- <groupId>log4j</groupId>
- <artifactId>log4j</artifactId>
- <scope>provided</scope>
- </dependency>
- <dependency>
- <groupId>org.apache.zookeeper</groupId>
- <artifactId>zookeeper</artifactId>
+ <groupId>org.apache.hama</groupId>
+ <artifactId>hama-api</artifactId>
+ <version>${project.version}</version>
</dependency>
</dependencies>
Modified: incubator/hama/branches/HamaV2/core/pom.xml
URL: http://svn.apache.org/viewvc/incubator/hama/branches/HamaV2/core/pom.xml?rev=1177275&r1=1177274&r2=1177275&view=diff
==============================================================================
--- incubator/hama/branches/HamaV2/core/pom.xml (original)
+++ incubator/hama/branches/HamaV2/core/pom.xml Thu Sep 29 12:00:45 2011
@@ -41,6 +41,11 @@
<dependencies>
<dependency>
+ <groupId>org.apache.hama</groupId>
+ <artifactId>hama-api</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
<groupId>commons-logging</groupId>
<artifactId>commons-logging</artifactId>
</dependency>
Added: incubator/hama/branches/HamaV2/core/src/main/java/org/apache/hama/bsp/BSPPeerImpl.java
URL: http://svn.apache.org/viewvc/incubator/hama/branches/HamaV2/core/src/main/java/org/apache/hama/bsp/BSPPeerImpl.java?rev=1177275&view=auto
==============================================================================
--- incubator/hama/branches/HamaV2/core/src/main/java/org/apache/hama/bsp/BSPPeerImpl.java (added)
+++ incubator/hama/branches/HamaV2/core/src/main/java/org/apache/hama/bsp/BSPPeerImpl.java Thu Sep 29 12:00:45 2011
@@ -0,0 +1,711 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.bsp;
+
+import static java.util.concurrent.TimeUnit.SECONDS;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.net.Socket;
+import java.net.UnknownHostException;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.atomic.AtomicReference;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.ipc.RPC;
+import org.apache.hadoop.ipc.RPC.Server;
+import org.apache.hama.Constants;
+import org.apache.hama.checkpoint.CheckpointRunner;
+import org.apache.hama.ipc.BSPPeerProtocol;
+import org.apache.hama.zookeeper.QuorumPeer;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.ZooDefs.Ids;
+import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.data.Stat;
+
+/**
+ * This class represents a BSP peer.
+ */
+public class BSPPeerImpl implements Watcher, BSPPeer {
+
+ public static final Log LOG = LogFactory.getLog(BSPPeerImpl.class);
+
+ private final Configuration conf;
+ private BSPJob jobConf;
+
+ private volatile Server server = null;
+ private ZooKeeper zk = null;
+ private volatile Integer mutex = 0;
+
+ private final String bspRoot;
+ private final String quorumServers;
+
+ private final Map<InetSocketAddress, BSPPeer> peers = new ConcurrentHashMap<InetSocketAddress, BSPPeer>();
+ private final Map<InetSocketAddress, ConcurrentLinkedQueue<BSPMessage>> outgoingQueues = new ConcurrentHashMap<InetSocketAddress, ConcurrentLinkedQueue<BSPMessage>>();
+ private ConcurrentLinkedQueue<BSPMessage> localQueue = new ConcurrentLinkedQueue<BSPMessage>();
+ private ConcurrentLinkedQueue<BSPMessage> localQueueForNextIteration = new ConcurrentLinkedQueue<BSPMessage>();
+ private final Map<String, InetSocketAddress> peerSocketCache = new ConcurrentHashMap<String, InetSocketAddress>();
+
+ private InetSocketAddress peerAddress;
+ private TaskStatus currentTaskStatus;
+
+ private TaskAttemptID taskid;
+ private BSPPeerProtocol umbilical;
+
+ private final BSPMessageSerializer messageSerializer;
+
+ /**
+  * Writable that pairs a checkpoint path with the message bundle that is to
+  * be stored under that path.
+  */
+ public static final class BSPSerializableMessage implements Writable {
+   final AtomicReference<String> path = new AtomicReference<String>();
+   final AtomicReference<BSPMessageBundle> bundle = new AtomicReference<BSPMessageBundle>();
+
+   /**
+    * Creates an empty instance; the fields are filled in by
+    * {@link #readFields(DataInput)}.
+    */
+   public BSPSerializableMessage() {
+   }
+
+   /**
+    * @param path the checkpoint path; must not be null.
+    * @param bundle the messages to checkpoint; must not be null.
+    * @throws NullPointerException if either argument is null.
+    */
+   public BSPSerializableMessage(final String path,
+       final BSPMessageBundle bundle) {
+     if (path == null) {
+       throw new NullPointerException("No path provided for checkpointing.");
+     }
+     if (bundle == null) {
+       throw new NullPointerException("No data provided for checkpointing.");
+     }
+     this.path.set(path);
+     this.bundle.set(bundle);
+   }
+
+   /** @return the path held by this message. */
+   public final String checkpointedPath() {
+     final String current = this.path.get();
+     return current;
+   }
+
+   /** @return the message bundle held by this message. */
+   public final BSPMessageBundle messageBundle() {
+     final BSPMessageBundle current = this.bundle.get();
+     return current;
+   }
+
+   /** Writes the path as a UTF string, followed by the bundle. */
+   @Override
+   public final void write(DataOutput out) throws IOException {
+     final String target = this.path.get();
+     out.writeUTF(target);
+     final BSPMessageBundle payload = this.bundle.get();
+     payload.write(out);
+   }
+
+   /** Reads the path and then the bundle, in the order used by write(). */
+   @Override
+   public final void readFields(DataInput in) throws IOException {
+     // The path is stored before the bundle is read, as in write().
+     this.path.set(in.readUTF());
+     final BSPMessageBundle restored = new BSPMessageBundle();
+     restored.readFields(in);
+     this.bundle.set(restored);
+   }
+
+ }// serializable message
+
+ /**
+  * Sends serialized message bundles over a local socket connected to the
+  * given port on "localhost". Writes are executed on a scheduled thread pool.
+  */
+ final class BSPMessageSerializer {
+   final Socket client;
+   final ScheduledExecutorService sched;
+
+   /**
+    * Connects to localhost:port, retrying up to 10 times with a one second
+    * pause after each failed attempt.
+    *
+    * @param port the port to connect to.
+    * @throws NullPointerException if no connection could be established.
+    */
+   public BSPMessageSerializer(final int port) {
+     Socket tmp = null;
+     int cnt = 0;
+     do {
+       tmp = init(port);
+       cnt++;
+       // Only pause when the attempt failed; a successful connection must
+       // not delay the start of the peer.
+       if (null == tmp) {
+         try {
+           Thread.sleep(1000);
+         } catch (InterruptedException ie) {
+           LOG.warn("Thread is interrupted.", ie);
+           Thread.currentThread().interrupt();
+         }
+       }
+     } while (null == tmp && 10 > cnt);
+     this.client = tmp;
+     if (null == this.client)
+       throw new NullPointerException("Client socket is null.");
+     this.sched = Executors.newScheduledThreadPool(conf.getInt(
+         "bsp.checkpoint.serializer_thread", 10));
+     LOG.info(BSPMessageSerializer.class.getName()
+         + " is ready to serialize message.");
+   }
+
+   /**
+    * Tries once to open the socket.
+    *
+    * @return the connected socket, or null if the attempt failed.
+    */
+   private Socket init(final int port) {
+     Socket tmp = null;
+     try {
+       tmp = new Socket("localhost", port);
+     } catch (UnknownHostException uhe) {
+       LOG.error("Unable to connect to BSPMessageDeserializer.", uhe);
+     } catch (IOException ioe) {
+       LOG.warn("Fail to create socket.", ioe);
+     }
+     return tmp;
+   }
+
+   /**
+    * Schedules the given message to be written to the socket.
+    *
+    * @param tmp the message to write.
+    * @throws IOException if the socket's output stream cannot be obtained.
+    */
+   void serialize(final BSPSerializableMessage tmp) throws IOException {
+     if (LOG.isDebugEnabled())
+       LOG.debug("Messages are saved to " + tmp.checkpointedPath());
+     final DataOutput out = new DataOutputStream(client.getOutputStream());
+     this.sched.schedule(new Callable<Object>() {
+       public Object call() throws Exception {
+         try {
+           // Several pool threads share one socket stream; writing a whole
+           // message under the socket's monitor keeps messages from
+           // interleaving on the wire.
+           synchronized (client) {
+             tmp.write(out);
+           }
+         } catch (IOException ioe) {
+           // The returned future is discarded, so log here or the failure
+           // would go unnoticed.
+           LOG.error("Fail to serialize messages to " + tmp.checkpointedPath(),
+               ioe);
+           throw ioe;
+         }
+         return null;
+       }
+     }, 0, SECONDS);
+   }
+
+   /**
+    * Closes the socket and shuts the executor down. The executor is shut
+    * down even if closing the socket fails.
+    */
+   public void close() {
+     try {
+       this.client.close();
+     } catch (IOException io) {
+       LOG.error("Fail to close client socket.", io);
+     } finally {
+       this.sched.shutdown();
+     }
+   }
+
+ }// message serializer
+
+ /**
+  * Protected no-argument constructor for LocalBSPRunner. Leaves the
+  * configuration, ZooKeeper settings and message serializer unset (null).
+  */
+ protected BSPPeerImpl() {
+   this.conf = null;
+   this.bspRoot = null;
+   this.quorumServers = null;
+   this.messageSerializer = null;
+ }
+
+ /**
+ * BSPPeer Constructor.
+ *
+ * BSPPeer acts on behalf of clients performing bsp() tasks.
+ *
+ * @param conf is the configuration file containing bsp peer host, port, etc.
+ * @param umbilical is the bsp protocol used to contact its parent process.
+ * @param taskid is the id that current process holds.
+ */
+ public BSPPeerImpl(Configuration conf, TaskAttemptID taskid,
+ BSPPeerProtocol umbilical) throws IOException {
+ this.conf = conf;
+ this.taskid = taskid;
+ this.umbilical = umbilical;
+
+ String bindAddress = conf.get(Constants.PEER_HOST,
+ Constants.DEFAULT_PEER_HOST);
+ int bindPort = conf
+ .getInt(Constants.PEER_PORT, Constants.DEFAULT_PEER_PORT);
+ bspRoot = conf.get(Constants.ZOOKEEPER_ROOT,
+ Constants.DEFAULT_ZOOKEEPER_ROOT);
+ quorumServers = QuorumPeer.getZKQuorumServersString(conf);
+ if (LOG.isDebugEnabled())
+ LOG.debug("Quorum " + quorumServers);
+ peerAddress = new InetSocketAddress(bindAddress, bindPort);
+ BSPMessageSerializer msgSerializer = null;
+ if (this.conf.getBoolean("bsp.checkpoint.enabled", false)) {
+ msgSerializer = new BSPMessageSerializer(conf.getInt(
+ "bsp.checkpoint.port",
+ Integer.parseInt(CheckpointRunner.DEFAULT_PORT)));
+ }
+ this.messageSerializer = msgSerializer;
+ }
+
+ public void reinitialize() {
+ try {
+ if (LOG.isDebugEnabled())
+ LOG.debug("reinitialize(): " + getPeerName());
+ this.server = RPC.getServer(this, peerAddress.getHostName(),
+ peerAddress.getPort(), conf);
+ server.start();
+ LOG.info(" BSPPeer address:" + peerAddress.getHostName() + " port:"
+ + peerAddress.getPort());
+ } catch (IOException e) {
+ LOG.error("Fail to start RPC server!", e);
+ }
+
+ try {
+ this.zk = new ZooKeeper(quorumServers, conf.getInt(
+ Constants.ZOOKEEPER_SESSION_TIMEOUT, 1200000), this);
+ } catch (IOException e) {
+ LOG.error("Fail while reinitializing zookeeeper!", e);
+ }
+ }
+
+ /**
+  * @return the head of the local queue, removed from the queue, or null if
+  *         the queue is empty.
+  */
+ @Override
+ public BSPMessage getCurrentMessage() throws IOException {
+   final BSPMessage head = localQueue.poll();
+   return head;
+ }
+
+ /**
+  * Queues a message for the given peer.
+  *
+  * A message addressed to this peer itself goes straight into the local
+  * queue for the next iteration. Any other message is appended to the
+  * outgoing queue of the target address, which is transferred in sync().
+  *
+  * @param peerName the target peer as "host:port".
+  * @param msg the message to send.
+  * @see org.apache.hama.bsp.BSPPeer#send(String, BSPMessage)
+  */
+ @Override
+ public void send(String peerName, BSPMessage msg) throws IOException {
+   if (peerName.equals(getPeerName())) {
+     // Guarded so the message text is only built when debug is enabled.
+     if (LOG.isDebugEnabled()) {
+       LOG.debug("Local send bytes (" + msg.getData().toString() + ")");
+     }
+     localQueueForNextIteration.add(msg);
+     return;
+   }
+
+   if (LOG.isDebugEnabled()) {
+     LOG.debug("Send bytes (" + msg.getData().toString() + ") to " + peerName);
+   }
+
+   // Resolve the target address once per peer name and cache it.
+   InetSocketAddress targetPeerAddress = peerSocketCache.get(peerName);
+   if (targetPeerAddress == null) {
+     targetPeerAddress = getAddress(peerName);
+     peerSocketCache.put(peerName, targetPeerAddress);
+   }
+
+   ConcurrentLinkedQueue<BSPMessage> queue = outgoingQueues
+       .get(targetPeerAddress);
+   final boolean created = (queue == null);
+   if (created) {
+     queue = new ConcurrentLinkedQueue<BSPMessage>();
+   }
+   queue.add(msg);
+   // Only a newly created queue has to be registered; an existing one is
+   // already in the map.
+   if (created) {
+     outgoingQueues.put(targetPeerAddress, queue);
+   }
+ }
+
+ private String checkpointedPath() {
+ String backup = conf.get("bsp.checkpoint.prefix_path", "/checkpoint/");
+ String ckptPath = backup + jobConf.getJobID().toString() + "/"
+ + getSuperstepCount() + "/" + this.taskid.toString();
+ if (LOG.isDebugEnabled())
+ LOG.debug("Messages are to be saved to " + ckptPath);
+ return ckptPath;
+ }
+
+ /**
+  * Performs one barrier synchronization.
+  *
+  * Enters the barrier, bundles every outgoing queue and hands the bundle to
+  * the corresponding peer (checkpointing it first when a message serializer
+  * is configured), leaves the barrier, reports the incremented superstep
+  * count through the umbilical, clears the outgoing queues and finally makes
+  * the next-iteration queue the current local queue.
+  *
+  * @throws IOException if a target peer cannot be reached or a transfer
+  *           fails.
+  * @see org.apache.hama.bsp.BSPPeer#sync()
+  */
+ @Override
+ public void sync() throws IOException, KeeperException, InterruptedException {
+   enterBarrier();
+
+   for (Entry<InetSocketAddress, ConcurrentLinkedQueue<BSPMessage>> entry : this.outgoingQueues
+       .entrySet()) {
+     final InetSocketAddress target = entry.getKey();
+
+     BSPPeer peer = peers.get(target);
+     if (peer == null) {
+       try {
+         peer = getBSPPeerConnection(target);
+       } catch (NullPointerException ne) {
+         // The connection could not be established; peer stays null and is
+         // handled below.
+         LOG.error("No connection to peer " + target, ne);
+       }
+     }
+     if (peer == null) {
+       // Report the failure and stop; continuing would dereference a null
+       // peer.
+       umbilical.fatalError(taskid, target.getHostName() + " doesn't exists.");
+       throw new IOException("Unable to connect to peer " + target
+           + "; messages cannot be delivered.");
+     }
+
+     BSPMessageBundle bundle = new BSPMessageBundle();
+     for (BSPMessage message : entry.getValue()) {
+       bundle.addMessage(message);
+     }
+
+     // checkpointing
+     if (null != this.messageSerializer) {
+       this.messageSerializer.serialize(new BSPSerializableMessage(
+           checkpointedPath(), bundle));
+     }
+
+     peer.put(bundle);
+   }
+
+   leaveBarrier();
+   currentTaskStatus.incrementSuperstepCount();
+   umbilical.statusUpdate(taskid, currentTaskStatus);
+
+   // Clear outgoing queues.
+   clearOutgoingQueues();
+
+   // Add non-processed messages from this iteration for the next's queue.
+   while (!localQueue.isEmpty()) {
+     BSPMessage message = localQueue.poll();
+     localQueueForNextIteration.add(message);
+   }
+   // Switch local queues.
+   localQueue = localQueueForNextIteration;
+   localQueueForNextIteration = new ConcurrentLinkedQueue<BSPMessage>();
+ }
+
+ private void createZnode(final String path) throws KeeperException,
+ InterruptedException {
+ createZnode(path, CreateMode.PERSISTENT);
+ }
+
+ private void createEphemeralZnode(final String path) throws KeeperException,
+ InterruptedException {
+ createZnode(path, CreateMode.EPHEMERAL);
+ }
+
+ private void createZnode(final String path, final CreateMode mode)
+ throws KeeperException, InterruptedException {
+ synchronized (zk) {
+ Stat s = zk.exists(path, false);
+ if (null == s) {
+ try {
+ zk.create(path, null, Ids.OPEN_ACL_UNSAFE, mode);
+ } catch (KeeperException.NodeExistsException nee) {
+ LOG.warn("Ignore because znode may be already created at " + path,
+ nee);
+ }
+ }
+ }
+ }
+
+ /**
+  * Watcher that records that it has been triggered and wakes up all threads
+  * waiting on the peer's mutex.
+  */
+ private class BarrierWatcher implements Watcher {
+   // Written by the thread delivering the watch event and polled by the
+   // thread waiting in enterBarrier() outside of any lock, so it must be
+   // volatile to be reliably visible.
+   private volatile boolean complete = false;
+
+   /** @return true once process() has been called. */
+   boolean isComplete() {
+     return this.complete;
+   }
+
+   /**
+    * Marks the watcher as complete and notifies all waiters on the mutex.
+    *
+    * @param event the event that triggered the watch (not inspected).
+    */
+   @Override
+   public void process(WatchedEvent event) {
+     this.complete = true;
+     synchronized (mutex) {
+       LOG.info(">>>>>>>>>>>>>>> at superstep " + getSuperstepCount()
+           + " taskid:" + taskid.toString() + " is notified.");
+       mutex.notifyAll();
+     }
+   }
+ }
+
+ /**
+  * Enters the barrier of the current superstep.
+  *
+  * Makes sure the root, job and superstep znodes exist, registers a
+  * BarrierWatcher on the superstep's "ready" znode, creates an ephemeral
+  * znode for this task and then counts the children of the superstep znode
+  * (not counting "ready"). If fewer children than jobConf.getNumBspTask()
+  * are present, the method waits for the watcher to fire; otherwise this
+  * task creates the ephemeral "ready" znode itself.
+  *
+  * The whole method body runs while holding the monitor of the ZooKeeper
+  * handle.
+  *
+  * @return always true.
+  */
+ protected boolean enterBarrier() throws KeeperException, InterruptedException {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("[" + getPeerName() + "] enter the enterbarrier: "
+ + this.getSuperstepCount());
+ }
+
+ synchronized (zk) {
+ // Znode layout: <bspRoot>/<jobId>/<superstep>/<taskid>
+ createZnode(bspRoot);
+ final String pathToJobIdZnode = bspRoot + "/"
+ + taskid.getJobID().toString();
+ createZnode(pathToJobIdZnode);
+ final String pathToSuperstepZnode = pathToJobIdZnode + "/"
+ + getSuperstepCount();
+ createZnode(pathToSuperstepZnode);
+ // The watch is registered before this task's own znode is created.
+ BarrierWatcher barrierWatcher = new BarrierWatcher();
+ Stat readyStat = zk.exists(pathToSuperstepZnode + "/ready",
+ barrierWatcher);
+ zk.create(getNodeName(), null, Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
+
+ List<String> znodes = zk.getChildren(pathToSuperstepZnode, false);
+ int size = znodes.size(); // may contains ready
+ boolean hasReady = znodes.contains("ready");
+ if (hasReady) {
+ size--;
+ }
+
+ LOG.debug("===> at superstep :" + getSuperstepCount()
+ + " current znode size: " + znodes.size() + " current znodes:"
+ + znodes);
+
+ if (LOG.isDebugEnabled())
+ LOG.debug("enterBarrier() znode size within " + pathToSuperstepZnode
+ + " is " + znodes.size() + ". Znodes include " + znodes);
+
+ if (size < jobConf.getNumBspTask()) {
+ LOG.info("xxxx 1. At superstep: " + getSuperstepCount()
+ + " which task is waiting? " + taskid.toString()
+ + " stat is null? " + readyStat);
+ // NOTE(review): when hasReady is true the loop body does nothing, so this
+ // loop polls isComplete() without waiting until the watcher fires --
+ // confirm this busy-wait is intended.
+ while (!barrierWatcher.isComplete()) {
+ if (!hasReady) {
+ synchronized (mutex) {
+ // Timed wait, so the loop condition is re-checked at least every second.
+ mutex.wait(1000);
+ }
+ }
+ }
+ LOG.debug("xxxx 2. at superstep: " + getSuperstepCount()
+ + " after waiting ..." + taskid.toString());
+ } else {
+ // Enough task znodes are present: this task creates the "ready" znode.
+ LOG.debug("---> at superstep: " + getSuperstepCount()
+ + " task that is creating /ready znode:" + taskid.toString());
+ createEphemeralZnode(pathToSuperstepZnode + "/ready");
+ }
+ }
+ return true;
+ }
+
+ /**
+  * Leaves the barrier of the current superstep.
+  *
+  * Repeatedly lists the children of the superstep znode (ignoring "ready")
+  * until it can return: immediately when no children are left, or after
+  * deleting this task's znode when exactly one child is left. Otherwise the
+  * children are sorted; the task owning the lowest znode watches the highest
+  * one and waits, while every other task deletes its own znode, watches the
+  * lowest one and waits. After a wait (or if the watched znode no longer
+  * exists) the loop starts over.
+  *
+  * @return always true.
+  */
+ protected boolean leaveBarrier() throws KeeperException, InterruptedException {
+ final String pathToSuperstepZnode = bspRoot + "/"
+ + taskid.getJobID().toString() + "/" + getSuperstepCount();
+ while (true) {
+ List<String> znodes = zk.getChildren(pathToSuperstepZnode, false);
+ LOG.info("leaveBarrier() !!! checking znodes cotnains /ready node or not: at superstep:"
+ + getSuperstepCount() + " znode:" + znodes);
+ if (znodes.contains("ready")) {
+ znodes.remove("ready");
+ }
+ final int size = znodes.size();
+ LOG.info("leaveBarrier() at superstep:" + getSuperstepCount()
+ + " znode size: (" + size + ") znodes:" + znodes);
+ // NOTE(review): znodes has already been dereferenced above, so the null
+ // check below cannot be the first thing to fail on a null list.
+ if (null == znodes || znodes.isEmpty())
+ return true;
+ if (1 == size) {
+ // Only one task znode is left: delete this task's znode and leave.
+ try {
+ zk.delete(getNodeName(), 0);
+ } catch (KeeperException.NoNodeException nne) {
+ LOG.warn(
+ "+++ (znode size is 1). Ignore because znode may disconnect.",
+ nne);
+ }
+ return true;
+ }
+ Collections.sort(znodes);
+
+ final String lowest = znodes.get(0);
+ final String highest = znodes.get(size - 1);
+
+ LOG.info("leaveBarrier() at superstep: " + getSuperstepCount()
+ + " taskid:" + taskid.toString() + " lowest: " + lowest + " highest:"
+ + highest);
+ // The mutex is held while the watch is registered and until wait() is
+ // called; the watcher callbacks below also take the mutex before notifying.
+ synchronized (mutex) {
+
+ if (getNodeName().equals(pathToSuperstepZnode + "/" + lowest)) {
+ // This task owns the lowest znode: wait for an event on the highest one.
+ Stat s = zk.exists(pathToSuperstepZnode + "/" + highest,
+ new Watcher() {
+ @Override
+ public void process(WatchedEvent event) {
+ synchronized (mutex) {
+ LOG.debug("leaveBarrier() at superstep: "
+ + getSuperstepCount() + " taskid:" + taskid.toString()
+ + " highest notify lowest.");
+ mutex.notifyAll();
+ }
+ }
+ });
+
+ if (null != s) {
+ LOG.debug("leaveBarrier(): superstep:" + getSuperstepCount()
+ + " taskid:" + taskid.toString() + " wait for higest notify.");
+ mutex.wait();
+ }
+ } else {
+ // Any other task: delete its own znode, then wait for an event on the
+ // lowest znode.
+ Stat s1 = zk.exists(getNodeName(), false);
+
+ if (null != s1) {
+ LOG.info("leaveBarrier() znode at superstep:" + getSuperstepCount()
+ + " taskid:" + taskid.toString() + " exists, so delete it.");
+ try {
+ zk.delete(getNodeName(), 0);
+ } catch (KeeperException.NoNodeException nne) {
+ LOG.warn("++++ Ignore because node may be dleted.", nne);
+ }
+ }
+
+ Stat s2 = zk.exists(pathToSuperstepZnode + "/" + lowest,
+ new Watcher() {
+ @Override
+ public void process(WatchedEvent event) {
+ synchronized (mutex) {
+ LOG.debug("leaveBarrier() at superstep: "
+ + getSuperstepCount() + " taskid:" + taskid.toString()
+ + " lowest notify other nodes.");
+ mutex.notifyAll();
+ }
+ }
+ });
+ if (null != s2) {
+ LOG.debug("leaveBarrier(): superstep:" + getSuperstepCount()
+ + " taskid:" + taskid.toString() + " wait for lowest notify.");
+ mutex.wait();
+ }
+ }
+ }
+ }
+ }
+
+ private String getNodeName() {
+ return bspRoot + "/" + taskid.getJobID().toString() + "/"
+ + getSuperstepCount() + "/" + taskid.toString();
+ }
+
+ /**
+  * Watcher callback of the peer itself: wakes up one thread waiting on the
+  * mutex.
+  *
+  * @param event the received event (not inspected).
+  */
+ @Override
+ public void process(WatchedEvent event) {
+   final Object lock = mutex;
+   synchronized (lock) {
+     lock.notify();
+   }
+ }
+
+ /**
+  * Empties the local queue and the outgoing queues.
+  */
+ public void clear() {
+   // The two clears are independent of each other.
+   this.outgoingQueues.clear();
+   this.localQueue.clear();
+ }
+
+ /**
+  * Releases the resources of this peer: clears the queues, closes the
+  * ZooKeeper handle, stops the RPC server and closes the message
+  * serializer. Resources that were never created are skipped, and a failure
+  * while releasing one resource does not prevent the others from being
+  * released.
+  */
+ @Override
+ public void close() throws IOException {
+   this.clear();
+   try {
+     // zk stays null when reinitialize() was never called or failed.
+     if (zk != null) {
+       zk.close();
+     }
+   } catch (InterruptedException e) {
+     LOG.warn("Interrupted while closing the ZooKeeper connection.", e);
+     // Preserve the interrupt for the caller.
+     Thread.currentThread().interrupt();
+   } finally {
+     try {
+       if (server != null) {
+         server.stop();
+       }
+     } finally {
+       if (null != messageSerializer) {
+         this.messageSerializer.close();
+       }
+     }
+   }
+ }
+
+ /**
+  * Adds a single message to the local queue for the next iteration.
+  *
+  * @param msg the message to enqueue.
+  */
+ @Override
+ public void put(BSPMessage msg) throws IOException {
+   localQueueForNextIteration.add(msg);
+ }
+
+ /**
+  * Adds every message of the bundle to the local queue for the next
+  * iteration.
+  *
+  * @param messages the bundle whose messages are enqueued.
+  */
+ @Override
+ public void put(BSPMessageBundle messages) throws IOException {
+   // The queue field is read for every message on purpose: sync() may swap
+   // it while a bundle is being added.
+   for (final BSPMessage incoming : messages.getMessages()) {
+     localQueueForNextIteration.add(incoming);
+   }
+ }
+
+ /**
+  * @param arg0 not used.
+  * @param arg1 not used.
+  * @return BSPPeer.versionID, regardless of the arguments.
+  */
+ @Override
+ public long getProtocolVersion(String arg0, long arg1) throws IOException {
+   final long version = BSPPeer.versionID;
+   return version;
+ }
+
+ /**
+  * Returns the RPC proxy for the peer at the given address, creating and
+  * caching it on first use.
+  *
+  * @param addr the address of the remote peer.
+  * @return the cached or newly created proxy; never null.
+  * @throws NullPointerException if no proxy could be created for the
+  *           address.
+  */
+ protected BSPPeer getBSPPeerConnection(InetSocketAddress addr)
+     throws NullPointerException {
+   synchronized (this.peers) {
+     BSPPeer peer = peers.get(addr);
+     if (peer == null) {
+       try {
+         peer = (BSPPeer) RPC.getProxy(BSPPeer.class, BSPPeer.versionID, addr,
+             this.conf);
+       } catch (IOException e) {
+         LOG.error("Unable to create an RPC proxy for peer " + addr, e);
+       }
+       // Fail explicitly instead of relying on the map rejecting a null
+       // value; callers handle the NullPointerException.
+       if (peer == null) {
+         throw new NullPointerException(
+             "No connection could be established to peer " + addr);
+       }
+       this.peers.put(addr, peer);
+     }
+     return peer;
+   }
+ }
+
+ /**
+  * @return the name of this peer, formatted as host:port
+  */
+ public String getPeerName() {
+   final String host = peerAddress.getHostName();
+   final int port = peerAddress.getPort();
+   return host + ":" + port;
+ }
+
+ private InetSocketAddress getAddress(String peerName) {
+ String[] peerAddrParts = peerName.split(":");
+ if (peerAddrParts.length != 2) {
+ throw new ArrayIndexOutOfBoundsException(
+ "Peername must consist of exactly ONE \":\"! Given peername was: "
+ + peerName);
+ }
+ return new InetSocketAddress(peerAddrParts[0],
+ Integer.parseInt(peerAddrParts[1]));
+ }
+
+ @Override
+ public String[] getAllPeerNames() {
+ String[] result = null;
+ try {
+ result = zk.getChildren("/" + jobConf.getJobID().toString(), this)
+ .toArray(new String[0]);
+ } catch (KeeperException e) {
+ e.printStackTrace();
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ }
+
+ return result;
+ }
+
+ /**
+  * @return the number of messages currently held in the local queue
+  */
+ public int getNumCurrentMessages() {
+   final int pending = localQueue.size();
+   return pending;
+ }
+
+ /**
+  * Replaces the task status used by this peer, e.g. for reading the
+  * superstep count.
+  *
+  * @param currentTaskStatus the new task status
+  */
+ public void setCurrentTaskStatus(TaskStatus currentTaskStatus) {
+   this.currentTaskStatus = currentTaskStatus;
+ }
+
+ /**
+  * @return the superstep count reported by the current task status
+  */
+ public long getSuperstepCount() {
+   final long superstep = currentTaskStatus.getSuperstepCount();
+   return superstep;
+ }
+
+ /**
+  * Replaces the job configuration used by this peer.
+  *
+  * @param jobConf the new job configuration
+  */
+ public void setJobConf(BSPJob jobConf) {
+   this.jobConf = jobConf;
+ }
+
+ /**
+  * @return the number of messages in the local queue
+  */
+ public int getLocalQueueSize() {
+   final int queued = localQueue.size();
+   return queued;
+ }
+
+ /**
+  * @return the number of entries in the outgoing queue map, i.e. one per
+  *         target address that has a queue
+  */
+ public int getOutgoingQueueSize() {
+   final int targets = outgoingQueues.size();
+   return targets;
+ }
+
+ /**
+  * Removes all messages from the local queue.
+  */
+ public void clearLocalQueue() {
+   localQueue.clear();
+ }
+
+ /**
+  * Removes all outgoing queues.
+  */
+ public void clearOutgoingQueues() {
+   outgoingQueues.clear();
+ }
+}
Propchange: incubator/hama/branches/HamaV2/core/src/main/java/org/apache/hama/bsp/BSPPeerImpl.java
------------------------------------------------------------------------------
svn:eol-style = native
Modified: incubator/hama/branches/HamaV2/core/src/main/java/org/apache/hama/bsp/GroomServer.java
URL: http://svn.apache.org/viewvc/incubator/hama/branches/HamaV2/core/src/main/java/org/apache/hama/bsp/GroomServer.java?rev=1177275&r1=1177274&r2=1177275&view=diff
==============================================================================
--- incubator/hama/branches/HamaV2/core/src/main/java/org/apache/hama/bsp/GroomServer.java (original)
+++ incubator/hama/branches/HamaV2/core/src/main/java/org/apache/hama/bsp/GroomServer.java Thu Sep 29 12:00:45 2011
@@ -944,7 +944,7 @@ public class GroomServer implements Runn
}
defaultConf.setInt(Constants.PEER_PORT, peerPort);
- BSPPeer bspPeer = new BSPPeer(defaultConf, taskid, umbilical);
+ BSPPeerImpl bspPeer = new BSPPeerImpl(defaultConf, taskid, umbilical);
bspPeer.reinitialize();
bspPeer.setJobConf(job);
Modified: incubator/hama/branches/HamaV2/core/src/main/java/org/apache/hama/bsp/LocalBSPRunner.java
URL: http://svn.apache.org/viewvc/incubator/hama/branches/HamaV2/core/src/main/java/org/apache/hama/bsp/LocalBSPRunner.java?rev=1177275&r1=1177274&r2=1177275&view=diff
==============================================================================
--- incubator/hama/branches/HamaV2/core/src/main/java/org/apache/hama/bsp/LocalBSPRunner.java (original)
+++ incubator/hama/branches/HamaV2/core/src/main/java/org/apache/hama/bsp/LocalBSPRunner.java Thu Sep 29 12:00:45 2011
@@ -251,7 +251,7 @@ public class LocalBSPRunner implements J
}
- class LocalGroom extends BSPPeer {
+ class LocalGroom implements BSPPeer {
private long superStepCount = 0;
private final ConcurrentLinkedQueue<BSPMessage> localMessageQueue = new ConcurrentLinkedQueue<BSPMessage>();
// outgoing queue
Modified: incubator/hama/branches/HamaV2/core/src/main/java/org/apache/hama/checkpoint/CheckpointRunner.java
URL: http://svn.apache.org/viewvc/incubator/hama/branches/HamaV2/core/src/main/java/org/apache/hama/checkpoint/CheckpointRunner.java?rev=1177275&r1=1177274&r2=1177275&view=diff
==============================================================================
--- incubator/hama/branches/HamaV2/core/src/main/java/org/apache/hama/checkpoint/CheckpointRunner.java (original)
+++ incubator/hama/branches/HamaV2/core/src/main/java/org/apache/hama/checkpoint/CheckpointRunner.java Thu Sep 29 12:00:45 2011
@@ -17,27 +17,26 @@
*/
package org.apache.hama.checkpoint;
+import static java.util.concurrent.TimeUnit.SECONDS;
+
import java.io.BufferedReader;
import java.io.File;
+import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
-import java.io.IOException;
-
import java.util.ArrayList;
import java.util.Arrays;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicReference;
+import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
-import java.util.List;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hama.bsp.GroomServer.CheckpointerChild;
-import org.apache.hama.bsp.GroomServer;
-import static java.util.concurrent.TimeUnit.*;
public final class CheckpointRunner implements Callable {
Modified: incubator/hama/branches/HamaV2/core/src/main/java/org/apache/hama/checkpoint/Checkpointer.java
URL: http://svn.apache.org/viewvc/incubator/hama/branches/HamaV2/core/src/main/java/org/apache/hama/checkpoint/Checkpointer.java?rev=1177275&r1=1177274&r2=1177275&view=diff
==============================================================================
--- incubator/hama/branches/HamaV2/core/src/main/java/org/apache/hama/checkpoint/Checkpointer.java (original)
+++ incubator/hama/branches/HamaV2/core/src/main/java/org/apache/hama/checkpoint/Checkpointer.java Thu Sep 29 12:00:45 2011
@@ -17,35 +17,31 @@
*/
package org.apache.hama.checkpoint;
-import java.io.BufferedReader;
+import static java.util.concurrent.TimeUnit.SECONDS;
+
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.EOFException;
-import java.io.InputStreamReader;
import java.io.IOException;
import java.net.ServerSocket;
import java.net.Socket;
-import java.util.Arrays;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
-import java.util.concurrent.Executors;
import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
-import static java.util.concurrent.TimeUnit.*;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
-import org.apache.hama.bsp.BSPMessageBundle;
-import org.apache.hama.bsp.BSPPeer.BSPSerializableMessage;
-import org.apache.hama.GroomServerRunner;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
+import org.apache.hama.bsp.BSPPeerImpl.BSPSerializableMessage;
/**
* This class is responsible for checkpointing messages to hdfs.
Modified: incubator/hama/branches/HamaV2/core/src/test/java/org/apache/hama/bsp/BSPSerializerWrapper.java
URL: http://svn.apache.org/viewvc/incubator/hama/branches/HamaV2/core/src/test/java/org/apache/hama/bsp/BSPSerializerWrapper.java?rev=1177275&r1=1177274&r2=1177275&view=diff
==============================================================================
--- incubator/hama/branches/HamaV2/core/src/test/java/org/apache/hama/bsp/BSPSerializerWrapper.java (original)
+++ incubator/hama/branches/HamaV2/core/src/test/java/org/apache/hama/bsp/BSPSerializerWrapper.java Thu Sep 29 12:00:45 2011
@@ -20,14 +20,14 @@ package org.apache.hama.bsp;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hama.bsp.BSPPeer.BSPSerializableMessage;
+import org.apache.hama.bsp.BSPPeerImpl.BSPSerializableMessage;
public final class BSPSerializerWrapper {
- private final BSPPeer.BSPMessageSerializer serializer;
+ private final BSPPeerImpl.BSPMessageSerializer serializer;
public BSPSerializerWrapper(Configuration conf, int port) throws IOException {
- this.serializer = new BSPPeer(conf, null, null).new BSPMessageSerializer(
+ this.serializer = new BSPPeerImpl(conf, null, null).new BSPMessageSerializer(
conf.getInt("bsp.checkpoint.port", port));
}
Modified: incubator/hama/branches/HamaV2/core/src/test/java/org/apache/hama/checkpoint/TestCheckpoint.java
URL: http://svn.apache.org/viewvc/incubator/hama/branches/HamaV2/core/src/test/java/org/apache/hama/checkpoint/TestCheckpoint.java?rev=1177275&r1=1177274&r2=1177275&view=diff
==============================================================================
--- incubator/hama/branches/HamaV2/core/src/test/java/org/apache/hama/checkpoint/TestCheckpoint.java (original)
+++ incubator/hama/branches/HamaV2/core/src/test/java/org/apache/hama/checkpoint/TestCheckpoint.java Thu Sep 29 12:00:45 2011
@@ -33,7 +33,7 @@ import org.apache.hama.bsp.BSPMessage;
import org.apache.hama.bsp.BSPMessageBundle;
import org.apache.hama.bsp.BSPSerializerWrapper;
import org.apache.hama.bsp.DoubleMessage;
-import org.apache.hama.bsp.BSPPeer.BSPSerializableMessage;
+import org.apache.hama.bsp.BSPPeerImpl.BSPSerializableMessage;
public class TestCheckpoint extends TestCase {
Modified: incubator/hama/branches/HamaV2/core/src/test/java/testjar/ClassSerializePrinting.java
URL: http://svn.apache.org/viewvc/incubator/hama/branches/HamaV2/core/src/test/java/testjar/ClassSerializePrinting.java?rev=1177275&r1=1177274&r2=1177275&view=diff
==============================================================================
--- incubator/hama/branches/HamaV2/core/src/test/java/testjar/ClassSerializePrinting.java (original)
+++ incubator/hama/branches/HamaV2/core/src/test/java/testjar/ClassSerializePrinting.java Thu Sep 29 12:00:45 2011
@@ -42,6 +42,7 @@ public class ClassSerializePrinting {
private FileSystem fileSys;
private int num;
+ @Override
public void bsp(BSPPeer bspPeer) throws IOException,
KeeperException, InterruptedException {
@@ -67,10 +68,12 @@ public class ClassSerializePrinting {
writer.close();
}
+ @Override
public Configuration getConf() {
return conf;
}
+ @Override
public void setConf(Configuration conf) {
this.conf = conf;
num = Integer.parseInt(conf.get("bsp.peers.num"));
Modified: incubator/hama/branches/HamaV2/examples/.classpath
URL: http://svn.apache.org/viewvc/incubator/hama/branches/HamaV2/examples/.classpath?rev=1177275&r1=1177274&r2=1177275&view=diff
==============================================================================
--- incubator/hama/branches/HamaV2/examples/.classpath (original)
+++ incubator/hama/branches/HamaV2/examples/.classpath Thu Sep 29 12:00:45 2011
@@ -19,6 +19,7 @@
<classpathentry kind="var" path="M2_REPO/geronimo-spec/geronimo-spec-jta/1.0.1B-rc4/geronimo-spec-jta-1.0.1B-rc4.jar"/>
<classpathentry kind="var" path="M2_REPO/org/apache/hadoop/hadoop-core/0.20.2/hadoop-core-0.20.2.jar"/>
<classpathentry kind="var" path="M2_REPO/org/apache/hadoop/hadoop-test/0.20.2/hadoop-test-0.20.2.jar"/>
+ <classpathentry kind="src" path="/hama-api"/>
<classpathentry kind="src" path="/hama-core"/>
<classpathentry kind="var" path="M2_REPO/hsqldb/hsqldb/1.8.0.10/hsqldb-1.8.0.10.jar"/>
<classpathentry kind="var" path="M2_REPO/tomcat/jasper-compiler/5.5.12/jasper-compiler-5.5.12.jar"/>
Modified: incubator/hama/branches/HamaV2/examples/.project
URL: http://svn.apache.org/viewvc/incubator/hama/branches/HamaV2/examples/.project?rev=1177275&r1=1177274&r2=1177275&view=diff
==============================================================================
--- incubator/hama/branches/HamaV2/examples/.project (original)
+++ incubator/hama/branches/HamaV2/examples/.project Thu Sep 29 12:00:45 2011
@@ -6,6 +6,7 @@
We consider ourselves not simply a group of projects sharing a server, but rather a community of developers
and users. NO_M2ECLIPSE_SUPPORT: Project files created with the maven-eclipse-plugin are not supported in M2Eclipse.</comment>
<projects>
+ <project>hama-api</project>
<project>hama-core</project>
</projects>
<buildSpec>
Modified: incubator/hama/branches/HamaV2/examples/pom.xml
URL: http://svn.apache.org/viewvc/incubator/hama/branches/HamaV2/examples/pom.xml?rev=1177275&r1=1177274&r2=1177275&view=diff
==============================================================================
--- incubator/hama/branches/HamaV2/examples/pom.xml (original)
+++ incubator/hama/branches/HamaV2/examples/pom.xml Thu Sep 29 12:00:45 2011
@@ -37,6 +37,11 @@
<artifactId>hama-core</artifactId>
<version>${project.version}</version>
</dependency>
+ <dependency>
+ <groupId>org.apache.hama</groupId>
+ <artifactId>hama-api</artifactId>
+ <version>${project.version}</version>
+ </dependency>
</dependencies>
<build>
<finalName>hama-examples-${project.version}</finalName>
Modified: incubator/hama/branches/HamaV2/examples/src/main/java/org/apache/hama/examples/PiEstimator.java
URL: http://svn.apache.org/viewvc/incubator/hama/branches/HamaV2/examples/src/main/java/org/apache/hama/examples/PiEstimator.java?rev=1177275&r1=1177274&r2=1177275&view=diff
==============================================================================
--- incubator/hama/branches/HamaV2/examples/src/main/java/org/apache/hama/examples/PiEstimator.java (original)
+++ incubator/hama/branches/HamaV2/examples/src/main/java/org/apache/hama/examples/PiEstimator.java Thu Sep 29 12:00:45 2011
@@ -46,6 +46,7 @@ public class PiEstimator {
private String masterTask;
private static final int iterations = 10000;
+ @Override
public void bsp(BSPPeer bspPeer) throws IOException,
KeeperException, InterruptedException {
@@ -88,10 +89,12 @@ public class PiEstimator {
writer.close();
}
+ @Override
public Configuration getConf() {
return conf;
}
+ @Override
public void setConf(Configuration conf) {
this.conf = conf;
this.masterTask = conf.get(MASTER_TASK);
Modified: incubator/hama/branches/HamaV2/examples/src/main/java/org/apache/hama/examples/SerializePrinting.java
URL: http://svn.apache.org/viewvc/incubator/hama/branches/HamaV2/examples/src/main/java/org/apache/hama/examples/SerializePrinting.java?rev=1177275&r1=1177274&r2=1177275&view=diff
==============================================================================
--- incubator/hama/branches/HamaV2/examples/src/main/java/org/apache/hama/examples/SerializePrinting.java (original)
+++ incubator/hama/branches/HamaV2/examples/src/main/java/org/apache/hama/examples/SerializePrinting.java Thu Sep 29 12:00:45 2011
@@ -47,6 +47,7 @@ public class SerializePrinting {
private FileSystem fileSys;
private int num;
+ @Override
public void bsp(BSPPeer bspPeer) throws IOException, KeeperException,
InterruptedException {
@@ -73,10 +74,12 @@ public class SerializePrinting {
writer.close();
}
+ @Override
public Configuration getConf() {
return conf;
}
+ @Override
public void setConf(Configuration conf) {
this.conf = conf;
num = Integer.parseInt(conf.get("bsp.peers.num"));
Modified: incubator/hama/branches/HamaV2/graph/.classpath
URL: http://svn.apache.org/viewvc/incubator/hama/branches/HamaV2/graph/.classpath?rev=1177275&r1=1177274&r2=1177275&view=diff
==============================================================================
--- incubator/hama/branches/HamaV2/graph/.classpath (original)
+++ incubator/hama/branches/HamaV2/graph/.classpath Thu Sep 29 12:00:45 2011
@@ -22,6 +22,7 @@
<classpathentry kind="var" path="M2_REPO/geronimo-spec/geronimo-spec-jta/1.0.1B-rc4/geronimo-spec-jta-1.0.1B-rc4.jar"/>
<classpathentry kind="var" path="M2_REPO/org/apache/hadoop/hadoop-core/0.20.2/hadoop-core-0.20.2.jar"/>
<classpathentry kind="var" path="M2_REPO/org/apache/hadoop/hadoop-test/0.20.2/hadoop-test-0.20.2.jar"/>
+ <classpathentry kind="src" path="/hama-api"/>
<classpathentry kind="src" path="/hama-core"/>
<classpathentry kind="var" path="M2_REPO/hsqldb/hsqldb/1.8.0.10/hsqldb-1.8.0.10.jar"/>
<classpathentry kind="var" path="M2_REPO/tomcat/jasper-compiler/5.5.12/jasper-compiler-5.5.12.jar"/>
Modified: incubator/hama/branches/HamaV2/graph/.project
URL: http://svn.apache.org/viewvc/incubator/hama/branches/HamaV2/graph/.project?rev=1177275&r1=1177274&r2=1177275&view=diff
==============================================================================
--- incubator/hama/branches/HamaV2/graph/.project (original)
+++ incubator/hama/branches/HamaV2/graph/.project Thu Sep 29 12:00:45 2011
@@ -6,6 +6,7 @@
We consider ourselves not simply a group of projects sharing a server, but rather a community of developers
and users. NO_M2ECLIPSE_SUPPORT: Project files created with the maven-eclipse-plugin are not supported in M2Eclipse.</comment>
<projects>
+ <project>hama-api</project>
<project>hama-core</project>
</projects>
<buildSpec>