You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@giraph.apache.org by ac...@apache.org on 2011/09/16 22:56:44 UTC
svn commit: r1171776 - in /incubator/giraph/trunk: ./
src/main/java/org/apache/giraph/comm/
src/main/java/org/apache/giraph/examples/
src/main/java/org/apache/giraph/graph/ src/test/java/org/apache/giraph/
Author: aching
Date: Fri Sep 16 20:56:44 2011
New Revision: 1171776
URL: http://svn.apache.org/viewvc?rev=1171776&view=rev
Log:
GIRAPH-34: Failure of Vertex reflection for putVertexList from
GIRAPH-27. (aching)
Modified:
incubator/giraph/trunk/CHANGELOG
incubator/giraph/trunk/src/main/java/org/apache/giraph/comm/BasicRPCCommunications.java
incubator/giraph/trunk/src/main/java/org/apache/giraph/examples/SimpleCheckpointVertex.java
incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/BasicVertex.java
incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/Vertex.java
incubator/giraph/trunk/src/test/java/org/apache/giraph/TestVertexRangeBalancer.java
Modified: incubator/giraph/trunk/CHANGELOG
URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/CHANGELOG?rev=1171776&r1=1171775&r2=1171776&view=diff
==============================================================================
--- incubator/giraph/trunk/CHANGELOG (original)
+++ incubator/giraph/trunk/CHANGELOG Fri Sep 16 20:56:44 2011
@@ -2,6 +2,9 @@ Giraph Change Log
Release 0.70.0 - unreleased
+ GIRAPH-34: Failure of Vertex reflection for putVertexList from
+ GIRAPH-27. (aching)
+
GIRAPH-35: Modifying the site to indicate that Jake Mannix and
Dmitriy Ryaboy are now Giraph committers. (aching)
Modified: incubator/giraph/trunk/src/main/java/org/apache/giraph/comm/BasicRPCCommunications.java
URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/comm/BasicRPCCommunications.java?rev=1171776&r1=1171775&r2=1171776&view=diff
==============================================================================
--- incubator/giraph/trunk/src/main/java/org/apache/giraph/comm/BasicRPCCommunications.java (original)
+++ incubator/giraph/trunk/src/main/java/org/apache/giraph/comm/BasicRPCCommunications.java Fri Sep 16 20:56:44 2011
@@ -37,13 +37,13 @@ import java.util.TreeSet;
import org.apache.log4j.Logger;
import org.apache.giraph.bsp.CentralizedServiceWorker;
-import org.apache.giraph.graph.GiraphJob;
+import org.apache.giraph.graph.BasicVertex;
import org.apache.giraph.graph.BspUtils;
-import org.apache.giraph.graph.VertexCombiner;
import org.apache.giraph.graph.Edge;
-import org.apache.giraph.graph.Vertex;
+import org.apache.giraph.graph.GiraphJob;
import org.apache.giraph.graph.MutableVertex;
-import org.apache.giraph.graph.BasicVertex;
+import org.apache.giraph.graph.Vertex;
+import org.apache.giraph.graph.VertexCombiner;
import org.apache.giraph.graph.VertexMutations;
import org.apache.giraph.graph.VertexRange;
import org.apache.giraph.graph.VertexResolver;
@@ -423,6 +423,15 @@ public abstract class BasicRPCCommunicat
InetSocketAddress addr,
int numHandlers, String jobId, J jobToken) throws IOException;
+ /**
+ * Only constructor.
+ *
+ * @param context Context for getting configuration
+ * @param service Service worker to get the vertex ranges
+ * @throws IOException
+ * @throws UnknownHostException
+ * @throws InterruptedException
+ */
public BasicRPCCommunications(Mapper<?, ?, ?, ?>.Context context,
CentralizedServiceWorker<I, V, E, M> service)
throws IOException, UnknownHostException, InterruptedException {
@@ -804,8 +813,8 @@ end[HADOOP_FACEBOOK]*/
public final void sendMessageReq(I destVertex, M msg) {
InetSocketAddress addr = getInetSocketAddress(destVertex);
if (LOG.isDebugEnabled()) {
- LOG.debug("sendMessage: Send bytes (" + msg.toString() + ") to " +
- destVertex + " with address " + addr);
+ LOG.debug("sendMessage: Send bytes (" + msg.toString() +
+ ") to " + destVertex + " with address " + addr);
}
++totalMsgsSentInSuperstep;
Map<I, MsgList<M>> msgMap = null;
@@ -825,8 +834,10 @@ end[HADOOP_FACEBOOK]*/
msgMap.put(destVertex, msgList);
}
msgList.add(msg);
- LOG.debug("sendMessage: added msg=" + msg + ", size=" +
- msgList.size());
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("sendMessage: added msg=" + msg + ", size=" +
+ msgList.size());
+ }
if (msgList.size() > maxSize) {
peerThreads.get(addr).flushLargeMsgList(destVertex);
}
Modified: incubator/giraph/trunk/src/main/java/org/apache/giraph/examples/SimpleCheckpointVertex.java
URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/examples/SimpleCheckpointVertex.java?rev=1171776&r1=1171775&r2=1171776&view=diff
==============================================================================
--- incubator/giraph/trunk/src/main/java/org/apache/giraph/examples/SimpleCheckpointVertex.java (original)
+++ incubator/giraph/trunk/src/main/java/org/apache/giraph/examples/SimpleCheckpointVertex.java Fri Sep 16 20:56:44 2011
@@ -124,12 +124,13 @@ public class SimpleCheckpointVertex exte
System.out.println("compute: vertex " + getVertexId() +
" sending edgeValue " + edgeValue +
" vertexValue " + vertexValue +
- " total " + (edgeValue.get() + (float) vertexValue) +
+ " total " + (edgeValue.get() +
+ (float) vertexValue) +
" to vertex " + targetVertexId +
" on superstep " + getSuperstep());
edgeValue.set(edgeValue.get() + (float) vertexValue);
addEdge(targetVertexId, edgeValue);
- sendMsg(targetVertexId, edgeValue);
+ sendMsg(targetVertexId, new FloatWritable(edgeValue.get()));
}
}
Modified: incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/BasicVertex.java
URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/BasicVertex.java?rev=1171776&r1=1171775&r2=1171776&view=diff
==============================================================================
--- incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/BasicVertex.java (original)
+++ incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/BasicVertex.java Fri Sep 16 20:56:44 2011
@@ -18,6 +18,8 @@
package org.apache.giraph.graph;
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapreduce.Mapper;
@@ -37,9 +39,11 @@ import java.util.List;
@SuppressWarnings("rawtypes")
public abstract class BasicVertex<I extends WritableComparable,
V extends Writable, E extends Writable, M extends Writable>
- implements AggregatorUsage, Iterable<I> {
+ implements AggregatorUsage, Iterable<I>, Configurable {
/** Global graph state **/
private GraphState<I,V,E,M> graphState;
+ /** Configuration */
+ private Configuration conf;
/**
* Optionally defined by the user to be executed once on all workers
@@ -160,10 +164,12 @@ public abstract class BasicVertex<I exte
public abstract int getNumOutEdges();
/**
- * Send a message to a vertex id.
+ * Send a message to a vertex id. The message must not be mutated after
+ * this method returns; doing so leads to undefined results.
*
- * @param id vertex id to send the message to
- * @param msg message data to send
+ * @param id Vertex id to send the message to
+ * @param msg Message data to send. Note that after the message is sent,
+ * the user should not modify the object.
*/
public void sendMsg(I id, M msg) {
if (msg == null) {
@@ -182,8 +188,8 @@ public abstract class BasicVertex<I exte
/**
* After this is called, the compute() code will no longer be called for
* this vertice unless a message is sent to it. Then the compute() code
- * will be called once again until this function is called. The application
- * finishes only when all vertices vote to halt.
+ * will be called once again until this function is called. The
+ * application finishes only when all vertices vote to halt.
*/
public abstract void voteToHalt();
@@ -245,4 +251,14 @@ public abstract class BasicVertex<I exte
return getGraphState().getGraphMapper().getAggregatorUsage().
useAggregator(name);
}
+
+ @Override
+ public Configuration getConf() {
+ return conf;
+ }
+
+ @Override
+ public void setConf(Configuration conf) {
+ this.conf = conf;
+ }
}
Modified: incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/Vertex.java
URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/Vertex.java?rev=1171776&r1=1171775&r2=1171776&view=diff
==============================================================================
--- incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/Vertex.java (original)
+++ incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/Vertex.java Fri Sep 16 20:56:44 2011
@@ -145,7 +145,7 @@ public abstract class Vertex<I extends W
@Override
public E removeEdge(I targetVertexId) {
Edge<I, E> edge = destEdgeMap.remove(targetVertexId);
- if(edge != null) {
+ if (edge != null) {
return edge.getEdgeValue();
} else {
return null;
@@ -175,26 +175,23 @@ public abstract class Vertex<I extends W
@Override
final public void readFields(DataInput in) throws IOException {
- vertexId =
- BspUtils.<I>createVertexIndex(getContext().getConfiguration());
+ vertexId = BspUtils.<I>createVertexIndex(getConf());
vertexId.readFields(in);
boolean hasVertexValue = in.readBoolean();
if (hasVertexValue) {
- vertexValue =
- BspUtils.<V>createVertexValue(getContext().getConfiguration());
+ vertexValue = BspUtils.<V>createVertexValue(getConf());
vertexValue.readFields(in);
}
long edgeMapSize = in.readLong();
for (long i = 0; i < edgeMapSize; ++i) {
Edge<I, E> edge = new Edge<I, E>();
- edge.setConf(getContext().getConfiguration());
+ edge.setConf(getConf());
edge.readFields(in);
addEdge(edge.getDestVertexId(), edge.getEdgeValue());
}
long msgListSize = in.readLong();
for (long i = 0; i < msgListSize; ++i) {
- M msg =
- BspUtils.<M>createMessageValue(getContext().getConfiguration());
+ M msg = BspUtils.<M>createMessageValue(getConf());
msg.readFields(in);
msgList.add(msg);
}
Modified: incubator/giraph/trunk/src/test/java/org/apache/giraph/TestVertexRangeBalancer.java
URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/test/java/org/apache/giraph/TestVertexRangeBalancer.java?rev=1171776&r1=1171775&r2=1171776&view=diff
==============================================================================
--- incubator/giraph/trunk/src/test/java/org/apache/giraph/TestVertexRangeBalancer.java (original)
+++ incubator/giraph/trunk/src/test/java/org/apache/giraph/TestVertexRangeBalancer.java Fri Sep 16 20:56:44 2011
@@ -71,7 +71,7 @@ public class TestVertexRangeBalancer ext
removeAndSetOutput(job, outputPath);
assertTrue(job.run(true));
FileSystem hdfs = FileSystem.get(job.getConfiguration());
- final int correctLen = 118;
+ final int correctLen = 123;
if (getJobTracker() != null) {
FileStatus [] fileStatusArr = hdfs.listStatus(outputPath);
int totalLen = 0;