You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@giraph.apache.org by ac...@apache.org on 2011/12/16 00:06:13 UTC
svn commit: r1214983 - in /incubator/giraph/trunk: ./
src/main/java/org/apache/giraph/comm/ src/main/java/org/apache/giraph/graph/
Author: aching
Date: Thu Dec 15 23:06:12 2011
New Revision: 1214983
URL: http://svn.apache.org/viewvc?rev=1214983&view=rev
Log:
GIRAPH-57: Add new RPC call (putVertexIdMessagesList) to batch
putMsgList RPCs together. (aching)
Added:
incubator/giraph/trunk/src/main/java/org/apache/giraph/comm/VertexIdMessages.java
incubator/giraph/trunk/src/main/java/org/apache/giraph/comm/VertexIdMessagesList.java
Modified:
incubator/giraph/trunk/CHANGELOG
incubator/giraph/trunk/CODE_CONVENTIONS
incubator/giraph/trunk/src/main/java/org/apache/giraph/comm/ArrayListWritable.java
incubator/giraph/trunk/src/main/java/org/apache/giraph/comm/BasicRPCCommunications.java
incubator/giraph/trunk/src/main/java/org/apache/giraph/comm/CommunicationsInterface.java
incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/GiraphJob.java
Modified: incubator/giraph/trunk/CHANGELOG
URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/CHANGELOG?rev=1214983&r1=1214982&r2=1214983&view=diff
==============================================================================
--- incubator/giraph/trunk/CHANGELOG (original)
+++ incubator/giraph/trunk/CHANGELOG Thu Dec 15 23:06:12 2011
@@ -2,6 +2,9 @@ Giraph Change Log
Release 0.70.0 - unreleased
+ GIRAPH-57: Add new RPC call (putVertexIdMessagesList) to batch
+ putMsgList RPCs together. (aching)
+
GIRAPH-104: Save half of maximum memory used from messaging. (aching)
GIRAPH-10: Aggregators are not exported. (claudio)
Modified: incubator/giraph/trunk/CODE_CONVENTIONS
URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/CODE_CONVENTIONS?rev=1214983&r1=1214982&r2=1214983&view=diff
==============================================================================
--- incubator/giraph/trunk/CODE_CONVENTIONS (original)
+++ incubator/giraph/trunk/CODE_CONVENTIONS Thu Dec 15 23:06:12 2011
@@ -49,9 +49,9 @@ if (LOG.isInfoEnabled()) {
}
- All classes, members, and member methods should have Javadoc in the following
- style. C-style comments for javadoc and // comments for non-javadoc. Also, the comment
- block should have a line break that separates the comment section and the @ section.
- See below.
+ style. C-style comments for javadoc and // comments for non-javadoc. Also,
+ the comment block should have a line break that separates the comment
+ section and the @ section. See below.
/**
* This is an example class
@@ -78,10 +78,18 @@ public class Giraffe {
}
}
+- When using synchronized statements, there should not be a space between
+ 'synchronized' and '('. For example:
+
+public foo() {
+ synchronized(bar) {
+ }
+}
+
- Class members should not begin with 'm_' or '_'
- No warnings allowed, but be as specific as possible with warning suppression
- Prefer to avoid abbreviations when reasonable (i.e. 'msg' vs 'message')
-- Static variable names should be entirely capitalized and seperated by '_'
+- Static variable names should be entirely capitalized and separated by '_'
(i.e. private static int FOO_BAR_BAR = 2)
- Non-static variable and method names should not begin capitalized and should only use
alphanumeric characters (i.e. int fooBarBar)
Modified: incubator/giraph/trunk/src/main/java/org/apache/giraph/comm/ArrayListWritable.java
URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/comm/ArrayListWritable.java?rev=1214983&r1=1214982&r2=1214983&view=diff
==============================================================================
--- incubator/giraph/trunk/src/main/java/org/apache/giraph/comm/ArrayListWritable.java (original)
+++ incubator/giraph/trunk/src/main/java/org/apache/giraph/comm/ArrayListWritable.java Thu Dec 15 23:06:12 2011
@@ -47,7 +47,7 @@ public abstract class ArrayListWritable<
*/
public ArrayListWritable() {
}
-
+
public ArrayListWritable(ArrayListWritable<M> arrayListWritable) {
super(arrayListWritable);
}
Modified: incubator/giraph/trunk/src/main/java/org/apache/giraph/comm/BasicRPCCommunications.java
URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/comm/BasicRPCCommunications.java?rev=1214983&r1=1214982&r2=1214983&view=diff
==============================================================================
--- incubator/giraph/trunk/src/main/java/org/apache/giraph/comm/BasicRPCCommunications.java (original)
+++ incubator/giraph/trunk/src/main/java/org/apache/giraph/comm/BasicRPCCommunications.java Thu Dec 15 23:06:12 2011
@@ -44,6 +44,7 @@ import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
+import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
@@ -92,6 +93,8 @@ public abstract class BasicRPCCommunicat
private final InetSocketAddress myAddress;
/** Messages sent during the last superstep */
private long totalMsgsSentInSuperstep = 0;
+ /** Maximum messages sent per putVertexIdMessagesList RPC */
+ private final int maxMessagesPerFlushPut;
/**
* Map of the peer connections, mapping from remote socket address to client
* meta data
@@ -215,48 +218,68 @@ public abstract class BasicRPCCommunicat
= peerConnection.getRPCProxy();
long startMillis = System.currentTimeMillis();
long lastReportedMillis = startMillis;
-
try {
int verticesDone = 0;
- synchronized (peerConnection.outMessagesPerPeer) {
+ synchronized(peerConnection.outMessagesPerPeer) {
final int vertices =
peerConnection.outMessagesPerPeer.size();
+ // 1. Check for null messages and combine if possible
+ // 2. Send vertex ids and messages in bulk to the
+ // destination servers.
+ for (Entry<I, MsgList<M>> entry :
+ peerConnection.outMessagesPerPeer.entrySet()) {
+ for (M msg : entry.getValue()) {
+ if (msg == null) {
+ throw new IllegalArgumentException(
+ "run: Cannot put null message on " +
+ "vertex id " + entry.getKey());
+ }
+ }
+ if (combiner != null && entry.getValue().size() > 1) {
+ M combinedMsg = combiner.combine(entry.getKey(),
+ entry.getValue());
+ entry.getValue().clear();
+ entry.getValue().add(combinedMsg);
+ }
+ if (entry.getValue().isEmpty()) {
+ throw new IllegalStateException(
+ "run: Impossible for no messages in " +
+ entry.getKey());
+ }
+ }
while (!peerConnection.outMessagesPerPeer.isEmpty()) {
- Entry<I, MsgList<M>> e =
- peerConnection.outMessagesPerPeer.entrySet().iterator().next();
- MsgList<M> msgList = e.getValue();
-
- if (msgList.size() > 0) {
- if (msgList.size() > 1) {
- if (combiner != null) {
- M combinedMsg = combiner.combine(e.getKey(),
- msgList);
- if (combinedMsg != null) {
- proxy.putMsg(e.getKey(), combinedMsg);
- }
- } else {
- proxy.putMsgList(e.getKey(), msgList);
- }
- } else {
- for (M msg : msgList) {
- if (msg == null) {
- throw new IllegalArgumentException(
- "putAllMessages: Cannot put " +
- "null message on " + e.getKey());
- }
- proxy.putMsg(e.getKey(), msg);
- context.progress();
- }
+ int bulkedMessages = 0;
+ Iterator<Entry<I, MsgList<M>>> vertexIdMessagesListIt =
+ peerConnection.outMessagesPerPeer.entrySet().
+ iterator();
+ VertexIdMessagesList<I, M> vertexIdMessagesList =
+ new VertexIdMessagesList<I, M>();
+ while (vertexIdMessagesListIt.hasNext()) {
+ Entry<I, MsgList<M>> entry =
+ vertexIdMessagesListIt.next();
+ // Add this entry if the list is empty or we
+ // haven't reached the maximum number of messages
+ if (vertexIdMessagesList.isEmpty() ||
+ ((bulkedMessages + entry.getValue().size())
+ < maxMessagesPerFlushPut)) {
+ vertexIdMessagesList.add(
+ new VertexIdMessages<I, M>(
+ entry.getKey(), entry.getValue()));
+ bulkedMessages += entry.getValue().size();
}
- msgList.clear();
}
- // Clean up the memory with the message list
- msgList = null;
- peerConnection.outMessagesPerPeer.remove(e.getKey());
- e = null;
+ // Clean up references to the vertex id and messages
+ for (VertexIdMessages<I, M>vertexIdMessages :
+ vertexIdMessagesList) {
+ peerConnection.outMessagesPerPeer.remove(
+ vertexIdMessages.getVertexId());
+ }
+
+ proxy.putVertexIdMessagesList(vertexIdMessagesList);
+ context.progress();
- ++verticesDone;
+ verticesDone += vertexIdMessagesList.size();
long curMillis = System.currentTimeMillis();
if ((lastReportedMillis +
REPORTING_INTERVAL_MIN_MILLIS) < curMillis) {
@@ -304,16 +327,18 @@ public abstract class BasicRPCCommunicat
* exceeds <i>maxSize</i>.
*/
private class LargeMessageFlushExecutor implements Runnable {
- final I destVertex;
- final MsgList<M> outMessage;
- PeerConnection peerConnection;
+ private final I destVertex;
+ private final MsgList<M> outMessageList;
+ private PeerConnection peerConnection;
LargeMessageFlushExecutor(PeerConnection peerConnection, I destVertex) {
this.peerConnection = peerConnection;
- synchronized (peerConnection.outMessagesPerPeer) {
+ synchronized(peerConnection.outMessagesPerPeer) {
this.destVertex = destVertex;
- outMessage = peerConnection.outMessagesPerPeer.get(destVertex);
- peerConnection.outMessagesPerPeer.put(destVertex, new MsgList<M>());
+ outMessageList =
+ peerConnection.outMessagesPerPeer.get(destVertex);
+ peerConnection.outMessagesPerPeer.put(destVertex,
+ new MsgList<M>());
}
}
@@ -325,21 +350,21 @@ public abstract class BasicRPCCommunicat
if (combiner != null) {
M combinedMsg = combiner.combine(destVertex,
- outMessage);
+ outMessageList);
if (combinedMsg != null) {
proxy.putMsg(destVertex, combinedMsg);
}
} else {
- proxy.putMsgList(destVertex, outMessage);
+ proxy.putMsgList(destVertex, outMessageList);
}
} catch (IOException e) {
LOG.error(e);
if (peerConnection.isProxy) {
RPC.stopProxy(peerConnection.peer);
}
- throw new RuntimeException(e);
+ throw new RuntimeException("run: Got IOException", e);
} finally {
- outMessage.clear();
+ outMessageList.clear();
}
}
}
@@ -371,6 +396,9 @@ public abstract class BasicRPCCommunicat
this.conf = context.getConfiguration();
this.maxSize = conf.getInt(GiraphJob.MSG_SIZE,
GiraphJob.MSG_SIZE_DEFAULT);
+ this.maxMessagesPerFlushPut =
+ conf.getInt(GiraphJob.MAX_MESSAGES_PER_FLUSH_PUT,
+ GiraphJob.DEFAULT_MAX_MESSAGES_PER_FLUSH_PUT);
if (BspUtils.getVertexCombinerClass(conf) == null) {
this.combiner = null;
} else {
@@ -492,7 +520,7 @@ public abstract class BasicRPCCommunicat
Map<I, MsgList<M>> outMsgMap = null;
boolean isProxy = true;
CommunicationsInterface<I, V, E, M> peer = this;
- synchronized (outMessages) {
+ synchronized(outMessages) {
outMsgMap = outMessages.get(addrUnresolved);
if (LOG.isDebugEnabled()) {
LOG.debug("startPeerConnectionThread: Connecting to " +
@@ -570,7 +598,7 @@ end[HADOOP_FACEBOOK]*/
transientInMessages.put(vertex, msgs);
}
}
- synchronized (msgs) {
+ synchronized(msgs) {
msgs.add(msg);
}
}
@@ -591,12 +619,39 @@ end[HADOOP_FACEBOOK]*/
transientInMessages.put(vertex, msgs);
}
}
- synchronized (msgs) {
+ synchronized(msgs) {
msgs.addAll(msgList);
}
}
@Override
+ public final void putVertexIdMessagesList(
+ VertexIdMessagesList<I, M> vertexIdMessagesList)
+ throws IOException {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("putVertexIdMessagesList: Adding msgList " +
+ vertexIdMessagesList);
+ }
+
+ List<M> messageList = null;
+ for (VertexIdMessages<I, M> vertexIdMessages : vertexIdMessagesList) {
+ synchronized(transientInMessages) {
+ messageList =
+ transientInMessages.get(vertexIdMessages.getVertexId());
+ if (messageList == null) {
+ messageList = new ArrayList<M>(
+ vertexIdMessages.getMessageList().size());
+ transientInMessages.put(
+ vertexIdMessages.getVertexId(), messageList);
+ }
+ }
+ synchronized(messageList) {
+ messageList.addAll(vertexIdMessages.getMessageList());
+ }
+ }
+ }
+
+ @Override
public final void putVertexList(int partitionId,
VertexList<I, V, E, M> vertexList)
throws IOException {
@@ -604,7 +659,7 @@ end[HADOOP_FACEBOOK]*/
LOG.debug("putVertexList: On partition id " + partitionId +
" adding vertex list of size " + vertexList.size());
}
- synchronized (inPartitionVertexMap) {
+ synchronized(inPartitionVertexMap) {
if (vertexList.size() == 0) {
return;
}
@@ -624,7 +679,7 @@ end[HADOOP_FACEBOOK]*/
if (LOG.isDebugEnabled()) {
LOG.debug("addEdge: Adding edge " + edge);
}
- synchronized (inVertexMutationsMap) {
+ synchronized(inVertexMutationsMap) {
VertexMutations<I, V, E, M> vertexMutations = null;
if (!inVertexMutationsMap.containsKey(vertexIndex)) {
vertexMutations = new VertexMutations<I, V, E, M>();
@@ -642,7 +697,7 @@ end[HADOOP_FACEBOOK]*/
LOG.debug("removeEdge: Removing edge on destination " +
destinationVertexIndex);
}
- synchronized (inVertexMutationsMap) {
+ synchronized(inVertexMutationsMap) {
VertexMutations<I, V, E, M> vertexMutations = null;
if (!inVertexMutationsMap.containsKey(vertexIndex)) {
vertexMutations = new VertexMutations<I, V, E, M>();
@@ -659,7 +714,7 @@ end[HADOOP_FACEBOOK]*/
if (LOG.isDebugEnabled()) {
LOG.debug("addVertex: Adding vertex " + vertex);
}
- synchronized (inVertexMutationsMap) {
+ synchronized(inVertexMutationsMap) {
VertexMutations<I, V, E, M> vertexMutations = null;
if (!inVertexMutationsMap.containsKey(vertex.getVertexId())) {
vertexMutations = new VertexMutations<I, V, E, M>();
@@ -676,7 +731,7 @@ end[HADOOP_FACEBOOK]*/
if (LOG.isDebugEnabled()) {
LOG.debug("removeVertex: Removing vertex " + vertexIndex);
}
- synchronized (inVertexMutationsMap) {
+ synchronized(inVertexMutationsMap) {
VertexMutations<I, V, E, M> vertexMutations = null;
if (!inVertexMutationsMap.containsKey(vertexIndex)) {
vertexMutations = new VertexMutations<I, V, E, M>();
@@ -779,7 +834,7 @@ end[HADOOP_FACEBOOK]*/
}
++totalMsgsSentInSuperstep;
Map<I, MsgList<M>> msgMap = null;
- synchronized (outMessages) {
+ synchronized(outMessages) {
msgMap = outMessages.get(addr);
}
if (msgMap == null) { // should never happen after constructor
@@ -995,7 +1050,7 @@ end[HADOOP_FACEBOOK]*/
resolveVertexIndexSet.add(entry.getKey());
}
}
- synchronized (inVertexMutationsMap) {
+ synchronized(inVertexMutationsMap) {
for (I vertexIndex : inVertexMutationsMap.keySet()) {
resolveVertexIndexSet.add(vertexIndex);
}
@@ -1042,7 +1097,7 @@ end[HADOOP_FACEBOOK]*/
partition.removeVertex(originalVertex.getVertexId());
}
}
- synchronized (inVertexMutationsMap) {
+ synchronized(inVertexMutationsMap) {
inVertexMutationsMap.clear();
}
}
@@ -1051,7 +1106,7 @@ end[HADOOP_FACEBOOK]*/
public void fixPartitionIdToSocketAddrMap() {
// 1. Fix all the cached inet addresses (remove all changed entries)
// 2. Connect to any new RPC servers
- synchronized (partitionIndexAddressMap) {
+ synchronized(partitionIndexAddressMap) {
for (PartitionOwner partitionOwner : service.getPartitionOwners()) {
InetSocketAddress address =
partitionIndexAddressMap.get(
Modified: incubator/giraph/trunk/src/main/java/org/apache/giraph/comm/CommunicationsInterface.java
URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/comm/CommunicationsInterface.java?rev=1214983&r1=1214982&r2=1214983&view=diff
==============================================================================
--- incubator/giraph/trunk/src/main/java/org/apache/giraph/comm/CommunicationsInterface.java (original)
+++ incubator/giraph/trunk/src/main/java/org/apache/giraph/comm/CommunicationsInterface.java Thu Dec 15 23:06:12 2011
@@ -77,6 +77,16 @@ public interface CommunicationsInterface
void putMsgList(I vertexIndex, MsgList<M> msgList) throws IOException;
/**
+ * Adds a list of vertex ids and their respective message lists.
+ *
+ * @param vertexIdMessagesList List of vertex ids, each paired with the
+ *        messages to be added to that vertex
+ * @throws IOException
+ */
+ void putVertexIdMessagesList(
+ VertexIdMessagesList<I, M> vertexIdMessagesList) throws IOException;
+
+ /**
* Adds vertex list (index, value, edges, etc.) to the appropriate worker.
*
* @param partitionId Partition id of the vertices to be added.
Added: incubator/giraph/trunk/src/main/java/org/apache/giraph/comm/VertexIdMessages.java
URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/comm/VertexIdMessages.java?rev=1214983&view=auto
==============================================================================
--- incubator/giraph/trunk/src/main/java/org/apache/giraph/comm/VertexIdMessages.java (added)
+++ incubator/giraph/trunk/src/main/java/org/apache/giraph/comm/VertexIdMessages.java Thu Dec 15 23:06:12 2011
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.comm;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.giraph.graph.BspUtils;
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+
+/**
+ * This object is only used for transporting a vertex id and its
+ * respective messages to a destination RPC server.
+ *
+ * @param <I> Vertex id type
+ * @param <M> Message data type
+ */
+@SuppressWarnings("rawtypes")
+public class VertexIdMessages<I extends WritableComparable, M extends Writable>
+ implements Writable, Configurable {
+ /** Vertex id */
+ private I vertexId;
+ /** Message list corresponding to vertex id */
+ private MsgList<M> msgList;
+ /** Configuration from Configurable */
+ private Configuration conf;
+
+ /**
+ * Reflective constructor.
+ */
+ public VertexIdMessages() {}
+
+ /**
+ * Constructor used with creating initial values.
+ *
+ * @param vertexId Vertex id to be sent
+ * @param msgList Message list for the vertex id to be sent
+ */
+ public VertexIdMessages(I vertexId, MsgList<M> msgList) {
+ this.vertexId = vertexId;
+ this.msgList = msgList;
+ }
+
+ @Override
+ public void readFields(DataInput input) throws IOException {
+ vertexId = BspUtils.createVertexIndex(getConf());
+ vertexId.readFields(input);
+ msgList = new MsgList<M>();
+ msgList.setConf(getConf());
+ msgList.readFields(input);
+ }
+
+ @Override
+ public void write(DataOutput output) throws IOException {
+ vertexId.write(output);
+ msgList.write(output);
+ }
+
+ @Override
+ public Configuration getConf() {
+ return conf;
+ }
+
+ @Override
+ public void setConf(Configuration conf) {
+ this.conf = conf;
+ }
+
+ public I getVertexId() {
+ return vertexId;
+ }
+
+ public MsgList<M> getMessageList() {
+ return msgList;
+ }
+ }
Added: incubator/giraph/trunk/src/main/java/org/apache/giraph/comm/VertexIdMessagesList.java
URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/comm/VertexIdMessagesList.java?rev=1214983&view=auto
==============================================================================
--- incubator/giraph/trunk/src/main/java/org/apache/giraph/comm/VertexIdMessagesList.java (added)
+++ incubator/giraph/trunk/src/main/java/org/apache/giraph/comm/VertexIdMessagesList.java Thu Dec 15 23:06:12 2011
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.comm;
+
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+
+/**
+ * Wrapper around {@link ArrayListWritable} that provides the list for
+ * {@link VertexIdMessages}.
+ *
+ * @param <I> Vertex id type
+ * @param <M> Message data type
+ */
+@SuppressWarnings("rawtypes")
+public class VertexIdMessagesList<I extends WritableComparable,
+ M extends Writable> extends ArrayListWritable<VertexIdMessages<I, M>> {
+ /** Defining a layout version for a serializable class. */
+ private static final long serialVersionUID = 100L;
+
+ public VertexIdMessagesList() {
+ super();
+ }
+
+ public VertexIdMessagesList(VertexIdMessagesList<I, M> vertexIdMessagesList) {
+ super(vertexIdMessagesList);
+ }
+
+ @SuppressWarnings("unchecked")
+ @Override
+ public void setClass() {
+ setClass((Class<VertexIdMessages<I, M>>)
+ (new VertexIdMessages<I, M>()).getClass());
+ }
+}
Modified: incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/GiraphJob.java
URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/GiraphJob.java?rev=1214983&r1=1214982&r2=1214983&view=diff
==============================================================================
--- incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/GiraphJob.java (original)
+++ incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/GiraphJob.java Thu Dec 15 23:06:12 2011
@@ -61,10 +61,10 @@ public class GiraphJob extends Job {
/** Message value class */
public static final String MESSAGE_VALUE_CLASS = "giraph.messageValueClass";
/** Worker context class */
- public static final String WORKER_CONTEXT_CLASS =
+ public static final String WORKER_CONTEXT_CLASS =
"giraph.workerContextClass";
/** AggregatorWriter class - optional */
- public static final String AGGREGATOR_WRITER_CLASS =
+ public static final String AGGREGATOR_WRITER_CLASS =
"giraph.aggregatorWriterClass";
/**
@@ -170,8 +170,15 @@ public class GiraphJob extends Job {
/** Default maximum number of messages per peer before flush */
public static final int MSG_SIZE_DEFAULT = 1000;
+ /** Maximum number of messages that can be bulk sent during a flush */
+ public static final String MAX_MESSAGES_PER_FLUSH_PUT =
+ "giraph.maxMessagesPerFlushPut";
+ /** Default number of messages that can be bulk sent during a flush */
+ public static final int DEFAULT_MAX_MESSAGES_PER_FLUSH_PUT = 5000;
+
/** Number of flush threads per peer */
- public static final String MSG_NUM_FLUSH_THREADS = "giraph.msgNumFlushThreads";
+ public static final String MSG_NUM_FLUSH_THREADS =
+ "giraph.msgNumFlushThreads";
/** Number of poll attempts prior to failing the job (int) */
public static final String POLL_ATTEMPTS = "giraph.pollAttempts";
@@ -451,7 +458,7 @@ public class GiraphJob extends Job {
workerContextClass,
WorkerContext.class);
}
-
+
/**
* Set the aggregator writer class (optional)
*
@@ -464,7 +471,7 @@ public class GiraphJob extends Job {
aggregatorWriterClass,
AggregatorWriter.class);
}
-
+
/**
* Set worker configuration for determining what is required for
* a superstep.