You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@giraph.apache.org by ac...@apache.org on 2011/12/16 00:06:13 UTC

svn commit: r1214983 - in /incubator/giraph/trunk: ./ src/main/java/org/apache/giraph/comm/ src/main/java/org/apache/giraph/graph/

Author: aching
Date: Thu Dec 15 23:06:12 2011
New Revision: 1214983

URL: http://svn.apache.org/viewvc?rev=1214983&view=rev
Log:
GIRAPH-57: Add new RPC call (putVertexIdMessagesList) to batch
putMsgList RPCs together. (aching)


Added:
    incubator/giraph/trunk/src/main/java/org/apache/giraph/comm/VertexIdMessages.java
    incubator/giraph/trunk/src/main/java/org/apache/giraph/comm/VertexIdMessagesList.java
Modified:
    incubator/giraph/trunk/CHANGELOG
    incubator/giraph/trunk/CODE_CONVENTIONS
    incubator/giraph/trunk/src/main/java/org/apache/giraph/comm/ArrayListWritable.java
    incubator/giraph/trunk/src/main/java/org/apache/giraph/comm/BasicRPCCommunications.java
    incubator/giraph/trunk/src/main/java/org/apache/giraph/comm/CommunicationsInterface.java
    incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/GiraphJob.java

Modified: incubator/giraph/trunk/CHANGELOG
URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/CHANGELOG?rev=1214983&r1=1214982&r2=1214983&view=diff
==============================================================================
--- incubator/giraph/trunk/CHANGELOG (original)
+++ incubator/giraph/trunk/CHANGELOG Thu Dec 15 23:06:12 2011
@@ -2,6 +2,9 @@ Giraph Change Log
 
 Release 0.70.0 - unreleased
 
+  GIRAPH-57: Add new RPC call (putVertexIdMessagesList) to batch
+  putMsgList RPCs together. (aching)
+
   GIRAPH-104: Save half of maximum memory used from messaging. (aching)
 
   GIRAPH-10: Aggregators are not exported. (claudio)

Modified: incubator/giraph/trunk/CODE_CONVENTIONS
URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/CODE_CONVENTIONS?rev=1214983&r1=1214982&r2=1214983&view=diff
==============================================================================
--- incubator/giraph/trunk/CODE_CONVENTIONS (original)
+++ incubator/giraph/trunk/CODE_CONVENTIONS Thu Dec 15 23:06:12 2011
@@ -49,9 +49,9 @@ if (LOG.isInfoEnabled()) {
 }
 
 - All classes, members, and member methods should have Javadoc in the following
-  style.  C-style comments for javadoc and // comments for non-javadoc.  Also, the comment 
-  block should have a line break that separates the comment section and the @ section.  
-  See below.
+  style.  C-style comments for javadoc and // comments for non-javadoc.  Also,
+  the comment block should have a line break that separates the comment
+  section and the @ section.  See below.
 
 /**
  * This is an example class
@@ -78,10 +78,18 @@ public class Giraffe {
   }
 }
 
+- When using synchronized statements, there should not be a space between
+  'synchronized' and '('.  For example:
+
+public foo() {
+  synchronized(bar) {
+  }
+}
+
 - Class members should not begin with 'm_' or '_'
 - No warnings allowed, but be as specific as possible with warning suppression
 - Prefer to avoid abbreviations when reasonable (i.e. 'msg' vs 'message')
-- Static variable names should be entirely capitalized and seperated by '_' 
+- Static variable names should be entirely capitalized and seperated by '_'
   (i.e. private static int FOO_BAR_BAR = 2)
 - Non-static variable and method names should not begin capitalized and should only use
   alphanumeric characters (i.e. int fooBarBar)

Modified: incubator/giraph/trunk/src/main/java/org/apache/giraph/comm/ArrayListWritable.java
URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/comm/ArrayListWritable.java?rev=1214983&r1=1214982&r2=1214983&view=diff
==============================================================================
--- incubator/giraph/trunk/src/main/java/org/apache/giraph/comm/ArrayListWritable.java (original)
+++ incubator/giraph/trunk/src/main/java/org/apache/giraph/comm/ArrayListWritable.java Thu Dec 15 23:06:12 2011
@@ -47,7 +47,7 @@ public abstract class ArrayListWritable<
      */
     public ArrayListWritable() {
     }
