You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hama.apache.org by ed...@apache.org on 2011/04/05 11:30:25 UTC

svn commit: r1088939 - in /incubator/hama: branches/0.2.1/ trunk/ trunk/src/docs/src/documentation/content/xdocs/docs/r0.2.0/ trunk/src/examples/org/apache/hama/examples/ trunk/src/java/org/apache/hama/bsp/ trunk/src/test/org/apache/hama/bsp/

Author: edwardyoon
Date: Tue Apr  5 09:30:25 2011
New Revision: 1088939

URL: http://svn.apache.org/viewvc?rev=1088939&view=rev
Log:
Re-design a new data structure of BSPMessage

Added:
    incubator/hama/trunk/src/java/org/apache/hama/bsp/ByteMessage.java
    incubator/hama/trunk/src/java/org/apache/hama/bsp/DoubleMessage.java
    incubator/hama/trunk/src/java/org/apache/hama/bsp/Messagable.java
Modified:
    incubator/hama/branches/0.2.1/CHANGES.txt
    incubator/hama/trunk/CHANGES.txt
    incubator/hama/trunk/src/docs/src/documentation/content/xdocs/docs/r0.2.0/index.xml
    incubator/hama/trunk/src/examples/org/apache/hama/examples/PiEstimator.java
    incubator/hama/trunk/src/examples/org/apache/hama/examples/RandBench.java
    incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPMessage.java
    incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPPeer.java
    incubator/hama/trunk/src/test/org/apache/hama/bsp/TestBSPPeer.java

Modified: incubator/hama/branches/0.2.1/CHANGES.txt
URL: http://svn.apache.org/viewvc/incubator/hama/branches/0.2.1/CHANGES.txt?rev=1088939&r1=1088938&r2=1088939&view=diff
==============================================================================
--- incubator/hama/branches/0.2.1/CHANGES.txt (original)
+++ incubator/hama/branches/0.2.1/CHANGES.txt Tue Apr  5 09:30:25 2011
@@ -12,6 +12,8 @@ Release 0.2.1 - Unreleased
     HAMA-371: Add a parameter to build.xml to allow developers deciding to reset content 
                        in conf files or not when building artifacts (ChiaHung Lin via edwardyoon)
 
+Release 0.2 - Unreleased
+
   NEW FEATURES
 
     HAMA-339: Add job status command (edwardyoon)

Modified: incubator/hama/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/CHANGES.txt?rev=1088939&r1=1088938&r2=1088939&view=diff
==============================================================================
--- incubator/hama/trunk/CHANGES.txt (original)
+++ incubator/hama/trunk/CHANGES.txt Tue Apr  5 09:30:25 2011
@@ -1,6 +1,28 @@
 Hama Change Log
 
-Trunk (unreleased changes)
+Release 0.3 - Unreleased
+
+  NEW FEATURES
+
+  BUG FIXES
+
+  IMPROVEMENTS
+
+    HAMA-362: Re-design a new data structure of BSPMessage (Thomas Jungblut via edwardyoon)   
+
+Release 0.2.1 - Unreleased
+
+  NEW FEATURES
+
+  BUG FIXES
+
+  IMPROVEMENTS
+  
+    HAMA-369: Reduce overhead of local communications (Miklos Erdelyi via edwardyoon)
+    HAMA-371: Add a parameter to build.xml to allow developers deciding to reset content 
+                       in conf files or not when building artifacts (ChiaHung Lin via edwardyoon)
+
+Release 0.2 - Unreleased
 
   NEW FEATURES
 
@@ -52,8 +74,6 @@ Trunk (unreleased changes)
 
   IMPROVEMENTS
  
-    HAMA-371: Add a parameter to build.xml to allow developers deciding to reset content 
-                       in conf files or not when building artifacts (ChiaHung Lin)
     HAMA-369: Reduce overhead of local communications (Miklos Erdelyi)
     HAMA-366: Remove unnecessary dependencies (Tommaso Teofili)
     HAMA-365: TestBSPPeer doesn't check if the correct 

