You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@giraph.apache.org by ac...@apache.org on 2011/09/10 02:40:22 UTC
svn commit: r1167420 - in /incubator/giraph/trunk: ./
src/main/java/org/apache/giraph/bsp/ src/main/java/org/apache/giraph/comm/
src/main/java/org/apache/giraph/graph/ src/test/java/org/apache/giraph/
Author: aching
Date: Sat Sep 10 00:40:21 2011
New Revision: 1167420
URL: http://svn.apache.org/viewvc?rev=1167420&view=rev
Log:
GIRAPH-27: Mutable static global state in Vertex.java should be
refactored. jake.mannix via aching.
Added:
incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/GraphState.java
Modified:
incubator/giraph/trunk/CHANGELOG
incubator/giraph/trunk/src/main/java/org/apache/giraph/bsp/CentralizedService.java
incubator/giraph/trunk/src/main/java/org/apache/giraph/comm/BasicRPCCommunications.java
incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/BasicVertex.java
incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/BspService.java
incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/BspServiceWorker.java
incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/BspUtils.java
incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/GraphMapper.java
incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/MutableVertex.java
incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/Vertex.java
incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/VertexRange.java
incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/VertexResolver.java
incubator/giraph/trunk/src/test/java/org/apache/giraph/TestBspBasic.java
Modified: incubator/giraph/trunk/CHANGELOG
URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/CHANGELOG?rev=1167420&r1=1167419&r2=1167420&view=diff
==============================================================================
--- incubator/giraph/trunk/CHANGELOG (original)
+++ incubator/giraph/trunk/CHANGELOG Sat Sep 10 00:40:21 2011
@@ -1,28 +1,31 @@
Giraph Change Log
Release 0.70.0 - unreleased
+
+ GIRAPH-27: Mutable static global state in Vertex.java should be
+ refactored. jake.mannix via aching.
- GIRAPH-25 NPE in BspServiceMaster when failing a job (aching on behalf
- of dvryaboy)
+ GIRAPH-25: NPE in BspServiceMaster when failing a job. dvryaboy via
+ aching.
- GIRAPH-24 Job-level statistics reports one superstep greater than
+ GIRAPH-24: Job-level statistics reports one superstep greater than
workers. (jghoman)
- GIRAPH-18 Refactor BspServiceWorker::loadVertices(). (jghoman)
+ GIRAPH-18: Refactor BspServiceWorker::loadVertices(). (jghoman)
- GIRAPH-14 Support for the Facebook Hadoop branch. (aching)
+ GIRAPH-14: Support for the Facebook Hadoop branch. (aching)
- GIRAPH-16 Add Apache RAT to the verify build step. (omalley)
+ GIRAPH-16: Add Apache RAT to the verify build step. (omalley)
- GIRAPH-17 Giraph doesn't give up properly after the maximum connect
+ GIRAPH-17: Giraph doesn't give up properly after the maximum connect
attempts to ZooKeeper. (aching)
- GIRAPH-2 Make the project homepage. (jghoman)
+ GIRAPH-2: Make the project homepage. (jghoman)
- GIRAPH-9 Change Yahoo License Header to Apache License Header (hyunsik)
+ GIRAPH-9: Change Yahoo License Header to Apache License Header (hyunsik)
- GIRAPH-6 Remove Yahoo-specific code from pom.xml. (jghoman)
+ GIRAPH-6: Remove Yahoo-specific code from pom.xml. (jghoman)
- GIRAPH-5 Remove Yahoo directories after svn import from Yahoo! (aching)
+ GIRAPH-5: Remove Yahoo directories after svn import from Yahoo! (aching)
- GIRAPH-3 Vertex:sentMsgToAllEdges should be sendMsg. (jghoman)
\ No newline at end of file
+ GIRAPH-3: Vertex:sentMsgToAllEdges should be sendMsg. (jghoman)
\ No newline at end of file
Modified: incubator/giraph/trunk/src/main/java/org/apache/giraph/bsp/CentralizedService.java
URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/bsp/CentralizedService.java?rev=1167420&r1=1167419&r2=1167420&view=diff
==============================================================================
--- incubator/giraph/trunk/src/main/java/org/apache/giraph/bsp/CentralizedService.java (original)
+++ incubator/giraph/trunk/src/main/java/org/apache/giraph/bsp/CentralizedService.java Sat Sep 10 00:40:21 2011
@@ -20,7 +20,7 @@ package org.apache.giraph.bsp;
import java.io.IOException;
-import org.apache.giraph.graph.BasicVertex;
+import org.apache.giraph.graph.Vertex;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
@@ -44,7 +44,7 @@ public interface CentralizedService<I ex
*
* @return representation vertex
*/
- BasicVertex<I, V, E, M> getRepresentativeVertex();
+ Vertex<I, V, E, M> getRepresentativeVertex();
/**
* Get the current global superstep of the application to work on.
Modified: incubator/giraph/trunk/src/main/java/org/apache/giraph/comm/BasicRPCCommunications.java
URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/comm/BasicRPCCommunications.java?rev=1167420&r1=1167419&r2=1167420&view=diff
==============================================================================
--- incubator/giraph/trunk/src/main/java/org/apache/giraph/comm/BasicRPCCommunications.java (original)
+++ incubator/giraph/trunk/src/main/java/org/apache/giraph/comm/BasicRPCCommunications.java Sat Sep 10 00:40:21 2011
@@ -1017,10 +1017,11 @@ end[HADOOP_FACEBOOK]*/
// Resolve all graph mutations
for (I vertexIndex : resolveVertexIndexSet) {
VertexResolver<I, V, E, M> vertexResolver =
- BspUtils.createVertexResolver(conf);
+ BspUtils.createVertexResolver(
+ conf, service.getGraphMapper().getGraphState());
VertexRange<I, V, E, M> vertexRange =
service.getVertexRange(service.getSuperstep() - 1, vertexIndex);
- BasicVertex<I, V, E, M> originalVertex =
+ Vertex<I, V, E, M> originalVertex =
vertexRange.getVertexMap().get(vertexIndex);
List<M> msgList = inMessages.get(vertexIndex);
if (originalVertex != null) {
@@ -1043,7 +1044,8 @@ end[HADOOP_FACEBOOK]*/
if (vertex != null) {
((MutableVertex<I, V, E, M>) vertex).setVertexId(vertexIndex);
- vertexRange.getVertexMap().put(vertex.getVertexId(), vertex);
+ vertexRange.getVertexMap().put(vertex.getVertexId(),
+ (Vertex<I, V, E, M>) vertex);
} else if (originalVertex != null) {
vertexRange.getVertexMap().remove(originalVertex.getVertexId());
}
Modified: incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/BasicVertex.java
URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/BasicVertex.java?rev=1167420&r1=1167419&r2=1167420&view=diff
==============================================================================
--- incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/BasicVertex.java (original)
+++ incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/BasicVertex.java Sat Sep 10 00:40:21 2011
@@ -1,4 +1,4 @@
-/*
+ /*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -18,14 +18,15 @@
package org.apache.giraph.graph;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapreduce.Mapper;
+
import java.io.IOException;
import java.util.Iterator;
import java.util.List;
import java.util.SortedMap;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.WritableComparable;
-
/**
* Basic interface for writing a BSP application for computation.
*
@@ -35,11 +36,12 @@ import org.apache.hadoop.io.WritableComp
* @param <M> message data
*/
@SuppressWarnings("rawtypes")
-public interface BasicVertex<I extends WritableComparable,
- V extends Writable,
- E extends Writable,
- M extends Writable>
- extends AggregatorUsage {
+public abstract class BasicVertex<I extends WritableComparable,
+ V extends Writable, E extends Writable, M extends Writable>
+ implements AggregatorUsage {
+ /** Global graph state **/
+ private GraphState<I,V,E,M> graphState;
+
/**
* Optionally defined by the user to be executed once on all workers
* before application has started.
@@ -47,26 +49,26 @@ public interface BasicVertex<I extends W
* @throws IllegalAccessException
* @throws InstantiationException
*/
- void preApplication()
+ public abstract void preApplication()
throws InstantiationException, IllegalAccessException;
/**
* Optionally defined by the user to be executed once on all workers
* after the application has completed.
*/
- void postApplication();
+ public abstract void postApplication();
/**
* Optionally defined by the user to be executed once prior to vertex
* processing on a worker for the current superstep.
*/
- void preSuperstep();
+ public abstract void preSuperstep();
/**
* Optionally defined by the user to be executed once after all vertex
* processing on a worker for the current superstep.
*/
- void postSuperstep();
+ public abstract void postSuperstep();
/**
* Must be defined by user to do computation on a single Vertex.
@@ -75,33 +77,35 @@ public interface BasicVertex<I extends W
* vertex in the previous superstep
* @throws IOException
*/
- void compute(Iterator<M> msgIterator) throws IOException;
+ public abstract void compute(Iterator<M> msgIterator) throws IOException;
/**
* Retrieves the current superstep.
*
* @return Current superstep
*/
- long getSuperstep();
+ public long getSuperstep() {
+ return getGraphState().getSuperstep();
+ }
/**
* Get the vertex id
*/
- I getVertexId();
+ public abstract I getVertexId();
/**
* Get the vertex value (data stored with vertex)
*
* @return Vertex value
*/
- V getVertexValue();
+ public abstract V getVertexValue();
/**
* Set the vertex data (immediately visible in the computation)
*
* @param vertexValue Vertex data to be set
*/
- void setVertexValue(V vertexValue);
+ public abstract void setVertexValue(V vertexValue);
/**
* Get the total (all workers) number of vertices that
@@ -109,7 +113,9 @@ public interface BasicVertex<I extends W
*
* @return Total number of vertices (-1 if first superstep)
*/
- long getNumVertices();
+ public long getNumVertices() {
+ return getGraphState().getNumVertices();
+ }
/**
* Get the total (all workers) number of edges that
@@ -117,7 +123,9 @@ public interface BasicVertex<I extends W
*
* @return Total number of edges (-1 if first superstep)
*/
- long getNumEdges();
+ public long getNumEdges() {
+ return getGraphState().getNumEdges();
+ }
/**
* Every vertex has edges to other vertices. Get a handle to the outward
@@ -125,7 +133,7 @@ public interface BasicVertex<I extends W
*
* @return Map of the destination vertex index to the {@link Edge}
*/
- SortedMap<I, Edge<I, E>> getOutEdgeMap();
+ public abstract SortedMap<I, Edge<I, E>> getOutEdgeMap();
/**
* Send a message to a vertex id.
@@ -133,12 +141,19 @@ public interface BasicVertex<I extends W
* @param id vertex id to send the message to
* @param msg message data to send
*/
- void sendMsg(I id, M msg);
+ public void sendMsg(I id, M msg) {
+ if (msg == null) {
+ throw new IllegalArgumentException(
+ "sendMsg: Cannot send null message to " + id);
+ }
+ getGraphState().getGraphMapper().getWorkerCommunications().
+ sendMessageReq(id, msg);
+ }
/**
* Send a message to all edges.
*/
- void sendMsgToAllEdges(M msg);
+ public abstract void sendMsgToAllEdges(M msg);
/**
* After this is called, the compute() code will no longer be called for
@@ -146,16 +161,64 @@ public interface BasicVertex<I extends W
* will be called once again until this function is called. The application
* finishes only when all vertices vote to halt.
*/
- void voteToHalt();
+ public abstract void voteToHalt();
/**
* Is this vertex done?
*/
- boolean isHalted();
+ public abstract boolean isHalted();
/**
* Get the list of incoming messages from the previous superstep. Same as
* the message iterator passed to compute().
*/
- List<M> getMsgList();
+ public abstract List<M> getMsgList();
+
+ /**
+ * Get the graph state for all workers.
+ *
+ * @return Graph state for all workers
+ */
+ GraphState<I, V, E, M> getGraphState() {
+ return graphState;
+ }
+
+ /**
+ * Set the graph state for all workers
+ *
+ * @param graphState Graph state for all workers
+ */
+ void setGraphState(GraphState<I, V, E, M> graphState) {
+ this.graphState = graphState;
+ }
+
+ /**
+ * Get the mapper context
+ *
+ * @return Mapper context
+ */
+ public Mapper.Context getContext() {
+ return getGraphState().getContext();
+ }
+
+ @Override
+ public final <A extends Writable> Aggregator<A> registerAggregator(
+ String name,
+ Class<? extends Aggregator<A>> aggregatorClass)
+ throws InstantiationException, IllegalAccessException {
+ return getGraphState().getGraphMapper().getAggregatorUsage().
+ registerAggregator(name, aggregatorClass);
+ }
+
+ @Override
+ public final Aggregator<? extends Writable> getAggregator(String name) {
+ return getGraphState().getGraphMapper().getAggregatorUsage().
+ getAggregator(name);
+ }
+
+ @Override
+ public final boolean useAggregator(String name) {
+ return getGraphState().getGraphMapper().getAggregatorUsage().
+ useAggregator(name);
+ }
}
Modified: incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/BspService.java
URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/BspService.java?rev=1167420&r1=1167419&r2=1167420&view=diff
==============================================================================
--- incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/BspService.java (original)
+++ incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/BspService.java Sat Sep 10 00:40:21 2011
@@ -18,41 +18,39 @@
package org.apache.giraph.graph;
-import java.io.IOException;
-import java.security.InvalidParameterException;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.NavigableMap;
-import java.util.TreeMap;
-import java.net.InetAddress;
-import java.net.UnknownHostException;
-
-import org.apache.log4j.Logger;
+import org.apache.giraph.bsp.CentralizedService;
+import org.apache.giraph.zk.BspEvent;
+import org.apache.giraph.zk.PredicateLock;
+import org.apache.giraph.zk.ZooKeeperExt;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapreduce.Mapper;
-
+import org.apache.log4j.Logger;
+import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.Watcher.Event.EventType;
import org.apache.zookeeper.Watcher.Event.KeeperState;
-import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.ZooDefs.Ids;
import org.json.JSONArray;
import org.json.JSONException;
import org.json.JSONObject;
-import org.apache.giraph.bsp.CentralizedService;
-import org.apache.giraph.zk.BspEvent;
-import org.apache.giraph.zk.PredicateLock;
-import org.apache.giraph.zk.ZooKeeperExt;
+import java.io.IOException;
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+import java.security.InvalidParameterException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.NavigableMap;
+import java.util.TreeMap;
/**
* Zookeeper-based implementation of {@link CentralizedService}.
@@ -631,7 +629,9 @@ public abstract class BspService <
this.hostnamePartitionId = hostname + "_" + getTaskPartition();
this.representativeVertex =
- BspUtils.<I, V, E, M>createVertex(getConfiguration());
+ BspUtils.<I, V, E, M>createVertex(
+ getConfiguration(),
+ getGraphMapper().getGraphState());
this.checkpointFrequency =
conf.getInt(GiraphJob.CHECKPOINT_FREQUENCY,
@@ -670,7 +670,12 @@ public abstract class BspService <
return jobId;
}
- final public BasicVertex<I, V, E, M> getRepresentativeVertex() {
+ /**
+ * Get the representative vertex
+ *
+ * @return Representative vertex for this service.
+ */
+ final public Vertex<I, V, E, M> getRepresentativeVertex() {
return representativeVertex;
}
Modified: incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/BspServiceWorker.java
URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/BspServiceWorker.java?rev=1167420&r1=1167419&r2=1167420&view=diff
==============================================================================
--- incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/BspServiceWorker.java (original)
+++ incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/BspServiceWorker.java Sat Sep 10 00:40:21 2011
@@ -18,6 +18,28 @@
package org.apache.giraph.graph;
+import net.iharder.Base64;
+import org.apache.giraph.bsp.ApplicationState;
+import org.apache.giraph.bsp.CentralizedServiceWorker;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.log4j.Logger;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher.Event.EventType;
+import org.apache.zookeeper.ZooDefs.Ids;
+import org.apache.zookeeper.data.Stat;
+import org.json.JSONArray;
+import org.json.JSONException;
+import org.json.JSONObject;
+
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
@@ -36,33 +58,6 @@ import java.util.SortedMap;
import java.util.TreeMap;
import java.util.TreeSet;
-import net.iharder.Base64;
-
-import org.json.JSONArray;
-import org.json.JSONException;
-import org.json.JSONObject;
-
-import org.apache.log4j.Logger;
-import org.apache.giraph.bsp.CentralizedServiceWorker;
-import org.apache.hadoop.fs.FSDataOutputStream;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.WritableComparable;
-
-import org.apache.hadoop.mapreduce.InputSplit;
-import org.apache.hadoop.mapreduce.Mapper;
-import org.apache.hadoop.util.ReflectionUtils;
-
-import org.apache.zookeeper.KeeperException;
-import org.apache.zookeeper.CreateMode;
-import org.apache.zookeeper.WatchedEvent;
-import org.apache.zookeeper.Watcher.Event.EventType;
-import org.apache.zookeeper.ZooDefs.Ids;
-import org.apache.zookeeper.data.Stat;
-
-import org.apache.giraph.bsp.ApplicationState;
-
/**
* ZooKeeper-based implementation of {@link CentralizedServiceWorker}.
*/
@@ -387,16 +382,17 @@ public class BspServiceWorker<
* @throws InterruptedException
*/
private List<Vertex<I, V, E, M>> readVerticesFromInputSplit(
- InputSplit inputSplit) throws IOException, InterruptedException {
- List<Vertex<I, V, E, M>> vertexList = new ArrayList<Vertex<I, V, E, M>>();
+ InputSplit inputSplit) throws IOException, InterruptedException {
+ List<Vertex<I, V, E, M>> vertexList =
+ new ArrayList<Vertex<I, V, E, M>>();
VertexInputFormat<I, V, E> vertexInputFormat =
BspUtils.<I, V, E>createVertexInputFormat(getConfiguration());
VertexReader<I, V, E> vertexReader =
vertexInputFormat.createVertexReader(inputSplit, getContext());
vertexReader.initialize(inputSplit, getContext());
Vertex<I, V, E, M> readerVertex =
- BspUtils.<I, V, E, M>createVertex(getConfiguration());
-
+ BspUtils.<I, V, E, M>createVertex(
+ getConfiguration(), getGraphMapper().getGraphState());
while (vertexReader.next(readerVertex)) {
if (readerVertex.getVertexId() == null) {
throw new IllegalArgumentException(
@@ -422,7 +418,8 @@ public class BspServiceWorker<
}
}
vertexList.add(readerVertex);
- readerVertex = BspUtils.<I, V, E, M>createVertex(getConfiguration());
+ readerVertex = BspUtils.<I, V, E, M>createVertex(getConfiguration(),
+ getGraphMapper().getGraphState());
getContext().progress();
}
vertexReader.close();
@@ -510,8 +507,7 @@ public class BspServiceWorker<
}
VertexRange<I, V, E, M> range =
vertexRangeMap.get(currentVertexIndexMax);
- SortedMap<I, BasicVertex<I, V, E, M>> vertexMap =
- range.getVertexMap();
+ SortedMap<I, Vertex<I, V, E, M>> vertexMap = range.getVertexMap();
if (vertexMap.put(vertex.getVertexId(), vertex) != null) {
throw new IllegalStateException(
"loadVertices: Already contains vertex " +
@@ -1179,7 +1175,9 @@ public class BspServiceWorker<
VertexRange<I, V, E, M> vertexRange = getVertexRangeMap().get(maxIndex);
for (int i = 0; i < vertexCount; ++i) {
Vertex<I, V, E, M> vertex =
- BspUtils.<I, V, E, M>createVertex(getConfiguration());
+ BspUtils.<I, V, E, M>createVertex(
+ getConfiguration(),
+ getGraphMapper().getGraphState());
vertex.readFields(dataStream);
// Add the vertex
if (vertexRange.getVertexMap().put(vertex.getVertexId(), vertex)
@@ -1364,7 +1362,7 @@ public class BspServiceWorker<
continue;
}
- SortedMap<I, BasicVertex<I, V, E, M>> vertexMap =
+ SortedMap<I, Vertex<I, V, E, M>> vertexMap =
getVertexRangeMap().get(entry.getKey()).getVertexMap();
if (vertexMap.size() != 0) {
throw new RuntimeException(
@@ -1378,7 +1376,7 @@ public class BspServiceWorker<
entry.getValue().size() +
" vertices for max index " + entry.getKey());
}
- for (BasicVertex<I, V, E, M> vertex : entry.getValue()) {
+ for (Vertex<I, V, E, M> vertex : entry.getValue()) {
if (vertexMap.put(vertex.getVertexId(), vertex) != null) {
throw new IllegalStateException(
"exchangeVertexRanges: Vertex " + vertex +
Modified: incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/BspUtils.java
URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/BspUtils.java?rev=1167420&r1=1167419&r2=1167420&view=diff
==============================================================================
--- incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/BspUtils.java (original)
+++ incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/BspUtils.java Sat Sep 10 00:40:21 2011
@@ -152,7 +152,8 @@ public class BspUtils {
*/
@SuppressWarnings("rawtypes")
public static <I extends WritableComparable, V extends Writable,
- E extends Writable, M extends Writable> VertexRangeBalancer<I, V, E, M>
+ E extends Writable, M extends Writable>
+ VertexRangeBalancer<I, V, E, M>
createVertexRangeBalancer(Configuration conf) {
Class<? extends VertexRangeBalancer<I, V, E, M>>
vertexRangeBalancerClass = getVertexRangeBalancerClass(conf);
@@ -187,10 +188,14 @@ public class BspUtils {
@SuppressWarnings("rawtypes")
public static <I extends WritableComparable, V extends Writable,
E extends Writable, M extends Writable> VertexResolver<I, V, E, M>
- createVertexResolver(Configuration conf) {
+ createVertexResolver(Configuration conf,
+ GraphState<I, V, E, M> graphState) {
Class<? extends VertexResolver<I, V, E, M>> vertexResolverClass =
getVertexResolverClass(conf);
- return ReflectionUtils.newInstance(vertexResolverClass, conf);
+ VertexResolver<I, V, E, M> resolver =
+ ReflectionUtils.newInstance(vertexResolverClass, conf);
+ resolver.setGraphState(graphState);
+ return resolver;
}
/**
@@ -221,10 +226,14 @@ public class BspUtils {
@SuppressWarnings("rawtypes")
public static <I extends WritableComparable, V extends Writable,
E extends Writable, M extends Writable> Vertex<I, V, E, M>
- createVertex(Configuration conf) {
+ createVertex(Configuration conf,
+ GraphState<I, V, E, M> graphState) {
Class<? extends Vertex<I, V, E, M>> vertexClass =
getVertexClass(conf);
- return ReflectionUtils.newInstance(vertexClass, conf);
+ Vertex<I, V, E, M> vertex =
+ ReflectionUtils.newInstance(vertexClass, conf);
+ vertex.setGraphState(graphState);
+ return vertex;
}
/**
Modified: incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/GraphMapper.java
URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/GraphMapper.java?rev=1167420&r1=1167419&r2=1167420&view=diff
==============================================================================
--- incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/GraphMapper.java (original)
+++ incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/GraphMapper.java Sat Sep 10 00:40:21 2011
@@ -18,15 +18,6 @@
package org.apache.giraph.graph;
-import java.io.IOException;
-import java.lang.reflect.Type;
-import java.net.URL;
-import java.net.URLDecoder;
-import java.util.Enumeration;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-
import org.apache.giraph.bsp.CentralizedServiceWorker;
import org.apache.giraph.comm.RPCCommunications;
import org.apache.giraph.comm.ServerInterface;
@@ -39,6 +30,15 @@ import org.apache.hadoop.io.WritableComp
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.log4j.Logger;
+import java.io.IOException;
+import java.lang.reflect.Type;
+import java.net.URL;
+import java.net.URLDecoder;
+import java.util.Enumeration;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
/**
* This mapper that will execute the BSP graph tasks. Since this mapper will
* not be passing data by key-value pairs through the MR framework, the
@@ -66,6 +66,11 @@ public class GraphMapper<I extends Writa
private boolean done = false;
/** What kind of functions is this mapper doing? */
private MapFunctions mapFunctions = MapFunctions.UNKNOWN;
+ /**
+ * Graph state for all vertices that is used for the duration of
+ * this mapper.
+ */
+ private GraphState<I,V,E,M> graphState = new GraphState<I, V, E, M>();
/** What kinds of functions to run on this mapper */
public enum MapFunctions {
@@ -103,6 +108,10 @@ public class GraphMapper<I extends Writa
return serviceWorker;
}
+ public final GraphState<I,V,E,M> getGraphState() {
+ return graphState;
+ }
+
/**
* Default handler for uncaught exceptions.
*/
@@ -380,6 +389,7 @@ public class GraphMapper<I extends Writa
public void setup(Context context)
throws IOException, InterruptedException {
context.setStatus("setup: Beginning mapper setup.");
+ graphState.setContext(context);
// Setting the default handler for uncaught exceptions.
Thread.setDefaultUncaughtExceptionHandler(
new OverrideExceptionHandler());
@@ -391,8 +401,6 @@ public class GraphMapper<I extends Writa
}
// Ensure the user classes have matching types and figure them out
determineClassTypes(conf);
- Vertex.setGraphMapper(this);
- Vertex.setContext(context);
// Do some initial setup (possibly starting up a Zookeeper service)
context.setStatus("setup: Initializing Zookeeper services.");
@@ -506,7 +514,13 @@ public class GraphMapper<I extends Writa
}
mapAlreadyRun = true;
+ graphState.setSuperstep(serviceWorker.getSuperstep()).
+ setContext(context).setGraphMapper(this).
+ setNumEdges(serviceWorker.getTotalEdges()).
+ setNumVertices(serviceWorker.getTotalVertices());
+
try {
+ serviceWorker.getRepresentativeVertex().setGraphState(graphState);
serviceWorker.getRepresentativeVertex().preApplication();
} catch (InstantiationException e) {
LOG.fatal("map: preApplication failed in instantiation", e);
@@ -526,9 +540,14 @@ public class GraphMapper<I extends Writa
do {
long superstep = serviceWorker.getSuperstep();
+ graphState.setSuperstep(superstep)
+ .setNumEdges(serviceWorker.getTotalEdges())
+ .setNumVertices(serviceWorker.getTotalVertices());
+
if (commService != null) {
commService.prepareSuperstep();
}
+
serviceWorker.startSuperstep();
if (zkManager != null && zkManager.runsZooKeeper()) {
if (LOG.isInfoEnabled()) {
@@ -570,10 +589,6 @@ public class GraphMapper<I extends Writa
serviceWorker.exchangeVertexRanges();
context.progress();
- Vertex.setSuperstep(superstep);
- Vertex.setNumVertices(serviceWorker.getTotalVertices());
- Vertex.setNumEdges(serviceWorker.getTotalEdges());
-
serviceWorker.getRepresentativeVertex().preSuperstep();
context.progress();
@@ -591,8 +606,11 @@ public class GraphMapper<I extends Writa
continue;
}
- for (BasicVertex<I, V, E, M> vertex :
+ for (Vertex<I, V, E, M> vertex :
entry.getValue().getVertexMap().values()) {
+ // Make sure every vertex has the current
+ // graphState before computing
+ vertex.setGraphState(graphState);
if (vertex.isHalted() &&
!vertex.getMsgList().isEmpty()) {
Vertex<I, V, E, M> activatedVertex =
Added: incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/GraphState.java
URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/GraphState.java?rev=1167420&view=auto
==============================================================================
--- incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/GraphState.java (added)
+++ incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/GraphState.java Sat Sep 10 00:40:21 2011
@@ -0,0 +1,75 @@
+package org.apache.giraph.graph;
+
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapreduce.Mapper;
+
+/*
+ * Global state of the graph. Should be treated as a singleton (but is kept
+ * as a regular bean to facilitate ease of unit testing)
+ *
+ * @param <I> vertex id
+ * @param <V> vertex data
+ * @param <E> edge data
+ * @param <M> message data
+ */
+@SuppressWarnings("rawtypes")
+public class GraphState<I extends WritableComparable, V extends Writable,
+ E extends Writable, M extends Writable> {
+ /** Graph-wide superstep */
+ private long superstep = 0;
+ /** Graph-wide number of vertices */
+ private long numVertices = -1;
+ /** Graph-wide number of edges */
+ private long numEdges = -1;
+ /** Graph-wide map context */
+ private Mapper.Context context = null;
+ /** Graph-wide BSP Mapper for this Vertex */
+ private GraphMapper<I, V, E, M> graphMapper = null;
+
+ public long getSuperstep() {
+ return superstep;
+ }
+
+ public GraphState<I, V, E, M> setSuperstep(long superstep) {
+ this.superstep = superstep;
+ return this;
+ }
+
+ public long getNumVertices() {
+ return numVertices;
+ }
+
+ public GraphState<I, V, E, M> setNumVertices(long numVertices) {
+ this.numVertices = numVertices;
+ return this;
+ }
+
+ public long getNumEdges() {
+ return numEdges;
+ }
+
+ public GraphState<I, V, E, M> setNumEdges(long numEdges) {
+ this.numEdges = numEdges;
+ return this;
+ }
+
+ public Mapper.Context getContext() {
+ return context;
+ }
+
+ public GraphState<I, V , E ,M> setContext(Mapper.Context context) {
+ this.context = context;
+ return this;
+ }
+
+ public GraphMapper<I, V, E, M> getGraphMapper() {
+ return graphMapper;
+ }
+
+ public GraphState<I, V, E, M> setGraphMapper(
+ GraphMapper<I, V, E, M> graphMapper) {
+ this.graphMapper = graphMapper;
+ return this;
+ }
+}
Modified: incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/MutableVertex.java
URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/MutableVertex.java?rev=1167420&r1=1167419&r2=1167420&view=diff
==============================================================================
--- incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/MutableVertex.java (original)
+++ incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/MutableVertex.java Sat Sep 10 00:40:21 2011
@@ -28,17 +28,15 @@ import org.apache.hadoop.io.WritableComp
* or mutate the graph.
*/
@SuppressWarnings("rawtypes")
-public interface MutableVertex<I extends WritableComparable,
- V extends Writable,
- E extends Writable,
- M extends Writable>
- extends BasicVertex<I, V, E, M>, Writable {
+public abstract class MutableVertex<I extends WritableComparable,
+ V extends Writable, E extends Writable, M extends Writable>
+ extends BasicVertex<I, V, E, M> implements Writable {
/**
* Set the vertex id
*
* @param id Vertex id is set to this (instantiated by the user)
*/
- void setVertexId(I id);
+ public abstract void setVertexId(I id);
/**
* Add an edge for this vertex (happens immediately)
@@ -46,7 +44,7 @@ public interface MutableVertex<I extends
* @param edge Edge to be added
* @return Return true if succeeded, false otherwise
*/
- boolean addEdge(Edge<I, E> edge);
+ public abstract boolean addEdge(Edge<I, E> edge);
/**
* Create a vertex for use in addVertexRequest(). Still need to get the
@@ -54,7 +52,12 @@ public interface MutableVertex<I extends
*
* @return Created vertex for addVertexRequest.
*/
- MutableVertex<I, V, E, M> instantiateVertex();
+ public MutableVertex<I, V, E, M> instantiateVertex() {
+ Vertex<I, V, E, M> mutableVertex =
+ BspUtils.createVertex(getContext().getConfiguration(),
+ getGraphState());
+ return mutableVertex;
+ }
/**
* Sends a request to create a vertex that will be available during the
@@ -62,7 +65,11 @@ public interface MutableVertex<I extends
*
* @param vertex User created vertex
*/
- void addVertexRequest(MutableVertex<I, V, E, M> vertex) throws IOException;
+ public void addVertexRequest(MutableVertex<I, V, E, M> vertex)
+ throws IOException {
+ getGraphState().getGraphMapper().getWorkerCommunications().
+ addVertexReq(vertex);
+}
/**
* Request to remove a vertex from the graph
@@ -70,7 +77,10 @@ public interface MutableVertex<I extends
*
* @param vertexId Id of the vertex to be removed.
*/
- void removeVertexRequest(I vertexId) throws IOException;
+ public void removeVertexRequest(I vertexId) throws IOException {
+ getGraphState().getGraphMapper().getWorkerCommunications().
+ removeVertexReq(vertexId);
+ }
/**
* Request to add an edge of a vertex in the graph
@@ -79,7 +89,11 @@ public interface MutableVertex<I extends
* @param sourceVertexId Source vertex id of edge
* @param edge Edge to add
*/
- void addEdgeRequest(I sourceVertexId, Edge<I, E> edge) throws IOException;
+ public void addEdgeRequest(I sourceVertexId, Edge<I, E> edge)
+ throws IOException {
+ getGraphState().getGraphMapper().getWorkerCommunications().
+ addEdgeReq(sourceVertexId, edge);
+ }
/**
* Request to remove an edge of a vertex from the graph
@@ -88,5 +102,9 @@ public interface MutableVertex<I extends
* @param sourceVertexId Source vertex id of edge
* @param destVertexId Destination vertex id of edge
*/
- void removeEdgeRequest(I sourceVertexId, I destVertexId) throws IOException;
+ public void removeEdgeRequest(I sourceVertexId, I destVertexId)
+ throws IOException {
+ getGraphState().getGraphMapper().getWorkerCommunications().
+ removeEdgeReq(sourceVertexId, destVertexId);
+ }
}
Modified: incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/Vertex.java
URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/Vertex.java?rev=1167420&r1=1167419&r2=1167420&view=diff
==============================================================================
--- incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/Vertex.java (original)
+++ incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/Vertex.java Sat Sep 10 00:40:21 2011
@@ -18,6 +18,10 @@
package org.apache.giraph.graph;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.log4j.Logger;
+
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
@@ -26,13 +30,6 @@ import java.util.List;
import java.util.SortedMap;
import java.util.TreeMap;
-import org.apache.log4j.Logger;
-
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.WritableComparable;
-import org.apache.hadoop.mapreduce.Mapper;
-
-
/**
* User applications should all subclass {@link Vertex}. Package access
* should prevent users from accessing internal methods.
@@ -43,24 +40,11 @@ import org.apache.hadoop.mapreduce.Mappe
* @param <M> Message value
*/
@SuppressWarnings("rawtypes")
-public abstract class Vertex<
- I extends WritableComparable,
- V extends Writable,
- E extends Writable,
- M extends Writable>
- implements MutableVertex<I, V, E, M> {
+public abstract class Vertex<I extends WritableComparable, V extends Writable,
+ E extends Writable, M extends Writable>
+ extends MutableVertex<I, V, E, M> {
/** Class logger */
private static final Logger LOG = Logger.getLogger(Vertex.class);
- /** Class-wide superstep */
- private static long superstep = 0;
- /** Class-wide number of vertices */
- private static long numVertices = -1;
- /** Class-wide number of edges */
- private static long numEdges = -1;
- /** Class-wide map context */
- private static Mapper.Context context = null;
- /** Class-wide BSP Mapper for this Vertex */
- private static GraphMapper<?, ? ,?, ?> graphMapper = null;
/** Vertex id */
private I vertexId = null;
/** Vertex value */
@@ -119,32 +103,6 @@ public abstract class Vertex<
return vertexId;
}
- /**
- * Set the GraphMapper for this vertex (internal use).
- *
- * @param graphMapper Mapper to use for communication
- */
- final static <I extends WritableComparable,
- V extends Writable, E extends Writable,
- M extends Writable> void
- setGraphMapper(GraphMapper<I, V, E, M> graphMapper) {
- Vertex.graphMapper = graphMapper;
- }
-
- /**
- * Set the global superstep for all the vertices (internal use)
- *
- * @param superstep New superstep
- */
- static void setSuperstep(long superstep) {
- Vertex.superstep = superstep;
- }
-
- @Override
- public final long getSuperstep() {
- return superstep;
- }
-
@Override
public final V getVertexValue() {
return vertexValue;
@@ -155,50 +113,11 @@ public abstract class Vertex<
this.vertexValue = vertexValue;
}
- /**
- * Set the total number of vertices from the last superstep.
- *
- * @param numVertices Aggregate vertices in the last superstep
- */
- static void setNumVertices(long numVertices) {
- Vertex.numVertices = numVertices;
- }
-
- @Override
- public final long getNumVertices() {
- return numVertices;
- }
-
- /**
- * Set the total number of edges from the last superstep.
- *
- * @param numEdges Aggregate edges in the last superstep
- */
- static void setNumEdges(long numEdges) {
- Vertex.numEdges = numEdges;
- }
-
- @Override
- public final long getNumEdges() {
- return numEdges;
- }
-
@Override
public final SortedMap<I, Edge<I, E>> getOutEdgeMap() {
return destEdgeMap;
}
- @SuppressWarnings("unchecked")
- @Override
- public final void sendMsg(I id, M msg) {
- if (msg == null) {
- throw new IllegalArgumentException(
- "sendMsg: Cannot send null message to " + id);
- }
- ((GraphMapper<I, V, E, M>) graphMapper).
- getWorkerCommunications().sendMessageReq(id, msg);
- }
-
@Override
public final void sendMsgToAllEdges(M msg) {
if (msg == null) {
@@ -211,45 +130,6 @@ public abstract class Vertex<
}
@Override
- public MutableVertex<I, V, E, M> instantiateVertex() {
- Vertex<I, V, E, M> mutableVertex =
- BspUtils.<I, V, E, M>createVertex(getContext().getConfiguration());
- return mutableVertex;
- }
-
- @SuppressWarnings("unchecked")
- @Override
- public void addVertexRequest(MutableVertex<I, V, E, M> vertex)
- throws IOException {
- ((GraphMapper<I, V, E, M>) graphMapper).
- getWorkerCommunications().addVertexReq(vertex);
- }
-
- @SuppressWarnings("unchecked")
- @Override
- public void removeVertexRequest(I vertexId) throws IOException {
- ((GraphMapper<I, V, E, M>) graphMapper).
- getWorkerCommunications().removeVertexReq(vertexId);
- }
-
- @SuppressWarnings("unchecked")
- @Override
- public void addEdgeRequest(I vertexIndex,
- Edge<I, E> edge) throws IOException {
- ((GraphMapper<I, V, E, M>) graphMapper).
- getWorkerCommunications().addEdgeReq(vertexIndex, edge);
- }
-
- @SuppressWarnings("unchecked")
- @Override
- public void removeEdgeRequest(I sourceVertexId,
- I destVertexId) throws IOException {
- ((GraphMapper<I, V, E, M>) graphMapper).
- getWorkerCommunications().removeEdgeReq(sourceVertexId,
- destVertexId);
- }
-
- @Override
public final void voteToHalt() {
halt = true;
}
@@ -262,12 +142,12 @@ public abstract class Vertex<
@Override
final public void readFields(DataInput in) throws IOException {
vertexId =
- BspUtils.<I>createVertexIndex(getContext().getConfiguration());
+ BspUtils.createVertexIndex(getContext().getConfiguration());
vertexId.readFields(in);
boolean hasVertexValue = in.readBoolean();
if (hasVertexValue) {
vertexValue =
- BspUtils.<V>createVertexValue(getContext().getConfiguration());
+ BspUtils.createVertexValue(getContext().getConfiguration());
vertexValue.readFields(in);
}
long edgeMapSize = in.readLong();
@@ -280,7 +160,7 @@ public abstract class Vertex<
long msgListSize = in.readLong();
for (long i = 0; i < msgListSize; ++i) {
M msg =
- BspUtils.<M>createMessageValue(getContext().getConfiguration());
+ BspUtils.createMessageValue(getContext().getConfiguration());
msg.readFields(in);
msgList.add(msg);
}
@@ -306,37 +186,10 @@ public abstract class Vertex<
}
@Override
- public final <A extends Writable> Aggregator<A> registerAggregator(
- String name,
- Class<? extends Aggregator<A>> aggregatorClass)
- throws InstantiationException, IllegalAccessException {
- return graphMapper.getAggregatorUsage().registerAggregator(
- name, aggregatorClass);
- }
-
- @Override
- public final Aggregator<? extends Writable> getAggregator(String name) {
- return graphMapper.getAggregatorUsage().getAggregator(name);
- }
-
- @Override
- public final boolean useAggregator(String name) {
- return graphMapper.getAggregatorUsage().useAggregator(name);
- }
-
- @Override
public List<M> getMsgList() {
return msgList;
}
- public final Mapper<?, ?, ?, ?>.Context getContext() {
- return context;
- }
-
- final static void setContext(Mapper<?, ?, ?, ?>.Context context) {
- Vertex.context = context;
- }
-
@Override
public String toString() {
return "Vertex(id=" + getVertexId() + ",value=" + getVertexValue() +
Modified: incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/VertexRange.java
URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/VertexRange.java?rev=1167420&r1=1167419&r2=1167420&view=diff
==============================================================================
--- incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/VertexRange.java (original)
+++ incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/VertexRange.java Sat Sep 10 00:40:21 2011
@@ -68,8 +68,8 @@ public class VertexRange<I extends Writa
/** Checkpoint file prefix (null if not recovering from a checkpoint) */
private String checkpointfilePrefix = null;
/** Vertex map for this range (keyed by index) */
- private final SortedMap<I, BasicVertex<I, V, E, M>> vertexMap =
- new TreeMap<I, BasicVertex<I, V, E, M>>();
+ private final SortedMap<I, Vertex<I, V, E, M>> vertexMap =
+ new TreeMap<I, Vertex<I, V, E, M>>();
/** Class logger */
private static final Logger LOG = Logger.getLogger(VertexRange.class);
@@ -207,7 +207,7 @@ public class VertexRange<I extends Writa
*
* @return Map of vertices (keyed by index)
*/
- public SortedMap<I, BasicVertex<I, V, E, M>> getVertexMap() {
+ public SortedMap<I, Vertex<I, V, E, M>> getVertexMap() {
return vertexMap;
}
Modified: incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/VertexResolver.java
URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/VertexResolver.java?rev=1167420&r1=1167419&r2=1167420&view=diff
==============================================================================
--- incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/VertexResolver.java (original)
+++ incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/VertexResolver.java Sat Sep 10 00:40:21 2011
@@ -18,14 +18,14 @@
package org.apache.giraph.graph;
-import java.util.List;
-
import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.log4j.Logger;
+import java.util.List;
+
/**
* Default implementation of how to resolve vertex creation/removal, messages
* to nonexistent vertices, etc.
@@ -41,6 +41,9 @@ public class VertexResolver<I extends Wr
implements BasicVertexResolver<I, V, E, M>, Configurable {
/** Configuration */
private Configuration conf = null;
+
+ private GraphState<I,V,E,M> graphState;
+
/** Class logger */
private static final Logger LOG = Logger.getLogger(VertexResolver.class);
@@ -107,7 +110,7 @@ public class VertexResolver<I extends Wr
@Override
public MutableVertex<I, V, E, M> instantiateVertex() {
- return BspUtils.<I, V, E, M>createVertex(getConf());
+ return BspUtils.<I, V, E, M>createVertex(getConf(), graphState);
}
@Override
@@ -119,4 +122,8 @@ public class VertexResolver<I extends Wr
public void setConf(Configuration conf) {
this.conf = conf;
}
+
+ public void setGraphState(GraphState<I, V, E, M> graphState) {
+ this.graphState = graphState;
+ }
}
Modified: incubator/giraph/trunk/src/test/java/org/apache/giraph/TestBspBasic.java
URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/test/java/org/apache/giraph/TestBspBasic.java?rev=1167420&r1=1167419&r2=1167420&view=diff
==============================================================================
--- incubator/giraph/trunk/src/test/java/org/apache/giraph/TestBspBasic.java (original)
+++ incubator/giraph/trunk/src/test/java/org/apache/giraph/TestBspBasic.java Sat Sep 10 00:40:21 2011
@@ -18,35 +18,41 @@
package org.apache.giraph;
-import java.io.ByteArrayOutputStream;
-import java.io.DataOutputStream;
-import java.io.IOException;
-import java.lang.reflect.InvocationTargetException;
-
-import java.util.List;
-
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.mapreduce.InputSplit;
-import org.apache.hadoop.mapreduce.JobContext;
-import org.apache.hadoop.mapreduce.JobID;
+import junit.framework.Test;
+import junit.framework.TestSuite;
import org.apache.giraph.examples.GeneratedVertexReader;
import org.apache.giraph.examples.SimpleCombinerVertex;
import org.apache.giraph.examples.SimpleFailVertex;
import org.apache.giraph.examples.SimpleMsgVertex;
import org.apache.giraph.examples.SimplePageRankVertex;
-import org.apache.giraph.examples.SimpleShortestPathsVertex;
import org.apache.giraph.examples.SimplePageRankVertex.SimplePageRankVertexInputFormat;
+import org.apache.giraph.examples.SimpleShortestPathsVertex;
import org.apache.giraph.examples.SimpleShortestPathsVertex.SimpleShortestPathsVertexOutputFormat;
import org.apache.giraph.examples.SimpleSumCombiner;
import org.apache.giraph.examples.SimpleSuperstepVertex;
import org.apache.giraph.examples.SimpleSuperstepVertex.SimpleSuperstepVertexInputFormat;
import org.apache.giraph.examples.SimpleSuperstepVertex.SimpleSuperstepVertexOutputFormat;
+import org.apache.giraph.graph.BspUtils;
import org.apache.giraph.graph.GiraphJob;
-import junit.framework.Test;
-import junit.framework.TestSuite;
+import org.apache.giraph.graph.GraphState;
+import org.apache.giraph.graph.Vertex;
+import org.apache.giraph.graph.VertexInputFormat;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.FloatWritable;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.JobID;
+
+import java.io.ByteArrayOutputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.lang.reflect.InvocationTargetException;
+import java.util.List;
/**
* Unit test for many simple BSP applications.
@@ -87,16 +93,22 @@ public class TestBspBasic extends BspCas
InvocationTargetException, SecurityException, NoSuchMethodException {
System.out.println("testInstantiateVertex: java.class.path=" +
System.getProperty("java.class.path"));
- java.lang.reflect.Constructor<?> ctor =
- SimpleSuperstepVertex.class.getConstructor();
- assertNotNull(ctor);
- SimpleSuperstepVertex test =
- (SimpleSuperstepVertex) ctor.newInstance();
+ GiraphJob job = new GiraphJob(getCallingMethodName());
+ job.setVertexClass(SimpleSuperstepVertex.class);
+ job.setVertexInputFormatClass(
+ SimpleSuperstepVertex.SimpleSuperstepVertexInputFormat.class);
+ GraphState<LongWritable, IntWritable, FloatWritable, IntWritable> gs =
+ new GraphState<LongWritable, IntWritable,
+ FloatWritable, IntWritable>();
+ Vertex<LongWritable, IntWritable, FloatWritable, IntWritable> vertex =
+ BspUtils.<LongWritable, IntWritable, FloatWritable, IntWritable>
+ createVertex(job.getConfiguration(), gs);
System.out.println("testInstantiateVertex: superstep=" +
- test.getSuperstep());
- SimpleSuperstepVertex.SimpleSuperstepVertexInputFormat inputFormat =
- SimpleSuperstepVertex.SimpleSuperstepVertexInputFormat
- .class.newInstance();
+ vertex.getSuperstep());
+ VertexInputFormat<LongWritable, IntWritable, FloatWritable>
+ inputFormat =
+ BspUtils.<LongWritable, IntWritable, FloatWritable>
+ createVertexInputFormat(job.getConfiguration());
List<InputSplit> splitArray =
inputFormat.getSplits(
new JobContext(new Configuration(), new JobID()), 1);