-    
+
     public ArrayListWritable(ArrayListWritable<M> arrayListWritable) {
         super(arrayListWritable);
     }

Modified: incubator/giraph/trunk/src/main/java/org/apache/giraph/comm/BasicRPCCommunications.java
URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/comm/BasicRPCCommunications.java?rev=1214983&r1=1214982&r2=1214983&view=diff
==============================================================================
--- incubator/giraph/trunk/src/main/java/org/apache/giraph/comm/BasicRPCCommunications.java (original)
+++ incubator/giraph/trunk/src/main/java/org/apache/giraph/comm/BasicRPCCommunications.java Thu Dec 15 23:06:12 2011
@@ -44,6 +44,7 @@ import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
@@ -92,6 +93,8 @@ public abstract class BasicRPCCommunicat
     private final InetSocketAddress myAddress;
     /** Messages sent during the last superstep */
     private long totalMsgsSentInSuperstep = 0;
+    /** Maximum messages sent per putVertexIdMessagesList RPC */
+    private final int maxMessagesPerFlushPut;
     /**
      * Map of the peer connections, mapping from remote socket address to client
      * meta data
@@ -215,48 +218,68 @@ public abstract class BasicRPCCommunicat
                 = peerConnection.getRPCProxy();
             long startMillis = System.currentTimeMillis();
             long lastReportedMillis = startMillis;
-
             try {
                 int verticesDone = 0;
-                synchronized (peerConnection.outMessagesPerPeer) {
+                synchronized(peerConnection.outMessagesPerPeer) {
                     final int vertices =
                         peerConnection.outMessagesPerPeer.size();
+                    // 1. Check for null messages and combine if possible
+                    // 2. Send vertex ids and messages in bulk to the
+                    //    destination servers.
+                    for (Entry<I, MsgList<M>> entry :
+                            peerConnection.outMessagesPerPeer.entrySet()) {
+                        for (M msg : entry.getValue()) {
+                            if (msg == null) {
+                                throw new IllegalArgumentException(
+                                    "run: Cannot put null message on " +
+                                    "vertex id " + entry.getKey());
+                            }
+                        }
+                        if (combiner != null && entry.getValue().size() > 1) {
+                            M combinedMsg = combiner.combine(entry.getKey(),
+                                                             entry.getValue());
+                            entry.getValue().clear();
+                            entry.getValue().add(combinedMsg);
+                        }
+                        if (entry.getValue().isEmpty()) {
+                            throw new IllegalStateException(
+                                "run: Impossible for no messages in " +
+                                entry.getKey());
+                        }
+                    }
                     while (!peerConnection.outMessagesPerPeer.isEmpty()) {
-                        Entry<I, MsgList<M>> e =
-                            peerConnection.outMessagesPerPeer.entrySet().iterator().next();
-                        MsgList<M> msgList = e.getValue();
-
-                        if (msgList.size() > 0) {
-                            if (msgList.size() > 1) {
-                                if (combiner != null) {
-                                    M combinedMsg = combiner.combine(e.getKey(),
-                                        msgList);
-                                    if (combinedMsg != null) {
-                                        proxy.putMsg(e.getKey(), combinedMsg);
-                                    }
-                                } else {
-                                    proxy.putMsgList(e.getKey(), msgList);
-                                }
-                            } else {
-                                for (M msg : msgList) {
-                                    if (msg == null) {
-                                        throw new IllegalArgumentException(
-                                            "putAllMessages: Cannot put " +
-                                                "null message on " + e.getKey());
-                                    }
-                                    proxy.putMsg(e.getKey(), msg);
-                                    context.progress();
-                                }
+                        int bulkedMessages = 0;
+                        Iterator<Entry<I, MsgList<M>>> vertexIdMessagesListIt =
+                            peerConnection.outMessagesPerPeer.entrySet().
+                            iterator();
+                        VertexIdMessagesList<I, M> vertexIdMessagesList =
+                            new VertexIdMessagesList<I, M>();
+                        while (vertexIdMessagesListIt.hasNext()) {
+                            Entry<I, MsgList<M>> entry =
+                                vertexIdMessagesListIt.next();
+                            // Add this entry if the list is empty or we
+                            // haven't reached the maximum number of messages
+                            if (vertexIdMessagesList.isEmpty() ||
+                                    ((bulkedMessages + entry.getValue().size())
+                                     < maxMessagesPerFlushPut)) {
+                                vertexIdMessagesList.add(
+                                    new VertexIdMessages<I, M>(
+                                        entry.getKey(), entry.getValue()));
+                                bulkedMessages += entry.getValue().size();
                             }
-                            msgList.clear();
                         }
 
-                        // Clean up the memory with the message list
-                        msgList = null;
-                        peerConnection.outMessagesPerPeer.remove(e.getKey());
-                        e = null;
+                        // Clean up references to the vertex id and messages
+                        for (VertexIdMessages<I, M>vertexIdMessages :
+                                vertexIdMessagesList) {
+                            peerConnection.outMessagesPerPeer.remove(
+                                vertexIdMessages.getVertexId());
+                        }
+
+                        proxy.putVertexIdMessagesList(vertexIdMessagesList);
+                        context.progress();
 
-                        ++verticesDone;
+                        verticesDone += vertexIdMessagesList.size();
                         long curMillis = System.currentTimeMillis();
                         if ((lastReportedMillis +
                                 REPORTING_INTERVAL_MIN_MILLIS) < curMillis) {
@@ -304,16 +327,18 @@ public abstract class BasicRPCCommunicat
      * exceeds <i>maxSize</i>.
      */
     private class LargeMessageFlushExecutor implements Runnable {
-        final I destVertex;
-        final MsgList<M> outMessage;
-        PeerConnection peerConnection;
+        private final I destVertex;
+        private final MsgList<M> outMessageList;
+        private PeerConnection peerConnection;
 
         LargeMessageFlushExecutor(PeerConnection peerConnection, I destVertex) {
             this.peerConnection = peerConnection;
-            synchronized (peerConnection.outMessagesPerPeer) {
+            synchronized(peerConnection.outMessagesPerPeer) {
                 this.destVertex = destVertex;
-                outMessage = peerConnection.outMessagesPerPeer.get(destVertex);
-                peerConnection.outMessagesPerPeer.put(destVertex, new MsgList<M>());
+                outMessageList =
+                    peerConnection.outMessagesPerPeer.get(destVertex);
+                peerConnection.outMessagesPerPeer.put(destVertex,
+                                                      new MsgList<M>());
             }
         }
 
@@ -325,21 +350,21 @@ public abstract class BasicRPCCommunicat
 
                 if (combiner != null) {
                     M combinedMsg = combiner.combine(destVertex,
-                                                     outMessage);
+                                                     outMessageList);
                     if (combinedMsg != null) {
                         proxy.putMsg(destVertex, combinedMsg);
                     }
                 } else {
-                    proxy.putMsgList(destVertex, outMessage);
+                    proxy.putMsgList(destVertex, outMessageList);
                 }
             } catch (IOException e) {
                 LOG.error(e);
                 if (peerConnection.isProxy) {
                     RPC.stopProxy(peerConnection.peer);
                 }
-                throw new RuntimeException(e);
+                throw new RuntimeException("run: Got IOException", e);
             } finally {
-                outMessage.clear();
+                outMessageList.clear();
             }
         }
     }
