You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hama.apache.org by ed...@apache.org on 2011/04/21 15:29:38 UTC
svn commit: r1095698 -
/incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPMessageBundle.java
Author: edwardyoon
Date: Thu Apr 21 13:29:38 2011
New Revision: 1095698
URL: http://svn.apache.org/viewvc?rev=1095698&view=rev
Log:
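Add a class cache to BSPMessageBundle: messages are now grouped by class name, and message classes resolved during deserialization are cached for reuse.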
Modified:
incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPMessageBundle.java
Modified: incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPMessageBundle.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPMessageBundle.java?rev=1095698&r1=1095697&r2=1095698&view=diff
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPMessageBundle.java (original)
+++ incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPMessageBundle.java Thu Apr 21 13:29:38 2011
@@ -21,8 +21,13 @@ import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.LinkedList;
import java.util.List;
+import java.util.Map.Entry;
+
import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.util.ReflectionUtils;
/**
* BSPMessageBundle stores a group of BSPMessages so that they can be sent in
@@ -31,7 +36,8 @@ import org.apache.hadoop.io.Writable;
*/
public class BSPMessageBundle implements Writable {
- private List<BSPMessage> messages = new ArrayList<BSPMessage>();
+ private HashMap<String, LinkedList<BSPMessage>> messages = new HashMap<String, LinkedList<BSPMessage>>();
+ private HashMap<String, Class<? extends BSPMessage>> classCache = new HashMap<String, Class<? extends BSPMessage>>();
public BSPMessageBundle() {
}
@@ -42,58 +48,72 @@ public class BSPMessageBundle implements
* @param message BSPMessage to add.
*/
public void addMessage(BSPMessage message) {
- messages.add(message);
+ String className = message.getClass().getName();
+ if (!messages.containsKey(className)) {
+ // use linked list because we're just iterating over them
+ LinkedList<BSPMessage> list = new LinkedList<BSPMessage>();
+ list.add(message);
+ messages.put(className, list);
+ } else {
+ messages.get(className).add(message);
+ }
}
public List<BSPMessage> getMessages() {
- return messages;
- }
-
- public void setMessages(List<BSPMessage> messages) {
- this.messages = messages;
+ // here we use an arraylist, because we know the size and outside may need
+ // random access
+ List<BSPMessage> mergeList = new ArrayList<BSPMessage>(messages.size());
+ for (LinkedList<BSPMessage> c : messages.values()) {
+ mergeList.addAll(c);
+ }
+ return mergeList;
}
public void write(DataOutput out) throws IOException {
+ // writes the k/v mapping size
out.writeInt(messages.size());
if (messages.size() > 0) {
- // Write out message class.
- BSPMessage element = messages.get(0);
- Class<? extends BSPMessage> clazz = element.getClass();
- out.writeUTF(clazz.getName());
- // Serialize contents of this bundle.
- for (BSPMessage message : messages) {
- message.write(out);
+ for (Entry<String, LinkedList<BSPMessage>> entry : messages.entrySet()) {
+ out.writeUTF(entry.getKey());
+ LinkedList<BSPMessage> messageList = entry.getValue();
+ out.writeInt(messageList.size());
+ for (BSPMessage msg : messageList) {
+ msg.write(out);
+ }
}
}
}
+ @SuppressWarnings("unchecked")
public void readFields(DataInput in) throws IOException {
+ if (messages == null) {
+ messages = new HashMap<String, LinkedList<BSPMessage>>();
+ }
int numMessages = in.readInt();
if (numMessages > 0) {
- // Get classname of messages.
- String className = in.readUTF();
- Class<? extends BSPMessage> clazz = null;
- try {
- clazz = (Class<? extends BSPMessage>) Class.forName(className);
- } catch (ClassNotFoundException ex) {
- throw new IOException(ex);
- }
- // Deserialize messages.
- messages = new ArrayList<BSPMessage>(numMessages);
- for (int i = 0; i < numMessages; ++i) {
- try {
- // Instantiate new message and deserialize it.
- BSPMessage newMessage = clazz.newInstance();
- newMessage.readFields(in);
- messages.add(newMessage);
- } catch (InstantiationException ex) {
- throw new IOException(ex);
- } catch (IllegalAccessException ex) {
- throw new IOException(ex);
+ for (int entries = 0; entries < numMessages; entries++) {
+ String className = in.readUTF();
+ int size = in.readInt();
+ LinkedList<BSPMessage> msgList = new LinkedList<BSPMessage>();
+ messages.put(className, msgList);
+
+ Class<? extends BSPMessage> clazz = null;
+ if ((clazz = classCache.get(className)) == null) {
+ try {
+ clazz = (Class<? extends BSPMessage>) Class.forName(className);
+ classCache.put(className, clazz);
+ } catch (ClassNotFoundException e) {
+ e.printStackTrace();
+ }
+ }
+
+ for (int i = 0; i < size; i++) {
+ BSPMessage msg = ReflectionUtils.newInstance(clazz, null);
+ msg.readFields(in);
+ msgList.add(msg);
}
+
}
- } else {
- messages = new ArrayList<BSPMessage>();
}
}