Modified: incubator/hama/trunk/src/docs/src/documentation/content/xdocs/docs/r0.2.0/index.xml
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/docs/src/documentation/content/xdocs/docs/r0.2.0/index.xml?rev=1088939&r1=1088938&r2=1088939&view=diff
==============================================================================
--- incubator/hama/trunk/src/docs/src/documentation/content/xdocs/docs/r0.2.0/index.xml (original)
+++ incubator/hama/trunk/src/docs/src/documentation/content/xdocs/docs/r0.2.0/index.xml Tue Apr  5 09:30:25 2011
@@ -28,7 +28,7 @@
     <section>
     <title>Downloadable User Guide</title>
     <ul>
-    	<li>Apache Hama (v0.2) : User Guide [<a href="/docs/r0.2.0/ApacheHama-0.2_UserGuide.pdf">pdf</a>]</li>
+    	<li>Apache Hama (v0.2) : User Guide [<a href="/hama/docs/r0.2.0/ApacheHama-0.2_UserGuide.pdf">pdf</a>]</li>
     </ul>
     </section>
     

Modified: incubator/hama/trunk/src/examples/org/apache/hama/examples/PiEstimator.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/examples/org/apache/hama/examples/PiEstimator.java?rev=1088939&r1=1088938&r2=1088939&view=diff
==============================================================================
--- incubator/hama/trunk/src/examples/org/apache/hama/examples/PiEstimator.java (original)
+++ incubator/hama/trunk/src/examples/org/apache/hama/examples/PiEstimator.java Tue Apr  5 09:30:25 2011
@@ -31,10 +31,9 @@ import org.apache.hama.HamaConfiguration
 import org.apache.hama.bsp.BSP;
 import org.apache.hama.bsp.BSPJob;
 import org.apache.hama.bsp.BSPJobClient;
-import org.apache.hama.bsp.BSPMessage;
 import org.apache.hama.bsp.BSPPeerProtocol;
 import org.apache.hama.bsp.ClusterStatus;
-import org.apache.hama.util.Bytes;
+import org.apache.hama.bsp.DoubleMessage;
 import org.apache.zookeeper.KeeperException;
 
 public class PiEstimator {
@@ -59,9 +58,8 @@ public class PiEstimator {
         }
       }
 
-      byte[] tagName = Bytes.toBytes(bspPeer.getPeerName());
-      byte[] myData = Bytes.toBytes(4.0 * (double) in / (double) iterations);
-      BSPMessage estimate = new BSPMessage(tagName, myData);
+      double data = 4.0 * (double) in / (double) iterations;
+      DoubleMessage estimate = new DoubleMessage(bspPeer.getPeerName(), data);
 
       bspPeer.send(masterTask, estimate);
       bspPeer.sync();