@@ -371,6 +396,9 @@ public abstract class BasicRPCCommunicat
         this.conf = context.getConfiguration();
         this.maxSize = conf.getInt(GiraphJob.MSG_SIZE,
                                    GiraphJob.MSG_SIZE_DEFAULT);
+        this.maxMessagesPerFlushPut =
+            conf.getInt(GiraphJob.MAX_MESSAGES_PER_FLUSH_PUT,
+                        GiraphJob.DEFAULT_MAX_MESSAGES_PER_FLUSH_PUT);
         if (BspUtils.getVertexCombinerClass(conf) == null) {
             this.combiner = null;
         } else {
@@ -492,7 +520,7 @@ public abstract class BasicRPCCommunicat
         Map<I, MsgList<M>> outMsgMap = null;
         boolean isProxy = true;
         CommunicationsInterface<I, V, E, M> peer = this;
-        synchronized (outMessages) {
+        synchronized(outMessages) {
             outMsgMap = outMessages.get(addrUnresolved);
             if (LOG.isDebugEnabled()) {
                 LOG.debug("startPeerConnectionThread: Connecting to " +
@@ -570,7 +598,7 @@ end[HADOOP_FACEBOOK]*/
                     transientInMessages.put(vertex, msgs);
                 }
             }
-            synchronized (msgs) {
+            synchronized(msgs) {
                 msgs.add(msg);
             }
         }
@@ -591,12 +619,39 @@ end[HADOOP_FACEBOOK]*/
                 transientInMessages.put(vertex, msgs);
             }
         }
-        synchronized (msgs) {
+        synchronized(msgs) {
             msgs.addAll(msgList);
         }
     }
 
     @Override
+    public final void putVertexIdMessagesList(
+            VertexIdMessagesList<I, M> vertexIdMessagesList)
+            throws IOException {
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("putVertexIdMessagesList: Adding msgList " +
+                      vertexIdMessagesList);
+        }
+
+        List<M> messageList = null;
+        for (VertexIdMessages<I, M> vertexIdMessages : vertexIdMessagesList) {
+            synchronized(transientInMessages) {
+                messageList =
+                    transientInMessages.get(vertexIdMessages.getVertexId());
+                if (messageList == null) {
+                    messageList = new ArrayList<M>(
+                        vertexIdMessages.getMessageList().size());
+                    transientInMessages.put(
+                        vertexIdMessages.getVertexId(), messageList);
+                }
+            }
+            synchronized(messageList) {
+                messageList.addAll(vertexIdMessages.getMessageList());
+            }
+        }
+    }
+
+    @Override
     public final void putVertexList(int partitionId,
                                     VertexList<I, V, E, M> vertexList)
             throws IOException {
@@ -604,7 +659,7 @@ end[HADOOP_FACEBOOK]*/
             LOG.debug("putVertexList: On partition id " + partitionId +
                       " adding vertex list of size " + vertexList.size());
         }
