You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hama.apache.org by ed...@apache.org on 2011/04/21 15:29:38 UTC

svn commit: r1095698 - /incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPMessageBundle.java

Author: edwardyoon
Date: Thu Apr 21 13:29:38 2011
New Revision: 1095698

URL: http://svn.apache.org/viewvc?rev=1095698&view=rev
Log:
adding cache constructor for messagebundle

Modified:
    incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPMessageBundle.java

Modified: incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPMessageBundle.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPMessageBundle.java?rev=1095698&r1=1095697&r2=1095698&view=diff
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPMessageBundle.java (original)
+++ incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPMessageBundle.java Thu Apr 21 13:29:38 2011
@@ -21,8 +21,13 @@ import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.LinkedList;
 import java.util.List;
+import java.util.Map.Entry;
+
 import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.util.ReflectionUtils;
 
 /**
  * BSPMessageBundle stores a group of BSPMessages so that they can be sent in
@@ -31,7 +36,8 @@ import org.apache.hadoop.io.Writable;
  */
 public class BSPMessageBundle implements Writable {
 
-  private List<BSPMessage> messages = new ArrayList<BSPMessage>();
+  private HashMap<String, LinkedList<BSPMessage>> messages = new HashMap<String, LinkedList<BSPMessage>>();
+  private HashMap<String, Class<? extends BSPMessage>> classCache = new HashMap<String, Class<? extends BSPMessage>>();
 
   public BSPMessageBundle() {
   }
@@ -42,58 +48,72 @@ public class BSPMessageBundle implements
    * @param message BSPMessage to add.
    */
   public void addMessage(BSPMessage message) {
-    messages.add(message);
+    String className = message.getClass().getName();
+    if (!messages.containsKey(className)) {
+      // use linked list because we're just iterating over them
+      LinkedList<BSPMessage> list = new LinkedList<BSPMessage>();
+      list.add(message);
+      messages.put(className, list);
+    } else {
+      messages.get(className).add(message);
+    }
   }
 
   public List<BSPMessage> getMessages() {
-    return messages;
-  }
-
-  public void setMessages(List<BSPMessage> messages) {
-    this.messages = messages;
+    // Return an ArrayList because callers may need random access. The initial
+    // capacity is the number of message classes, not the total message count.
+    List<BSPMessage> mergeList = new ArrayList<BSPMessage>(messages.size());
+    for (LinkedList<BSPMessage> c : messages.values()) {
+      mergeList.addAll(c);
+    }
+    return mergeList;
   }
 
   public void write(DataOutput out) throws IOException {
+    // write the number of entries in the class-name -> message-list map
     out.writeInt(messages.size());
     if (messages.size() > 0) {
-      // Write out message class.
-      BSPMessage element = messages.get(0);
-      Class<? extends BSPMessage> clazz = element.getClass();
-      out.writeUTF(clazz.getName());
-      // Serialize contents of this bundle.
-      for (BSPMessage message : messages) {
-        message.write(out);
+      for (Entry<String, LinkedList<BSPMessage>> entry : messages.entrySet()) {
+        out.writeUTF(entry.getKey());
+        LinkedList<BSPMessage> messageList = entry.getValue();
+        out.writeInt(messageList.size());
+        for (BSPMessage msg : messageList) {
+          msg.write(out);
+        }
       }
     }
   }
 
+  @SuppressWarnings("unchecked")
   public void readFields(DataInput in) throws IOException {
+    if (messages == null) {
+      messages = new HashMap<String, LinkedList<BSPMessage>>();
+    }
     int numMessages = in.readInt();
     if (numMessages > 0) {
-      // Get classname of messages.
-      String className = in.readUTF();
-      Class<? extends BSPMessage> clazz = null;
-      try {
-        clazz = (Class<? extends BSPMessage>) Class.forName(className);
-      } catch (ClassNotFoundException ex) {
-        throw new IOException(ex);
-      }
-      // Deserialize messages.
-      messages = new ArrayList<BSPMessage>(numMessages);
-      for (int i = 0; i < numMessages; ++i) {
-        try {
-          // Instantiate new message and deserialize it.
-          BSPMessage newMessage = clazz.newInstance();
-          newMessage.readFields(in);
-          messages.add(newMessage);
-        } catch (InstantiationException ex) {
-          throw new IOException(ex);
-        } catch (IllegalAccessException ex) {
-          throw new IOException(ex);
+      for (int entries = 0; entries < numMessages; entries++) {
+        String className = in.readUTF();
+        int size = in.readInt();
+        LinkedList<BSPMessage> msgList = new LinkedList<BSPMessage>();
+        messages.put(className, msgList);
+
+        Class<? extends BSPMessage> clazz = null;
+        if ((clazz = classCache.get(className)) == null) {
+          try {
+            clazz = (Class<? extends BSPMessage>) Class.forName(className);
+            classCache.put(className, clazz);
+          } catch (ClassNotFoundException e) {
+            e.printStackTrace();
+          }
+        }
+
+        for (int i = 0; i < size; i++) {
+          BSPMessage msg = ReflectionUtils.newInstance(clazz, null);
+          msg.readFields(in);
+          msgList.add(msg);
         }
+
       }
-    } else {
-      messages = new ArrayList<BSPMessage>();
     }
   }