@@ -69,9 +67,9 @@ public class PiEstimator {
       if (bspPeer.getPeerName().equals(masterTask)) {
         double pi = 0.0;
         int numPeers = bspPeer.getNumCurrentMessages();
-        BSPMessage received;
-        while ((received = bspPeer.getCurrentMessage()) != null) {
-          pi += Bytes.toDouble(received.getData());
+        DoubleMessage received;
+        while ((received = (DoubleMessage) bspPeer.getCurrentMessage()) != null) {
+          pi += received.getData();
         }
 
         pi = pi / numPeers;

Modified: incubator/hama/trunk/src/examples/org/apache/hama/examples/RandBench.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/examples/org/apache/hama/examples/RandBench.java?rev=1088939&r1=1088938&r2=1088939&view=diff
==============================================================================
--- incubator/hama/trunk/src/examples/org/apache/hama/examples/RandBench.java (original)
+++ incubator/hama/trunk/src/examples/org/apache/hama/examples/RandBench.java Tue Apr  5 09:30:25 2011
@@ -29,6 +29,7 @@ import org.apache.hama.bsp.BSPJob;
 import org.apache.hama.bsp.BSPJobClient;
 import org.apache.hama.bsp.BSPMessage;
 import org.apache.hama.bsp.BSPPeerProtocol;
+import org.apache.hama.bsp.ByteMessage;
 import org.apache.hama.bsp.ClusterStatus;
 import org.apache.hama.util.Bytes;
 import org.apache.zookeeper.KeeperException;
@@ -59,17 +60,18 @@ public class RandBench {
         for (int j = 0; j < nCommunications; j++) {
           String tPeer = peers[r.nextInt(peers.length)];
           String tag = peerName + " to " + tPeer;
-          msg = new BSPMessage(Bytes.toBytes(tag), dummyData);
+          msg = new ByteMessage(Bytes.toBytes(tag), dummyData);
           bspPeer.send(tPeer, msg);
         }
 
         bspPeer.sync();
 
-        BSPMessage received;
-        while ((received = bspPeer.getCurrentMessage()) != null) {
-          LOG.info(Bytes.toString(received.getTag()) + " : " + received.getData().length);
+        ByteMessage received;
+        while ((received = (ByteMessage) bspPeer.getCurrentMessage()) != null) {
+          LOG.info(Bytes.toString(received.getTag()) + " : "
+              + received.getData().length);
         }
-        
+
       }
     }
 

Modified: incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPMessage.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPMessage.java?rev=1088939&r1=1088938&r2=1088939&view=diff
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPMessage.java (original)
+++ incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPMessage.java Tue Apr  5 09:30:25 2011
@@ -17,35 +17,13 @@
  */
 package org.apache.hama.bsp;
 
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-
 import org.apache.hadoop.io.Writable;
 
 /**
  * BSPMessage consists of the tag and the arbitrary amount of data to be
  * communicated.
  */
-public class BSPMessage implements Writable {
-  protected byte[] tag;
-  protected byte[] data;
-
-  public BSPMessage() {
-  }
-
-  /**
-   * Constructor
-   * 
-   * @param tag of data
-   * @param data of message
-   */
-  public BSPMessage(byte[] tag, byte[] data) {
-    this.tag = new byte[tag.length];
-    this.data = new byte[data.length];
-    System.arraycopy(tag, 0, this.tag, 0, tag.length);
-    System.arraycopy(data, 0, this.data, 0, data.length);
-  }
+public abstract class BSPMessage implements Messagable, Writable {
 
   /**
    * BSP messages are typically identified with tags. This allows to get the tag
@@ -53,32 +31,11 @@ public class BSPMessage implements Writa
    * 
    * @return tag of data of BSP message
    */
-  public byte[] getTag() {
-    byte[] result = this.tag;
-    return result;
-  }
+  public abstract Object getTag();
 
   /**
    * @return data of BSP message
    */
-  public byte[] getData() {
-    byte[] result = this.data;
-    return result;
-  }
-
-  @Override
-  public void readFields(DataInput in) throws IOException {
-    this.tag = new byte[in.readInt()];
-    in.readFully(tag, 0, this.tag.length);
-    this.data = new byte[in.readInt()];
-    in.readFully(data, 0, this.data.length);
-  }
+  public abstract Object getData();
 
-  @Override
-  public void write(DataOutput out) throws IOException {
-    out.writeInt(tag.length);
-    out.write(tag);
-    out.writeInt(data.length);
-    out.write(data);
-  }
 }

Modified: incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPPeer.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPPeer.java?rev=1088939&r1=1088938&r2=1088939&view=diff
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPPeer.java (original)
+++ incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPPeer.java Tue Apr  5 09:30:25 2011
@@ -44,7 +44,7 @@ import org.apache.zookeeper.ZooDefs.Ids;
 import org.apache.zookeeper.data.Stat;
 
 /**
- * This class represents a BSP peer. 
+ * This class represents a BSP peer.
  */
 public class BSPPeer implements Watcher, BSPPeerInterface {
   public static final Log LOG = LogFactory.getLog(BSPPeer.class);
@@ -62,10 +62,8 @@ public class BSPPeer implements Watcher,
   private final Map<InetSocketAddress, BSPPeerInterface> peers = new ConcurrentHashMap<InetSocketAddress, BSPPeerInterface>();
   private final Map<InetSocketAddress, ConcurrentLinkedQueue<BSPMessage>> outgoingQueues = new ConcurrentHashMap<InetSocketAddress, ConcurrentLinkedQueue<BSPMessage>>();
   private ConcurrentLinkedQueue<BSPMessage> localQueue = new ConcurrentLinkedQueue<BSPMessage>();
-  private ConcurrentLinkedQueue<BSPMessage> localQueueForNextIteration =
-     new ConcurrentLinkedQueue<BSPMessage>();
-  private final Map<String, InetSocketAddress> peerSocketCache =
-     new ConcurrentHashMap<String, InetSocketAddress>();
+  private ConcurrentLinkedQueue<BSPMessage> localQueueForNextIteration = new ConcurrentLinkedQueue<BSPMessage>();
+  private final Map<String, InetSocketAddress> peerSocketCache = new ConcurrentHashMap<String, InetSocketAddress>();
 
   private SortedSet<String> allPeerNames = new TreeSet<String>();
   private InetSocketAddress peerAddress;
@@ -158,7 +156,8 @@ public class BSPPeer implements Watcher,
         targetPeerAddress = getAddress(peerName);
         peerSocketCache.put(peerName, targetPeerAddress);
       }
-      ConcurrentLinkedQueue<BSPMessage> queue = outgoingQueues.get(targetPeerAddress);
+      ConcurrentLinkedQueue<BSPMessage> queue = outgoingQueues
+          .get(targetPeerAddress);
       if (queue == null) {
         queue = new ConcurrentLinkedQueue<BSPMessage>();
       }

Added: incubator/hama/trunk/src/java/org/apache/hama/bsp/ByteMessage.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/bsp/ByteMessage.java?rev=1088939&view=auto
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/bsp/ByteMessage.java (added)
+++ incubator/hama/trunk/src/java/org/apache/hama/bsp/ByteMessage.java Tue Apr  5 09:30:25 2011
@@ -0,0 +1,69 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.bsp;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+public class ByteMessage extends BSPMessage {
+
+  private byte[] tag;
+  private byte[] data;
+
+  public ByteMessage() {
+    super();
+  }
+
+  public ByteMessage(byte[] tag, byte[] data) {
+    super();
+    this.tag = new byte[tag.length];
+    this.data = new byte[data.length];
+    System.arraycopy(tag, 0, this.tag, 0, tag.length);
+    System.arraycopy(data, 0, this.data, 0, data.length);
+  }
+
+  @Override
+  public byte[] getTag() {
+    byte[] result = this.tag;
+    return result;
+  }
+
+  @Override
+  public byte[] getData() {
+    byte[] result = this.data;
+    return result;
+  }
+
+  @Override
+  public void readFields(DataInput in) throws IOException {
+    this.tag = new byte[in.readInt()];
+    in.readFully(tag, 0, this.tag.length);
+    this.data = new byte[in.readInt()];
+    in.readFully(data, 0, this.data.length);
+  }
+
+  @Override
+  public void write(DataOutput out) throws IOException {
+    out.writeInt(tag.length);
+    out.write(tag);
+    out.writeInt(data.length);
+    out.write(data);
+  }
+
+}

Added: incubator/hama/trunk/src/java/org/apache/hama/bsp/DoubleMessage.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/bsp/DoubleMessage.java?rev=1088939&view=auto
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/bsp/DoubleMessage.java (added)
+++ incubator/hama/trunk/src/java/org/apache/hama/bsp/DoubleMessage.java Tue Apr  5 09:30:25 2011
@@ -0,0 +1,61 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.bsp;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+public class DoubleMessage extends BSPMessage {
+
+  private String tag;
+  private double data;
+
+  public DoubleMessage() {
+    super();
+  }
+
+  public DoubleMessage(String tag, double data) {
+    super();
+    this.data = data;
+    this.tag = tag;
+  }
+
+  @Override
+  public String getTag() {
+    return tag;
+  }
+
+  @Override
+  public Double getData() {
+    return data;
+  }
+
+  @Override
+  public void write(DataOutput out) throws IOException {
+    out.writeUTF(tag);
+    out.writeDouble(data);
+  }
+
+  @Override
+  public void readFields(DataInput in) throws IOException {
+    tag = in.readUTF();
+    data = in.readDouble();
+  }
+
+}

Added: incubator/hama/trunk/src/java/org/apache/hama/bsp/Messagable.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/bsp/Messagable.java?rev=1088939&view=auto
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/bsp/Messagable.java (added)
+++ incubator/hama/trunk/src/java/org/apache/hama/bsp/Messagable.java Tue Apr  5 09:30:25 2011
@@ -0,0 +1,23 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hama.bsp;
+
+public interface Messagable {
+
+}

Modified: incubator/hama/trunk/src/test/org/apache/hama/bsp/TestBSPPeer.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/test/org/apache/hama/bsp/TestBSPPeer.java?rev=1088939&r1=1088938&r2=1088939&view=diff
==============================================================================
--- incubator/hama/trunk/src/test/org/apache/hama/bsp/TestBSPPeer.java (original)
+++ incubator/hama/trunk/src/test/org/apache/hama/bsp/TestBSPPeer.java Tue Apr  5 09:30:25 2011
@@ -95,7 +95,7 @@ public class TestBSPPeer extends HamaClu
         peerNames.add("localhost:" + (30000 + i));
       }
       peer.setAllPeerNames(peerNames);
-      TaskStatus currentTaskStatus = new TaskStatus(new BSPJobID(), 
+      TaskStatus currentTaskStatus = new TaskStatus(new BSPJobID(),
           new TaskAttemptID(), 0, null, null, null, null);
       peer.setCurrentTaskStatus(currentTaskStatus);
       BSPJob jobConf = new BSPJob(conf, NUM_PEER);
@@ -113,7 +113,7 @@ public class TestBSPPeer extends HamaClu
 
         for (int j = 0; j < 10; j++) {
           r.nextBytes(dummyData);
-          msg = new BSPMessage(Bytes.tail(dummyData, 128), dummyData);
+          msg = new ByteMessage(Bytes.tail(dummyData, 128), dummyData);
           String peerName = "localhost:" + (30000 + j);
           try {
             peer.send(peerName, msg);
@@ -144,8 +144,8 @@ public class TestBSPPeer extends HamaClu
 
     private void verifyPayload(int round) {
       int numMessages = peer.getNumCurrentMessages();
-      assertEquals(round, ((int) peer.getSuperstepCount() -1 ));
-      
+      assertEquals(round, ((int) peer.getSuperstepCount() - 1));
+
       LOG.info("[" + peer.getPeerName() + "] verifying " + numMessages
           + " messages at " + round + " round");
 
@@ -155,20 +155,21 @@ public class TestBSPPeer extends HamaClu
         assertEquals(0, numMessages);
       }
 
-      BSPMessage msg = null;
+      ByteMessage msg = null;
       int messageCounter = 0;
-      
+
       try {
-        while ((msg = peer.getCurrentMessage()) != null) {
-          assertEquals(Bytes.compareTo(msg.tag, 0, 128, msg.data,
-              msg.data.length - 128, 128), 0);
+        while ((msg = (ByteMessage) peer.getCurrentMessage()) != null) {
+          assertEquals(
+              Bytes.compareTo(msg.getTag(), 0, 128, msg.getData(),
+                  msg.getData().length - 128, 128), 0);
           ++messageCounter;
         }
       } catch (IOException e) {
         LOG.error(e);
       }
       assertEquals(numMessages, messageCounter);
-      
+
       peer.clearLocalQueue();
     }
 
@@ -178,12 +179,12 @@ public class TestBSPPeer extends HamaClu
   }
 
   public void testSync() throws Throwable {
-    
+
     conf.setInt("bsp.peers.num", NUM_PEER);
     conf.set(Constants.ZOOKEEPER_QUORUM, "localhost");
     conf.set(Constants.PEER_HOST, "localhost");
     conf.set(Constants.ZOOKEEPER_SERVER_ADDRS, "localhost:21810");
-    
+
     TestRunnable[] threads = new TestRunnable[NUM_PEER];
 
     for (int i = 0; i < NUM_PEER; i++) {