You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hama.apache.org by ed...@apache.org on 2011/04/05 11:30:25 UTC
svn commit: r1088939 - in /incubator/hama: branches/0.2.1/ trunk/
trunk/src/docs/src/documentation/content/xdocs/docs/r0.2.0/
trunk/src/examples/org/apache/hama/examples/
trunk/src/java/org/apache/hama/bsp/ trunk/src/test/org/apache/hama/bsp/
Author: edwardyoon
Date: Tue Apr 5 09:30:25 2011
New Revision: 1088939
URL: http://svn.apache.org/viewvc?rev=1088939&view=rev
Log:
Re-design a new data structure of BSPMessage
Added:
incubator/hama/trunk/src/java/org/apache/hama/bsp/ByteMessage.java
incubator/hama/trunk/src/java/org/apache/hama/bsp/DoubleMessage.java
incubator/hama/trunk/src/java/org/apache/hama/bsp/Messagable.java
Modified:
incubator/hama/branches/0.2.1/CHANGES.txt
incubator/hama/trunk/CHANGES.txt
incubator/hama/trunk/src/docs/src/documentation/content/xdocs/docs/r0.2.0/index.xml
incubator/hama/trunk/src/examples/org/apache/hama/examples/PiEstimator.java
incubator/hama/trunk/src/examples/org/apache/hama/examples/RandBench.java
incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPMessage.java
incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPPeer.java
incubator/hama/trunk/src/test/org/apache/hama/bsp/TestBSPPeer.java
Modified: incubator/hama/branches/0.2.1/CHANGES.txt
URL: http://svn.apache.org/viewvc/incubator/hama/branches/0.2.1/CHANGES.txt?rev=1088939&r1=1088938&r2=1088939&view=diff
==============================================================================
--- incubator/hama/branches/0.2.1/CHANGES.txt (original)
+++ incubator/hama/branches/0.2.1/CHANGES.txt Tue Apr 5 09:30:25 2011
@@ -12,6 +12,8 @@ Release 0.2.1 - Unreleased
HAMA-371: Add a parameter to build.xml to allow developers deciding to reset content
in conf files or not when building artifacts (ChiaHung Lin via edwardyoon)
+Release 0.2 - Unreleased
+
NEW FEATURES
HAMA-339: Add job status command (edwardyoon)
Modified: incubator/hama/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/CHANGES.txt?rev=1088939&r1=1088938&r2=1088939&view=diff
==============================================================================
--- incubator/hama/trunk/CHANGES.txt (original)
+++ incubator/hama/trunk/CHANGES.txt Tue Apr 5 09:30:25 2011
@@ -1,6 +1,28 @@
Hama Change Log
-Trunk (unreleased changes)
+Release 0.3 - Unreleased
+
+ NEW FEATURES
+
+ BUG FIXES
+
+ IMPROVEMENTS
+
+ HAMA-362: Re-design a new data structure of BSPMessage (Thomas Jungblut via edwardyoon)
+
+Release 0.2.1 - Unreleased
+
+ NEW FEATURES
+
+ BUG FIXES
+
+ IMPROVEMENTS
+
+ HAMA-369: Reduce overhead of local communications (Miklos Erdelyi via edwardyoon)
+ HAMA-371: Add a parameter to build.xml to allow developers deciding to reset content
+ in conf files or not when building artifacts (ChiaHung Lin via edwardyoon)
+
+Release 0.2 - Unreleased
NEW FEATURES
@@ -52,8 +74,6 @@ Trunk (unreleased changes)
IMPROVEMENTS
- HAMA-371: Add a parameter to build.xml to allow developers deciding to reset content
- in conf files or not when building artifacts (ChiaHung Lin)
HAMA-369: Reduce overhead of local communications (Miklos Erdelyi)
HAMA-366: Remove unnecessary dependencies (Tommaso Teofili)
HAMA-365: TestBSPPeer doesn't check if the correct
Modified: incubator/hama/trunk/src/docs/src/documentation/content/xdocs/docs/r0.2.0/index.xml
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/docs/src/documentation/content/xdocs/docs/r0.2.0/index.xml?rev=1088939&r1=1088938&r2=1088939&view=diff
==============================================================================
--- incubator/hama/trunk/src/docs/src/documentation/content/xdocs/docs/r0.2.0/index.xml (original)
+++ incubator/hama/trunk/src/docs/src/documentation/content/xdocs/docs/r0.2.0/index.xml Tue Apr 5 09:30:25 2011
@@ -28,7 +28,7 @@
<section>
<title>Downloadable User Guide</title>
<ul>
- <li>Apache Hama (v0.2) : User Guide [<a href="/docs/r0.2.0/ApacheHama-0.2_UserGuide.pdf">pdf</a>]</li>
+ <li>Apache Hama (v0.2) : User Guide [<a href="/hama/docs/r0.2.0/ApacheHama-0.2_UserGuide.pdf">pdf</a>]</li>
</ul>
</section>
Modified: incubator/hama/trunk/src/examples/org/apache/hama/examples/PiEstimator.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/examples/org/apache/hama/examples/PiEstimator.java?rev=1088939&r1=1088938&r2=1088939&view=diff
==============================================================================
--- incubator/hama/trunk/src/examples/org/apache/hama/examples/PiEstimator.java (original)
+++ incubator/hama/trunk/src/examples/org/apache/hama/examples/PiEstimator.java Tue Apr 5 09:30:25 2011
@@ -31,10 +31,9 @@ import org.apache.hama.HamaConfiguration
import org.apache.hama.bsp.BSP;
import org.apache.hama.bsp.BSPJob;
import org.apache.hama.bsp.BSPJobClient;
-import org.apache.hama.bsp.BSPMessage;
import org.apache.hama.bsp.BSPPeerProtocol;
import org.apache.hama.bsp.ClusterStatus;
-import org.apache.hama.util.Bytes;
+import org.apache.hama.bsp.DoubleMessage;
import org.apache.zookeeper.KeeperException;
public class PiEstimator {
@@ -59,9 +58,8 @@ public class PiEstimator {
}
}
- byte[] tagName = Bytes.toBytes(bspPeer.getPeerName());
- byte[] myData = Bytes.toBytes(4.0 * (double) in / (double) iterations);
- BSPMessage estimate = new BSPMessage(tagName, myData);
+ double data = 4.0 * (double) in / (double) iterations;
+ DoubleMessage estimate = new DoubleMessage(bspPeer.getPeerName(), data);
bspPeer.send(masterTask, estimate);
bspPeer.sync();
@@ -69,9 +67,9 @@ public class PiEstimator {
if (bspPeer.getPeerName().equals(masterTask)) {
double pi = 0.0;
int numPeers = bspPeer.getNumCurrentMessages();
- BSPMessage received;
- while ((received = bspPeer.getCurrentMessage()) != null) {
- pi += Bytes.toDouble(received.getData());
+ DoubleMessage received;
+ while ((received = (DoubleMessage) bspPeer.getCurrentMessage()) != null) {
+ pi += received.getData();
}
pi = pi / numPeers;
Modified: incubator/hama/trunk/src/examples/org/apache/hama/examples/RandBench.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/examples/org/apache/hama/examples/RandBench.java?rev=1088939&r1=1088938&r2=1088939&view=diff
==============================================================================
--- incubator/hama/trunk/src/examples/org/apache/hama/examples/RandBench.java (original)
+++ incubator/hama/trunk/src/examples/org/apache/hama/examples/RandBench.java Tue Apr 5 09:30:25 2011
@@ -29,6 +29,7 @@ import org.apache.hama.bsp.BSPJob;
import org.apache.hama.bsp.BSPJobClient;
import org.apache.hama.bsp.BSPMessage;
import org.apache.hama.bsp.BSPPeerProtocol;
+import org.apache.hama.bsp.ByteMessage;
import org.apache.hama.bsp.ClusterStatus;
import org.apache.hama.util.Bytes;
import org.apache.zookeeper.KeeperException;
@@ -59,17 +60,18 @@ public class RandBench {
for (int j = 0; j < nCommunications; j++) {
String tPeer = peers[r.nextInt(peers.length)];
String tag = peerName + " to " + tPeer;
- msg = new BSPMessage(Bytes.toBytes(tag), dummyData);
+ msg = new ByteMessage(Bytes.toBytes(tag), dummyData);
bspPeer.send(tPeer, msg);
}
bspPeer.sync();
- BSPMessage received;
- while ((received = bspPeer.getCurrentMessage()) != null) {
- LOG.info(Bytes.toString(received.getTag()) + " : " + received.getData().length);
+ ByteMessage received;
+ while ((received = (ByteMessage) bspPeer.getCurrentMessage()) != null) {
+ LOG.info(Bytes.toString(received.getTag()) + " : "
+ + received.getData().length);
}
-
+
}
}
Modified: incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPMessage.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPMessage.java?rev=1088939&r1=1088938&r2=1088939&view=diff
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPMessage.java (original)
+++ incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPMessage.java Tue Apr 5 09:30:25 2011
@@ -17,35 +17,13 @@
*/
package org.apache.hama.bsp;
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-
import org.apache.hadoop.io.Writable;
/**
* BSPMessage consists of the tag and the arbitrary amount of data to be
* communicated.
*/
-public class BSPMessage implements Writable {
- protected byte[] tag;
- protected byte[] data;
-
- public BSPMessage() {
- }
-
- /**
- * Constructor
- *
- * @param tag of data
- * @param data of message
- */
- public BSPMessage(byte[] tag, byte[] data) {
- this.tag = new byte[tag.length];
- this.data = new byte[data.length];
- System.arraycopy(tag, 0, this.tag, 0, tag.length);
- System.arraycopy(data, 0, this.data, 0, data.length);
- }
+public abstract class BSPMessage implements Messagable, Writable {
/**
* BSP messages are typically identified with tags. This allows to get the tag
@@ -53,32 +31,11 @@ public class BSPMessage implements Writa
*
* @return tag of data of BSP message
*/
- public byte[] getTag() {
- byte[] result = this.tag;
- return result;
- }
+ public abstract Object getTag();
/**
* @return data of BSP message
*/
- public byte[] getData() {
- byte[] result = this.data;
- return result;
- }
-
- @Override
- public void readFields(DataInput in) throws IOException {
- this.tag = new byte[in.readInt()];
- in.readFully(tag, 0, this.tag.length);
- this.data = new byte[in.readInt()];
- in.readFully(data, 0, this.data.length);
- }
+ public abstract Object getData();
- @Override
- public void write(DataOutput out) throws IOException {
- out.writeInt(tag.length);
- out.write(tag);
- out.writeInt(data.length);
- out.write(data);
- }
}
Modified: incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPPeer.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPPeer.java?rev=1088939&r1=1088938&r2=1088939&view=diff
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPPeer.java (original)
+++ incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPPeer.java Tue Apr 5 09:30:25 2011
@@ -44,7 +44,7 @@ import org.apache.zookeeper.ZooDefs.Ids;
import org.apache.zookeeper.data.Stat;
/**
- * This class represents a BSP peer.
+ * This class represents a BSP peer.
*/
public class BSPPeer implements Watcher, BSPPeerInterface {
public static final Log LOG = LogFactory.getLog(BSPPeer.class);
@@ -62,10 +62,8 @@ public class BSPPeer implements Watcher,
private final Map<InetSocketAddress, BSPPeerInterface> peers = new ConcurrentHashMap<InetSocketAddress, BSPPeerInterface>();
private final Map<InetSocketAddress, ConcurrentLinkedQueue<BSPMessage>> outgoingQueues = new ConcurrentHashMap<InetSocketAddress, ConcurrentLinkedQueue<BSPMessage>>();
private ConcurrentLinkedQueue<BSPMessage> localQueue = new ConcurrentLinkedQueue<BSPMessage>();
- private ConcurrentLinkedQueue<BSPMessage> localQueueForNextIteration =
- new ConcurrentLinkedQueue<BSPMessage>();
- private final Map<String, InetSocketAddress> peerSocketCache =
- new ConcurrentHashMap<String, InetSocketAddress>();
+ private ConcurrentLinkedQueue<BSPMessage> localQueueForNextIteration = new ConcurrentLinkedQueue<BSPMessage>();
+ private final Map<String, InetSocketAddress> peerSocketCache = new ConcurrentHashMap<String, InetSocketAddress>();
private SortedSet<String> allPeerNames = new TreeSet<String>();
private InetSocketAddress peerAddress;
@@ -158,7 +156,8 @@ public class BSPPeer implements Watcher,
targetPeerAddress = getAddress(peerName);
peerSocketCache.put(peerName, targetPeerAddress);
}
- ConcurrentLinkedQueue<BSPMessage> queue = outgoingQueues.get(targetPeerAddress);
+ ConcurrentLinkedQueue<BSPMessage> queue = outgoingQueues
+ .get(targetPeerAddress);
if (queue == null) {
queue = new ConcurrentLinkedQueue<BSPMessage>();
}
Added: incubator/hama/trunk/src/java/org/apache/hama/bsp/ByteMessage.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/bsp/ByteMessage.java?rev=1088939&view=auto
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/bsp/ByteMessage.java (added)
+++ incubator/hama/trunk/src/java/org/apache/hama/bsp/ByteMessage.java Tue Apr 5 09:30:25 2011
@@ -0,0 +1,69 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.bsp;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+public class ByteMessage extends BSPMessage {
+
+ private byte[] tag;
+ private byte[] data;
+
+ public ByteMessage() {
+ super();
+ }
+
+ public ByteMessage(byte[] tag, byte[] data) {
+ super();
+ this.tag = new byte[tag.length];
+ this.data = new byte[data.length];
+ System.arraycopy(tag, 0, this.tag, 0, tag.length);
+ System.arraycopy(data, 0, this.data, 0, data.length);
+ }
+
+ @Override
+ public byte[] getTag() {
+ byte[] result = this.tag;
+ return result;
+ }
+
+ @Override
+ public byte[] getData() {
+ byte[] result = this.data;
+ return result;
+ }
+
+ @Override
+ public void readFields(DataInput in) throws IOException {
+ this.tag = new byte[in.readInt()];
+ in.readFully(tag, 0, this.tag.length);
+ this.data = new byte[in.readInt()];
+ in.readFully(data, 0, this.data.length);
+ }
+
+ @Override
+ public void write(DataOutput out) throws IOException {
+ out.writeInt(tag.length);
+ out.write(tag);
+ out.writeInt(data.length);
+ out.write(data);
+ }
+
+}
Added: incubator/hama/trunk/src/java/org/apache/hama/bsp/DoubleMessage.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/bsp/DoubleMessage.java?rev=1088939&view=auto
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/bsp/DoubleMessage.java (added)
+++ incubator/hama/trunk/src/java/org/apache/hama/bsp/DoubleMessage.java Tue Apr 5 09:30:25 2011
@@ -0,0 +1,61 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.bsp;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+public class DoubleMessage extends BSPMessage {
+
+ private String tag;
+ private double data;
+
+ public DoubleMessage() {
+ super();
+ }
+
+ public DoubleMessage(String tag, double data) {
+ super();
+ this.data = data;
+ this.tag = tag;
+ }
+
+ @Override
+ public String getTag() {
+ return tag;
+ }
+
+ @Override
+ public Double getData() {
+ return data;
+ }
+
+ @Override
+ public void write(DataOutput out) throws IOException {
+ out.writeUTF(tag);
+ out.writeDouble(data);
+ }
+
+ @Override
+ public void readFields(DataInput in) throws IOException {
+ tag = in.readUTF();
+ data = in.readDouble();
+ }
+
+}
Added: incubator/hama/trunk/src/java/org/apache/hama/bsp/Messagable.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/bsp/Messagable.java?rev=1088939&view=auto
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/bsp/Messagable.java (added)
+++ incubator/hama/trunk/src/java/org/apache/hama/bsp/Messagable.java Tue Apr 5 09:30:25 2011
@@ -0,0 +1,23 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hama.bsp;
+
+public interface Messagable {
+
+}
Modified: incubator/hama/trunk/src/test/org/apache/hama/bsp/TestBSPPeer.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/test/org/apache/hama/bsp/TestBSPPeer.java?rev=1088939&r1=1088938&r2=1088939&view=diff
==============================================================================
--- incubator/hama/trunk/src/test/org/apache/hama/bsp/TestBSPPeer.java (original)
+++ incubator/hama/trunk/src/test/org/apache/hama/bsp/TestBSPPeer.java Tue Apr 5 09:30:25 2011
@@ -95,7 +95,7 @@ public class TestBSPPeer extends HamaClu
peerNames.add("localhost:" + (30000 + i));
}
peer.setAllPeerNames(peerNames);
- TaskStatus currentTaskStatus = new TaskStatus(new BSPJobID(),
+ TaskStatus currentTaskStatus = new TaskStatus(new BSPJobID(),
new TaskAttemptID(), 0, null, null, null, null);
peer.setCurrentTaskStatus(currentTaskStatus);
BSPJob jobConf = new BSPJob(conf, NUM_PEER);
@@ -113,7 +113,7 @@ public class TestBSPPeer extends HamaClu
for (int j = 0; j < 10; j++) {
r.nextBytes(dummyData);
- msg = new BSPMessage(Bytes.tail(dummyData, 128), dummyData);
+ msg = new ByteMessage(Bytes.tail(dummyData, 128), dummyData);
String peerName = "localhost:" + (30000 + j);
try {
peer.send(peerName, msg);
@@ -144,8 +144,8 @@ public class TestBSPPeer extends HamaClu
private void verifyPayload(int round) {
int numMessages = peer.getNumCurrentMessages();
- assertEquals(round, ((int) peer.getSuperstepCount() -1 ));
-
+ assertEquals(round, ((int) peer.getSuperstepCount() - 1));
+
LOG.info("[" + peer.getPeerName() + "] verifying " + numMessages
+ " messages at " + round + " round");
@@ -155,20 +155,21 @@ public class TestBSPPeer extends HamaClu
assertEquals(0, numMessages);
}
- BSPMessage msg = null;
+ ByteMessage msg = null;
int messageCounter = 0;
-
+
try {
- while ((msg = peer.getCurrentMessage()) != null) {
- assertEquals(Bytes.compareTo(msg.tag, 0, 128, msg.data,
- msg.data.length - 128, 128), 0);
+ while ((msg = (ByteMessage) peer.getCurrentMessage()) != null) {
+ assertEquals(
+ Bytes.compareTo(msg.getTag(), 0, 128, msg.getData(),
+ msg.getData().length - 128, 128), 0);
++messageCounter;
}
} catch (IOException e) {
LOG.error(e);
}
assertEquals(numMessages, messageCounter);
-
+
peer.clearLocalQueue();
}
@@ -178,12 +179,12 @@ public class TestBSPPeer extends HamaClu
}
public void testSync() throws Throwable {
-
+
conf.setInt("bsp.peers.num", NUM_PEER);
conf.set(Constants.ZOOKEEPER_QUORUM, "localhost");
conf.set(Constants.PEER_HOST, "localhost");
conf.set(Constants.ZOOKEEPER_SERVER_ADDRS, "localhost:21810");
-
+
TestRunnable[] threads = new TestRunnable[NUM_PEER];
for (int i = 0; i < NUM_PEER; i++) {