-        synchronized (inPartitionVertexMap) {
+        synchronized(inPartitionVertexMap) {
             if (vertexList.size() == 0) {
                 return;
             }
@@ -624,7 +679,7 @@ end[HADOOP_FACEBOOK]*/
         if (LOG.isDebugEnabled()) {
             LOG.debug("addEdge: Adding edge " + edge);
         }
-        synchronized (inVertexMutationsMap) {
+        synchronized(inVertexMutationsMap) {
             VertexMutations<I, V, E, M> vertexMutations = null;
             if (!inVertexMutationsMap.containsKey(vertexIndex)) {
                 vertexMutations = new VertexMutations<I, V, E, M>();
@@ -642,7 +697,7 @@ end[HADOOP_FACEBOOK]*/
             LOG.debug("removeEdge: Removing edge on destination " +
                       destinationVertexIndex);
         }
-        synchronized (inVertexMutationsMap) {
+        synchronized(inVertexMutationsMap) {
             VertexMutations<I, V, E, M> vertexMutations = null;
             if (!inVertexMutationsMap.containsKey(vertexIndex)) {
                 vertexMutations = new VertexMutations<I, V, E, M>();
@@ -659,7 +714,7 @@ end[HADOOP_FACEBOOK]*/
         if (LOG.isDebugEnabled()) {
             LOG.debug("addVertex: Adding vertex " + vertex);
         }
-        synchronized (inVertexMutationsMap) {
+        synchronized(inVertexMutationsMap) {
             VertexMutations<I, V, E, M> vertexMutations = null;
             if (!inVertexMutationsMap.containsKey(vertex.getVertexId())) {
                 vertexMutations = new VertexMutations<I, V, E, M>();
@@ -676,7 +731,7 @@ end[HADOOP_FACEBOOK]*/
         if (LOG.isDebugEnabled()) {
             LOG.debug("removeVertex: Removing vertex " + vertexIndex);
         }
-        synchronized (inVertexMutationsMap) {
+        synchronized(inVertexMutationsMap) {
             VertexMutations<I, V, E, M> vertexMutations = null;
             if (!inVertexMutationsMap.containsKey(vertexIndex)) {
                 vertexMutations = new VertexMutations<I, V, E, M>();
@@ -779,7 +834,7 @@ end[HADOOP_FACEBOOK]*/
         }
         ++totalMsgsSentInSuperstep;
         Map<I, MsgList<M>> msgMap = null;
-        synchronized (outMessages) {
+        synchronized(outMessages) {
             msgMap = outMessages.get(addr);
         }
         if (msgMap == null) { // should never happen after constructor
@@ -995,7 +1050,7 @@ end[HADOOP_FACEBOOK]*/
                 resolveVertexIndexSet.add(entry.getKey());
             }
         }
-        synchronized (inVertexMutationsMap) {
+        synchronized(inVertexMutationsMap) {
             for (I vertexIndex : inVertexMutationsMap.keySet()) {
                 resolveVertexIndexSet.add(vertexIndex);
             }
@@ -1042,7 +1097,7 @@ end[HADOOP_FACEBOOK]*/
                 partition.removeVertex(originalVertex.getVertexId());
             }
         }
-        synchronized (inVertexMutationsMap) {
+        synchronized(inVertexMutationsMap) {
             inVertexMutationsMap.clear();
         }
     }
@@ -1051,7 +1106,7 @@ end[HADOOP_FACEBOOK]*/
     public void fixPartitionIdToSocketAddrMap() {
         // 1. Fix all the cached inet addresses (remove all changed entries)
         // 2. Connect to any new RPC servers
-        synchronized (partitionIndexAddressMap) {
+        synchronized(partitionIndexAddressMap) {
             for (PartitionOwner partitionOwner : service.getPartitionOwners()) {
                 InetSocketAddress address =
                     partitionIndexAddressMap.get(

Modified: incubator/giraph/trunk/src/main/java/org/apache/giraph/comm/CommunicationsInterface.java
URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/comm/CommunicationsInterface.java?rev=1214983&r1=1214982&r2=1214983&view=diff
==============================================================================
--- incubator/giraph/trunk/src/main/java/org/apache/giraph/comm/CommunicationsInterface.java (original)
+++ incubator/giraph/trunk/src/main/java/org/apache/giraph/comm/CommunicationsInterface.java Thu Dec 15 23:06:12 2011
@@ -77,6 +77,16 @@ public interface CommunicationsInterface
     void putMsgList(I vertexIndex, MsgList<M> msgList) throws IOException;
 
     /**
+     * Adds a list of vertex ids and their respective message lists.
+     *
+     * @param vertexIndex Vertex index where the message are added
+     * @param msgList messages added
+     * @throws IOException
+     */
+    void putVertexIdMessagesList(
+        VertexIdMessagesList<I, M> vertexIdMessagesList) throws IOException;
+
+    /**
      * Adds vertex list (index, value, edges, etc.) to the appropriate worker.
      *
      * @param partitionId Partition id of the vertices to be added.

Added: incubator/giraph/trunk/src/main/java/org/apache/giraph/comm/VertexIdMessages.java
URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/comm/VertexIdMessages.java?rev=1214983&view=auto
==============================================================================
--- incubator/giraph/trunk/src/main/java/org/apache/giraph/comm/VertexIdMessages.java (added)
+++ incubator/giraph/trunk/src/main/java/org/apache/giraph/comm/VertexIdMessages.java Thu Dec 15 23:06:12 2011
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.comm;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.giraph.graph.BspUtils;
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+
+/**
+ * This object is only used for transporting list of vertices and their
+ * respective messages to a destination RPC server.
+ *
+ * @param <I extends Writable> vertex id
+ * @param <M extends Writable> message data
+ */
+@SuppressWarnings("rawtypes")
+public class VertexIdMessages<I extends WritableComparable, M extends Writable>
+        implements Writable, Configurable {
+    /** Vertex id */
+    private I vertexId;
+    /** Message list corresponding to vertex id */
+    private MsgList<M> msgList;
+    /** Configuration from Configurable */
+    private Configuration conf;
+
+    /**
+     * Reflective constructor.
+     */
+    public VertexIdMessages() {}
+
+    /**
+     * Constructor used with creating initial values.
+     *
+     * @param vertexId Vertex id to be sent
+     * @param msgList Mesage list for the vertex id to be sent
+     */
+    public VertexIdMessages(I vertexId, MsgList<M> msgList) {
+        this.vertexId = vertexId;
+        this.msgList = msgList;
+    }
+
+    @Override
+    public void readFields(DataInput input) throws IOException {
+        vertexId = BspUtils.createVertexIndex(getConf());
+        vertexId.readFields(input);
+        msgList = new MsgList<M>();
+        msgList.setConf(getConf());
+        msgList.readFields(input);
+    }
+
+    @Override
+    public void write(DataOutput output) throws IOException {
+        vertexId.write(output);
+        msgList.write(output);
+    }
+
+    @Override
+    public Configuration getConf() {
+        return conf;
+    }
+
+    @Override
+    public void setConf(Configuration conf) {
+        this.conf = conf;
+    }
+
+    public I getVertexId() {
+        return vertexId;
+    }
+
+    public MsgList<M> getMessageList() {
+        return msgList;
+    }
+ }

Added: incubator/giraph/trunk/src/main/java/org/apache/giraph/comm/VertexIdMessagesList.java
URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/comm/VertexIdMessagesList.java?rev=1214983&view=auto
==============================================================================
--- incubator/giraph/trunk/src/main/java/org/apache/giraph/comm/VertexIdMessagesList.java (added)
+++ incubator/giraph/trunk/src/main/java/org/apache/giraph/comm/VertexIdMessagesList.java Thu Dec 15 23:06:12 2011
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.comm;
+
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+
+/**
+ * Wrapper around {@link ArrayListWritable} that provides the list for
+ * {@link VertexIdMessage}.
+ *
+ * @param <I extends Writable> vertex id
+ * @param <M extends Writable> message data
+ */
+@SuppressWarnings("rawtypes")
+public class VertexIdMessagesList<I extends WritableComparable,
+        M extends Writable> extends ArrayListWritable<VertexIdMessages<I, M>> {
+    /** Defining a layout version for a serializable class. */
+    private static final long serialVersionUID = 100L;
+
+    public VertexIdMessagesList() {
+        super();
+    }
+
+    public VertexIdMessagesList(VertexIdMessagesList<I, M> vertexIdMessagesList) {
+        super(vertexIdMessagesList);
+    }
+
+    @SuppressWarnings("unchecked")
+    @Override
+    public void setClass() {
+        setClass((Class<VertexIdMessages<I, M>>)
+                 (new VertexIdMessages<I, M>()).getClass());
+    }
+}

Modified: incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/GiraphJob.java
URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/GiraphJob.java?rev=1214983&r1=1214982&r2=1214983&view=diff
==============================================================================
--- incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/GiraphJob.java (original)
+++ incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/GiraphJob.java Thu Dec 15 23:06:12 2011
@@ -61,10 +61,10 @@ public class GiraphJob extends Job {
     /** Message value class */
     public static final String MESSAGE_VALUE_CLASS = "giraph.messageValueClass";
     /** Worker context class */
-    public static final String WORKER_CONTEXT_CLASS = 
+    public static final String WORKER_CONTEXT_CLASS =
     	"giraph.workerContextClass";
     /** AggregatorWriter class - optional */
-    public static final String AGGREGATOR_WRITER_CLASS = 
+    public static final String AGGREGATOR_WRITER_CLASS =
     	"giraph.aggregatorWriterClass";
 
     /**
@@ -170,8 +170,15 @@ public class GiraphJob extends Job {
     /** Default maximum number of messages per peer before flush */
     public static final int MSG_SIZE_DEFAULT = 1000;
 
+    /** Maximum number of messages that can be bulk sent during a flush */
+    public static final String MAX_MESSAGES_PER_FLUSH_PUT =
+        "giraph.maxMessagesPerFlushPut";
+    /** Default number of messages that can be bulk sent during a flush */
+    public static final int DEFAULT_MAX_MESSAGES_PER_FLUSH_PUT = 5000;
+
     /** Number of flush threads per peer */
-    public static final String MSG_NUM_FLUSH_THREADS = "giraph.msgNumFlushThreads";
+    public static final String MSG_NUM_FLUSH_THREADS =
+        "giraph.msgNumFlushThreads";
 
     /** Number of poll attempts prior to failing the job (int) */
     public static final String POLL_ATTEMPTS = "giraph.pollAttempts";
@@ -451,7 +458,7 @@ public class GiraphJob extends Job {
                                     workerContextClass,
                                     WorkerContext.class);
     }
-    
+
     /**
      * Set the aggregator writer class (optional)
      *
@@ -464,7 +471,7 @@ public class GiraphJob extends Job {
                                      aggregatorWriterClass,
                                      AggregatorWriter.class);
      }
-    
+
     /**
      * Set worker configuration for determining what is required for
      * a superstep.