You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@giraph.apache.org by ac...@apache.org on 2012/08/25 03:52:53 UTC
svn commit: r1377179 - in /giraph/trunk: ./
src/main/java/org/apache/giraph/bsp/ src/main/java/org/apache/giraph/comm/
src/main/java/org/apache/giraph/graph/
src/main/java/org/apache/giraph/graph/partition/
src/test/java/org/apache/giraph/ src/test/jav...
Author: aching
Date: Sat Aug 25 01:52:52 2012
New Revision: 1377179
URL: http://svn.apache.org/viewvc?rev=1377179&view=rev
Log:
GIRAPH-249: Move part of the graph out-of-core when memory is low
(apresta via aching).
Added:
giraph/trunk/src/main/java/org/apache/giraph/graph/partition/DiskBackedPartitionStore.java
giraph/trunk/src/main/java/org/apache/giraph/graph/partition/PartitionStore.java
giraph/trunk/src/main/java/org/apache/giraph/graph/partition/SimplePartitionStore.java
giraph/trunk/src/test/java/org/apache/giraph/TestMutateGraph.java
- copied, changed from r1375824, giraph/trunk/src/test/java/org/apache/giraph/TestMutateGraphVertex.java
giraph/trunk/src/test/java/org/apache/giraph/graph/partition/
giraph/trunk/src/test/java/org/apache/giraph/graph/partition/TestPartitionStores.java
Removed:
giraph/trunk/src/test/java/org/apache/giraph/TestMutateGraphVertex.java
Modified:
giraph/trunk/CHANGELOG
giraph/trunk/src/main/java/org/apache/giraph/bsp/CentralizedServiceWorker.java
giraph/trunk/src/main/java/org/apache/giraph/comm/BasicRPCCommunications.java
giraph/trunk/src/main/java/org/apache/giraph/comm/NettyWorkerClientServer.java
giraph/trunk/src/main/java/org/apache/giraph/comm/NettyWorkerServer.java
giraph/trunk/src/main/java/org/apache/giraph/comm/SendVertexRequest.java
giraph/trunk/src/main/java/org/apache/giraph/comm/ServerData.java
giraph/trunk/src/main/java/org/apache/giraph/comm/WorkerServer.java
giraph/trunk/src/main/java/org/apache/giraph/graph/BspServiceWorker.java
giraph/trunk/src/main/java/org/apache/giraph/graph/GiraphJob.java
giraph/trunk/src/main/java/org/apache/giraph/graph/GraphMapper.java
giraph/trunk/src/main/java/org/apache/giraph/graph/partition/HashWorkerPartitioner.java
giraph/trunk/src/main/java/org/apache/giraph/graph/partition/Partition.java
giraph/trunk/src/main/java/org/apache/giraph/graph/partition/WorkerGraphPartitioner.java
giraph/trunk/src/test/java/org/apache/giraph/comm/ConnectionTest.java
giraph/trunk/src/test/java/org/apache/giraph/comm/RequestFailureTest.java
giraph/trunk/src/test/java/org/apache/giraph/comm/RequestTest.java
Modified: giraph/trunk/CHANGELOG
URL: http://svn.apache.org/viewvc/giraph/trunk/CHANGELOG?rev=1377179&r1=1377178&r2=1377179&view=diff
==============================================================================
--- giraph/trunk/CHANGELOG (original)
+++ giraph/trunk/CHANGELOG Sat Aug 25 01:52:52 2012
@@ -2,6 +2,9 @@ Giraph Change Log
Release 0.2.0 - unreleased
+ GIRAPH-249: Move part of the graph out-of-core when memory is low
+ (apresta via aching).
+
GIRAPH-306: Netty requests should be reliable and implement exactly
once semantics. (aching)
Modified: giraph/trunk/src/main/java/org/apache/giraph/bsp/CentralizedServiceWorker.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/bsp/CentralizedServiceWorker.java?rev=1377179&r1=1377178&r2=1377179&view=diff
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/bsp/CentralizedServiceWorker.java (original)
+++ giraph/trunk/src/main/java/org/apache/giraph/bsp/CentralizedServiceWorker.java Sat Aug 25 01:52:52 2012
@@ -21,10 +21,10 @@ package org.apache.giraph.bsp;
import java.io.IOException;
import java.util.Collection;
import java.util.List;
-import java.util.Map;
import org.apache.giraph.comm.ServerData;
import org.apache.giraph.graph.WorkerAggregatorUsage;
+import org.apache.giraph.graph.partition.PartitionStore;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
@@ -63,14 +63,14 @@ public interface CentralizedServiceWorke
WorkerContext getWorkerContext();
/**
- * Get a map of the partition id to the partition for this worker.
+ * Get the partition store for this worker.
* The partitions contain the vertices for
* this worker and can be used to run compute() for the vertices or do
* checkpointing.
*
- * @return List of partitions that this worker owns.
+ * @return The partition store for this worker.
*/
- Map<Integer, Partition<I, V, E, M>> getPartitionMap();
+ PartitionStore<I, V, E, M> getPartitionStore();
/**
* Get a collection of all the partition owners.
@@ -115,15 +115,31 @@ public interface CentralizedServiceWorke
boolean finishSuperstep(List<PartitionStats> partitionStatsList);
/**
- * Get the partition that a vertex index would belong to
+ * Get the partition that a vertex id would belong to.
*
- * @param vertexId Index of the vertex that is used to find the correct
+ * @param vertexId Id of the vertex that is used to find the correct
* partition.
* @return Correct partition if exists on this worker, null otherwise.
*/
Partition<I, V, E, M> getPartition(I vertexId);
/**
+ * Get the partition id that a vertex id would belong to.
+ *
+ * @param vertexId Vertex id
+ * @return Partition id
+ */
+ Integer getPartitionId(I vertexId);
+
+ /**
+ * Whether a partition with given id exists on this worker.
+ *
+ * @param partitionId Partition id
+ * @return True iff this worker has the specified partition
+ */
+ boolean hasPartition(Integer partitionId);
+
+ /**
* Every client will need to get a partition owner from a vertex id so that
* they know which worker to sent the request to.
*
Modified: giraph/trunk/src/main/java/org/apache/giraph/comm/BasicRPCCommunications.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/comm/BasicRPCCommunications.java?rev=1377179&r1=1377178&r2=1377179&view=diff
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/comm/BasicRPCCommunications.java (original)
+++ giraph/trunk/src/main/java/org/apache/giraph/comm/BasicRPCCommunications.java Sat Aug 25 01:52:52 2012
@@ -39,7 +39,6 @@ import org.apache.hadoop.mapreduce.Mappe
import org.apache.log4j.Logger;
import com.google.common.collect.Iterables;
-import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import java.io.IOException;
@@ -812,19 +811,7 @@ public abstract class BasicRPCCommunicat
LOG.debug("putVertexList: On partition id " + partitionId +
" adding vertex list of size " + vertexList.size());
}
- synchronized (inPartitionVertexMap) {
- if (vertexList.size() == 0) {
- return;
- }
- if (!inPartitionVertexMap.containsKey(partitionId)) {
- inPartitionVertexMap.put(partitionId,
- Lists.newArrayList(vertexList));
- } else {
- Collection<Vertex<I, V, E, M>> tmpVertices =
- inPartitionVertexMap.get(partitionId);
- tmpVertices.addAll(vertexList);
- }
- }
+ service.getPartitionStore().addPartitionVertices(partitionId, vertexList);
}
@Override
@@ -1170,7 +1157,7 @@ public abstract class BasicRPCCommunicat
// Assign the messages to each destination vertex (getting rid of
// the old ones)
for (Partition<I, V, E, M> partition :
- service.getPartitionMap().values()) {
+ service.getPartitionStore().getPartitions()) {
for (Vertex<I, V, E, M> vertex : partition.getVertices()) {
List<M> msgList = inMessages.get(vertex.getId());
if (msgList != null) {
@@ -1263,7 +1250,7 @@ public abstract class BasicRPCCommunicat
if (partition == null) {
throw new IllegalStateException(
"prepareSuperstep: No partition for index " + vertexIndex +
- " in " + service.getPartitionMap() + " should have been " +
+ " in " + service.getPartitionStore() + " should have been " +
service.getVertexPartitionOwner(vertexIndex));
}
if (vertex != null) {
@@ -1315,10 +1302,4 @@ public abstract class BasicRPCCommunicat
public String getName() {
return myName;
}
-
- @Override
- public Map<Integer, Collection<Vertex<I, V, E, M>>>
- getInPartitionVertexMap() {
- return inPartitionVertexMap;
- }
}
Modified: giraph/trunk/src/main/java/org/apache/giraph/comm/NettyWorkerClientServer.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/comm/NettyWorkerClientServer.java?rev=1377179&r1=1377178&r2=1377179&view=diff
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/comm/NettyWorkerClientServer.java (original)
+++ giraph/trunk/src/main/java/org/apache/giraph/comm/NettyWorkerClientServer.java Sat Aug 25 01:52:52 2012
@@ -28,8 +28,6 @@ import org.apache.hadoop.io.WritableComp
import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;
-import java.util.Collection;
-import java.util.Map;
/**
* Netty based implementation of the {@link WorkerClientServer} interface.
@@ -126,12 +124,6 @@ public class NettyWorkerClientServer<I e
}
@Override
- public Map<Integer, Collection<Vertex<I, V, E, M>>>
- getInPartitionVertexMap() {
- return server.getInPartitionVertexMap();
- }
-
- @Override
public ServerData<I, V, E, M> getServerData() {
return server.getServerData();
}
Modified: giraph/trunk/src/main/java/org/apache/giraph/comm/NettyWorkerServer.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/comm/NettyWorkerServer.java?rev=1377179&r1=1377178&r2=1377179&view=diff
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/comm/NettyWorkerServer.java (original)
+++ giraph/trunk/src/main/java/org/apache/giraph/comm/NettyWorkerServer.java Sat Aug 25 01:52:52 2012
@@ -20,8 +20,8 @@ package org.apache.giraph.comm;
import org.apache.giraph.bsp.CentralizedServiceWorker;
import org.apache.giraph.comm.messages.BasicMessageStore;
-import org.apache.giraph.comm.messages.DiskBackedMessageStoreByPartition;
import org.apache.giraph.comm.messages.DiskBackedMessageStore;
+import org.apache.giraph.comm.messages.DiskBackedMessageStoreByPartition;
import org.apache.giraph.comm.messages.FlushableMessageStore;
import org.apache.giraph.comm.messages.MessageStoreByPartition;
import org.apache.giraph.comm.messages.MessageStoreFactory;
@@ -40,8 +40,6 @@ import org.apache.log4j.Logger;
import com.google.common.collect.Sets;
-import java.util.Collection;
-import java.util.Map;
import java.util.Set;
/**
@@ -85,7 +83,7 @@ public class NettyWorkerServer<I extends
GiraphJob.USE_OUT_OF_CORE_MESSAGES_DEFAULT);
if (!useOutOfCoreMessaging) {
serverData = new ServerData<I, V, E, M>(
- SimpleMessageStore.newFactory(service, conf));
+ conf, SimpleMessageStore.newFactory(service, conf));
} else {
int maxMessagesInMemory = conf.getInt(GiraphJob.MAX_MESSAGES_IN_MEMORY,
GiraphJob.MAX_MESSAGES_IN_MEMORY_DEFAULT);
@@ -97,7 +95,7 @@ public class NettyWorkerServer<I extends
MessageStoreFactory<I, M, MessageStoreByPartition<I, M>>
storeFactory = DiskBackedMessageStoreByPartition.newFactory(service,
maxMessagesInMemory, partitionStoreFactory);
- serverData = new ServerData<I, V, E, M>(storeFactory);
+ serverData = new ServerData<I, V, E, M>(conf, storeFactory);
}
nettyServer = new NettyServer<I, V, E, M>(conf, serverData);
@@ -115,18 +113,16 @@ public class NettyWorkerServer<I extends
Set<I> resolveVertexIndexSet = Sets.newHashSet();
// Keep track of the vertices which are not here but have received messages
- for (I vertexId :
- serverData.getCurrentMessageStore().getDestinationVertices()) {
- Vertex<I, V, E, M> vertex = service.getVertex(vertexId);
- if (vertex == null) {
- if (service.getPartition(vertexId) == null) {
- throw new IllegalStateException(
- "prepareSuperstep: No partition for vertex index " + vertexId);
- }
- if (!resolveVertexIndexSet.add(vertexId)) {
- throw new IllegalStateException(
- "prepareSuperstep: Already has missing vertex on this " +
- "worker for " + vertexId);
+ for (Integer partitionId : service.getPartitionStore().getPartitionIds()) {
+ for (I vertexId : serverData.getCurrentMessageStore().
+ getPartitionDestinationVertices(partitionId)) {
+ Vertex<I, V, E, M> vertex = service.getVertex(vertexId);
+ if (vertex == null) {
+ if (!resolveVertexIndexSet.add(vertexId)) {
+ throw new IllegalStateException(
+ "prepareSuperstep: Already has missing vertex on this " +
+ "worker for " + vertexId);
+ }
}
}
}
@@ -175,7 +171,7 @@ public class NettyWorkerServer<I extends
if (partition == null) {
throw new IllegalStateException(
"prepareSuperstep: No partition for index " + vertexIndex +
- " in " + service.getPartitionMap() + " should have been " +
+ " in " + service.getPartitionStore() + " should have been " +
service.getVertexPartitionOwner(vertexIndex));
}
if (vertex != null) {
@@ -193,12 +189,6 @@ public class NettyWorkerServer<I extends
}
@Override
- public Map<Integer, Collection<Vertex<I, V, E, M>>>
- getInPartitionVertexMap() {
- return serverData.getPartitionVertexMap();
- }
-
- @Override
public ServerData<I, V, E, M> getServerData() {
return serverData;
}
Modified: giraph/trunk/src/main/java/org/apache/giraph/comm/SendVertexRequest.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/comm/SendVertexRequest.java?rev=1377179&r1=1377178&r2=1377179&view=diff
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/comm/SendVertexRequest.java (original)
+++ giraph/trunk/src/main/java/org/apache/giraph/comm/SendVertexRequest.java Sat Aug 25 01:52:52 2012
@@ -18,21 +18,20 @@
package org.apache.giraph.comm;
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-import java.util.Collection;
-import java.util.concurrent.ConcurrentHashMap;
-
import org.apache.giraph.comm.RequestRegistry.Type;
-import org.apache.giraph.graph.Vertex;
import org.apache.giraph.graph.BspUtils;
+import org.apache.giraph.graph.Vertex;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.log4j.Logger;
import com.google.common.collect.Lists;
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.Collection;
+
/**
* Send a collection of vertices for a partition.
*
@@ -98,25 +97,12 @@ public class SendVertexRequest<I extends
@Override
public void doRequest(ServerData<I, V, E, M> serverData) {
- ConcurrentHashMap<Integer, Collection<Vertex<I, V, E, M>>>
- partitionVertexMap = serverData.getPartitionVertexMap();
if (vertices.isEmpty()) {
LOG.warn("doRequest: Got an empty request!");
return;
}
- Collection<Vertex<I, V, E, M>> vertexMap =
- partitionVertexMap.get(partitionId);
- if (vertexMap == null) {
- final Collection<Vertex<I, V, E, M>> tmpVertices =
- Lists.newArrayListWithCapacity(vertices.size());
- vertexMap = partitionVertexMap.putIfAbsent(partitionId, tmpVertices);
- if (vertexMap == null) {
- vertexMap = tmpVertices;
- }
- }
- synchronized (vertexMap) {
- vertexMap.addAll(vertices);
- }
+ serverData.getPartitionStore().addPartitionVertices(partitionId,
+ vertices);
}
}
Modified: giraph/trunk/src/main/java/org/apache/giraph/comm/ServerData.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/comm/ServerData.java?rev=1377179&r1=1377178&r2=1377179&view=diff
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/comm/ServerData.java (original)
+++ giraph/trunk/src/main/java/org/apache/giraph/comm/ServerData.java Sat Aug 25 01:52:52 2012
@@ -20,13 +20,16 @@ package org.apache.giraph.comm;
import org.apache.giraph.comm.messages.MessageStoreByPartition;
import org.apache.giraph.comm.messages.MessageStoreFactory;
-import org.apache.giraph.graph.Vertex;
+import org.apache.giraph.graph.GiraphJob;
import org.apache.giraph.graph.VertexMutations;
+import org.apache.giraph.graph.partition.DiskBackedPartitionStore;
+import org.apache.giraph.graph.partition.PartitionStore;
+import org.apache.giraph.graph.partition.SimplePartitionStore;
+import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import java.io.IOException;
-import java.util.Collection;
import java.util.concurrent.ConcurrentHashMap;
/**
@@ -40,14 +43,8 @@ import java.util.concurrent.ConcurrentHa
@SuppressWarnings("rawtypes")
public class ServerData<I extends WritableComparable,
V extends Writable, E extends Writable, M extends Writable> {
- /**
- * Map of partition ids to incoming vertices from other workers.
- * (Synchronized on values)
- */
- private final ConcurrentHashMap<Integer, Collection<Vertex<I, V, E, M>>>
- inPartitionVertexMap =
- new ConcurrentHashMap<Integer, Collection<Vertex<I, V, E, M>>>();
-
+ /** Partition store for this worker. */
+ private volatile PartitionStore<I, V, E, M> partitionStore;
/** Message store factory */
private final
MessageStoreFactory<I, M, MessageStoreByPartition<I, M>> messageStoreFactory;
@@ -68,23 +65,34 @@ public class ServerData<I extends Writab
private final ConcurrentHashMap<I, VertexMutations<I, V, E, M>>
vertexMutations = new ConcurrentHashMap<I, VertexMutations<I, V, E, M>>();
- /** @param messageStoreFactory Factory for message stores */
- public ServerData(MessageStoreFactory<I, M, MessageStoreByPartition<I, M>>
- messageStoreFactory) {
+ /**
+ * Constructor.
+ *
+ * @param configuration Configuration
+ * @param messageStoreFactory Factory for message stores
+ */
+ public ServerData(Configuration configuration,
+ MessageStoreFactory<I, M, MessageStoreByPartition<I, M>>
+ messageStoreFactory) {
this.messageStoreFactory = messageStoreFactory;
currentMessageStore = messageStoreFactory.newStore();
incomingMessageStore = messageStoreFactory.newStore();
+ if (configuration.getBoolean(GiraphJob.USE_OUT_OF_CORE_GRAPH,
+ GiraphJob.USE_OUT_OF_CORE_GRAPH_DEFAULT)) {
+ partitionStore = new DiskBackedPartitionStore<I, V, E, M>(configuration);
+ } else {
+ partitionStore = new SimplePartitionStore<I, V, E, M>(configuration);
+ }
}
/**
- * Get the partition vertices (synchronize on the values)
+ * Return the partition store for this worker.
*
- * @return Partition vertices
+ * @return The partition store
*/
- public ConcurrentHashMap<Integer, Collection<Vertex<I, V, E, M>>>
- getPartitionVertexMap() {
- return inPartitionVertexMap;
+ public PartitionStore<I, V, E, M> getPartitionStore() {
+ return partitionStore;
}
/**
Modified: giraph/trunk/src/main/java/org/apache/giraph/comm/WorkerServer.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/comm/WorkerServer.java?rev=1377179&r1=1377178&r2=1377179&view=diff
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/comm/WorkerServer.java (original)
+++ giraph/trunk/src/main/java/org/apache/giraph/comm/WorkerServer.java Sat Aug 25 01:52:52 2012
@@ -18,13 +18,10 @@
package org.apache.giraph.comm;
-import org.apache.giraph.graph.Vertex;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import java.io.Closeable;
-import java.util.Collection;
-import java.util.Map;
/**
* Interface for message communication server.
@@ -52,15 +49,6 @@ public interface WorkerServer<I extends
void prepareSuperstep();
/**
- * Get the vertices that were sent in the last iteration. After getting
- * the map, the user should synchronize with it to insure it
- * is thread-safe.
- *
- * @return map of vertex ranges to vertices
- */
- Map<Integer, Collection<Vertex<I, V, E, M>>> getInPartitionVertexMap();
-
- /**
* Get server data
*
* @return Server data
Modified: giraph/trunk/src/main/java/org/apache/giraph/graph/BspServiceWorker.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/graph/BspServiceWorker.java?rev=1377179&r1=1377178&r2=1377179&view=diff
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/graph/BspServiceWorker.java (original)
+++ giraph/trunk/src/main/java/org/apache/giraph/graph/BspServiceWorker.java Sat Aug 25 01:52:52 2012
@@ -24,11 +24,12 @@ import org.apache.giraph.comm.NettyWorke
import org.apache.giraph.comm.RPCCommunications;
import org.apache.giraph.comm.ServerData;
import org.apache.giraph.comm.WorkerClientServer;
-import org.apache.giraph.comm.WorkerServer;
import org.apache.giraph.graph.partition.Partition;
import org.apache.giraph.graph.partition.PartitionExchange;
import org.apache.giraph.graph.partition.PartitionOwner;
import org.apache.giraph.graph.partition.PartitionStats;
+import org.apache.giraph.graph.partition.PartitionStore;
+import org.apache.giraph.graph.partition.SimplePartitionStore;
import org.apache.giraph.graph.partition.WorkerGraphPartitioner;
import org.apache.giraph.utils.MemoryUtils;
import org.apache.giraph.utils.WritableUtils;
@@ -100,9 +101,6 @@ public class BspServiceWorker<I extends
inputSplitCache = new HashMap<PartitionOwner, Partition<I, V, E, M>>();
/** Communication service */
private final WorkerClientServer<I, V, E, M> commService;
- /** Structure to store the partitions on this worker */
- private final Map<Integer, Partition<I, V, E, M>> workerPartitionMap =
- new HashMap<Integer, Partition<I, V, E, M>>();
/** Have the partition exchange children (workers) changed? */
private final BspEvent partitionExchangeChildrenChanged;
/** Regulates the size of outgoing Collections of vertices read
@@ -118,6 +116,10 @@ public class BspServiceWorker<I extends
private long totalEdgesLoaded = 0;
/** Input split max vertices (-1 denotes all) */
private final long inputSplitMaxVertices;
+ /**
+ * Partition store for worker (only used by the Hadoop RPC implementation).
+ */
+ private final PartitionStore<I, V, E, M> workerPartitionStore;
/**
* Constructor for setting up the worker.
@@ -171,6 +173,13 @@ public class BspServiceWorker<I extends
this.workerContext =
BspUtils.createWorkerContext(getConfiguration(),
graphMapper.getGraphState());
+
+ if (useNetty) {
+ workerPartitionStore = null;
+ } else {
+ workerPartitionStore =
+ new SimplePartitionStore<I, V, E, M>(getConfiguration());
+ }
}
public WorkerContext getWorkerContext() {
@@ -558,7 +567,7 @@ public class BspServiceWorker<I extends
Collection<? extends PartitionOwner> masterSetPartitionOwners =
startSuperstep();
workerGraphPartitioner.updatePartitionOwners(
- getWorkerInfo(), masterSetPartitionOwners, getPartitionMap());
+ getWorkerInfo(), masterSetPartitionOwners, getPartitionStore());
commService.setup();
@@ -647,18 +656,15 @@ public class BspServiceWorker<I extends
getInputSplitsAllDoneEvent().reset();
}
- // At this point all vertices have been sent to their destinations.
- // Move them to the worker, creating the empty partitions
- movePartitionsToWorker(commService);
+ // Create remaining partitions owned by this worker.
for (PartitionOwner partitionOwner : masterSetPartitionOwners) {
if (partitionOwner.getWorkerInfo().equals(getWorkerInfo()) &&
- !getPartitionMap().containsKey(
+ !getPartitionStore().hasPartition(
partitionOwner.getPartitionId())) {
Partition<I, V, E, M> partition =
new Partition<I, V, E, M>(getConfiguration(),
partitionOwner.getPartitionId());
- getPartitionMap().put(partitionOwner.getPartitionId(),
- partition);
+ getPartitionStore().addPartition(partition);
}
}
@@ -666,7 +672,8 @@ public class BspServiceWorker<I extends
// if necessary
List<PartitionStats> partitionStatsList =
new ArrayList<PartitionStats>();
- for (Partition<I, V, E, M> partition : getPartitionMap().values()) {
+ for (Partition<I, V, E, M> partition :
+ getPartitionStore().getPartitions()) {
PartitionStats partitionStats =
new PartitionStats(partition.getId(),
partition.getVertices().size(),
@@ -675,7 +682,7 @@ public class BspServiceWorker<I extends
partitionStatsList.add(partitionStats);
}
workerGraphPartitioner.finalizePartitionStats(
- partitionStatsList, workerPartitionMap);
+ partitionStatsList, getPartitionStore());
finishSuperstep(partitionStatsList);
}
@@ -985,7 +992,7 @@ public class BspServiceWorker<I extends
if (LOG.isInfoEnabled()) {
LOG.info("finishSuperstep: Superstep " + getSuperstep() +
- ", mesages = " + workerSentMessages + " " +
+ ", messages = " + workerSentMessages + " " +
MemoryUtils.getRuntimeMemoryStats());
}
@@ -993,7 +1000,7 @@ public class BspServiceWorker<I extends
marshalAggregatorValues(getSuperstep());
Collection<PartitionStats> finalizedPartitionStats =
workerGraphPartitioner.finalizePartitionStats(
- partitionStatsList, workerPartitionMap);
+ partitionStatsList, getPartitionStore());
List<PartitionStats> finalizedPartitionStatsList =
new ArrayList<PartitionStats>(finalizedPartitionStats);
byte [] partitionStatsBytes =
@@ -1086,7 +1093,8 @@ public class BspServiceWorker<I extends
VertexWriter<I, V, E> vertexWriter =
vertexOutputFormat.createVertexWriter(getContext());
vertexWriter.initialize(getContext());
- for (Partition<I, V, E, M> partition : workerPartitionMap.values()) {
+ for (Partition<I, V, E, M> partition :
+ getPartitionStore().getPartitions()) {
for (Vertex<I, V, E, M> vertex : partition.getVertices()) {
vertexWriter.writeVertex(vertex);
}
@@ -1185,7 +1193,8 @@ public class BspServiceWorker<I extends
getFs().create(verticesFilePath);
ByteArrayOutputStream metadataByteStream = new ByteArrayOutputStream();
DataOutput metadataOutput = new DataOutputStream(metadataByteStream);
- for (Partition<I, V, E, M> partition : workerPartitionMap.values()) {
+ for (Partition<I, V, E, M> partition :
+ getPartitionStore().getPartitions()) {
long startPos = verticesOutputStream.getPos();
partition.write(verticesOutputStream);
// write messages
@@ -1212,7 +1221,7 @@ public class BspServiceWorker<I extends
// needs to know how many partitions this worker owns
FSDataOutputStream metadataOutputStream =
getFs().create(metadataFilePath);
- metadataOutputStream.writeInt(workerPartitionMap.size());
+ metadataOutputStream.writeInt(getPartitionStore().getNumPartitions());
metadataOutputStream.write(metadataByteStream.toByteArray());
metadataOutputStream.close();
verticesOutputStream.close();
@@ -1313,11 +1322,12 @@ public class BspServiceWorker<I extends
LOG.info("loadCheckpoint: Loaded partition " +
partition);
}
- if (getPartitionMap().put(partitionId, partition) != null) {
+ if (getPartitionStore().hasPartition(partitionId)) {
throw new IllegalStateException(
"loadCheckpoint: Already has partition owner " +
partitionOwner);
}
+ getPartitionStore().addPartition(partition);
++loadedPartitions;
} catch (IOException e) {
throw new RuntimeException(
@@ -1370,7 +1380,7 @@ public class BspServiceWorker<I extends
randomEntryList) {
for (Integer partitionId : workerPartitionList.getValue()) {
Partition<I, V, E, M> partition =
- getPartitionMap().get(partitionId);
+ getPartitionStore().removePartition(partitionId);
if (partition == null) {
throw new IllegalStateException(
"sendWorkerPartitions: Couldn't find partition " +
@@ -1385,7 +1395,6 @@ public class BspServiceWorker<I extends
getGraphMapper().getGraphState().getWorkerCommunications().
sendPartitionRequest(workerPartitionList.getKey(),
partition);
- getPartitionMap().remove(partitionId);
}
}
@@ -1428,12 +1437,12 @@ public class BspServiceWorker<I extends
// 5. Add the partitions to myself.
PartitionExchange partitionExchange =
workerGraphPartitioner.updatePartitionOwners(
- getWorkerInfo(), masterSetPartitionOwners, getPartitionMap());
+ getWorkerInfo(), masterSetPartitionOwners, getPartitionStore());
commService.fixPartitionIdToSocketAddrMap();
Map<WorkerInfo, List<Integer>> sendWorkerPartitionMap =
partitionExchange.getSendWorkerPartitionMap();
- if (!workerPartitionMap.isEmpty()) {
+ if (!getPartitionStore().isEmpty()) {
sendWorkerPartitions(sendWorkerPartitionMap);
}
@@ -1446,7 +1455,7 @@ public class BspServiceWorker<I extends
"exchangeVertexPartitions: Duplicate entry " + tmpWorkerInfo);
}
}
- if (myDependencyWorkerSet.isEmpty() && workerPartitionMap.isEmpty()) {
+ if (myDependencyWorkerSet.isEmpty() && getPartitionStore().isEmpty()) {
if (LOG.isInfoEnabled()) {
LOG.info("exchangeVertexPartitions: Nothing to exchange, " +
"exiting early");
@@ -1481,56 +1490,6 @@ public class BspServiceWorker<I extends
if (LOG.isInfoEnabled()) {
LOG.info("exchangeVertexPartitions: Done with exchange.");
}
-
- // Add the partitions sent earlier
- movePartitionsToWorker(commService);
- }
-
- /**
- * Partitions that are exchanged need to be moved from the communication
- * service to the worker.
- *
- * @param commService Communication service where the partitions are
- * temporarily stored.
- */
- private void movePartitionsToWorker(
- WorkerServer<I, V, E, M> commService) {
- Map<Integer, Collection<Vertex<I, V, E, M>>> inPartitionVertexMap =
- commService.getInPartitionVertexMap();
- synchronized (inPartitionVertexMap) {
- for (Entry<Integer, Collection<Vertex<I, V, E, M>>> entry :
- inPartitionVertexMap.entrySet()) {
- if (getPartitionMap().containsKey(entry.getKey())) {
- throw new IllegalStateException(
- "moveVerticesToWorker: Already has partition " +
- getPartitionMap().get(entry.getKey()) +
- ", cannot receive vertex list of size " +
- entry.getValue().size());
- }
-
- Partition<I, V, E, M> tmpPartition =
- new Partition<I, V, E, M>(getConfiguration(),
- entry.getKey());
- synchronized (entry.getValue()) {
- for (Vertex<I, V, E, M> vertex : entry.getValue()) {
- if (tmpPartition.putVertex(vertex) != null) {
- throw new IllegalStateException(
- "moveVerticesToWorker: Vertex " + vertex +
- " already exists!");
- }
- }
- if (LOG.isDebugEnabled()) {
- LOG.debug("movePartitionsToWorker: Adding " +
- entry.getValue().size() +
- " vertices for partition id " + entry.getKey());
- }
- getPartitionMap().put(tmpPartition.getId(),
- tmpPartition);
- entry.getValue().clear();
- }
- }
- inPartitionVertexMap.clear();
- }
}
/**
@@ -1586,8 +1545,12 @@ public class BspServiceWorker<I extends
}
@Override
- public Map<Integer, Partition<I, V, E, M>> getPartitionMap() {
- return workerPartitionMap;
+ public PartitionStore<I, V, E, M> getPartitionStore() {
+ if (workerPartitionStore != null) {
+ return workerPartitionStore;
+ } else {
+ return getServerData().getPartitionStore();
+ }
}
@Override
@@ -1600,22 +1563,27 @@ public class BspServiceWorker<I extends
return workerGraphPartitioner.getPartitionOwner(vertexId);
}
- /**
- * Get the partition for a vertex index.
- *
- * @param vertexId Vertex index to search for the partition.
- * @return Partition that owns this vertex.
- */
+ @Override
public Partition<I, V, E, M> getPartition(I vertexId) {
+ return getPartitionStore().getPartition(getPartitionId(vertexId));
+ }
+
+ @Override
+ public Integer getPartitionId(I vertexId) {
PartitionOwner partitionOwner = getVertexPartitionOwner(vertexId);
- return workerPartitionMap.get(partitionOwner.getPartitionId());
+ return partitionOwner.getPartitionId();
+ }
+
+ @Override
+ public boolean hasPartition(Integer partitionId) {
+ return getPartitionStore().hasPartition(partitionId);
}
@Override
public Vertex<I, V, E, M> getVertex(I vertexId) {
PartitionOwner partitionOwner = getVertexPartitionOwner(vertexId);
- if (workerPartitionMap.containsKey(partitionOwner.getPartitionId())) {
- return workerPartitionMap.get(
+ if (getPartitionStore().hasPartition(partitionOwner.getPartitionId())) {
+ return getPartitionStore().getPartition(
partitionOwner.getPartitionId()).getVertex(vertexId);
} else {
return null;
Modified: giraph/trunk/src/main/java/org/apache/giraph/graph/GiraphJob.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/graph/GiraphJob.java?rev=1377179&r1=1377178&r2=1377179&view=diff
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/graph/GiraphJob.java (original)
+++ giraph/trunk/src/main/java/org/apache/giraph/graph/GiraphJob.java Sat Aug 25 01:52:52 2012
@@ -436,6 +436,24 @@ public class GiraphJob {
/** Default size of buffer when reading and writing messages out-of-core. */
public static final int MESSAGES_BUFFER_SIZE_DEFAULT = 8192;
+ /** Directory in the local filesystem for out-of-core partitions. */
+ public static final String PARTITIONS_DIRECTORY =
+ "giraph.partitionsDirectory";
+ /** Default directory for out-of-core partitions. */
+ public static final String PARTITIONS_DIRECTORY_DEFAULT = "_bsp/_partitions";
+
+ /** Enable out-of-core graph. */
+ public static final String USE_OUT_OF_CORE_GRAPH =
+ "giraph.useOutOfCoreGraph";
+ /** Default is not to use out-of-core graph. */
+ public static final boolean USE_OUT_OF_CORE_GRAPH_DEFAULT = false;
+
+ /** Maximum number of partitions to hold in memory for each worker. */
+ public static final String MAX_PARTITIONS_IN_MEMORY =
+ "giraph.maxPartitionsInMemory";
+ /** Default maximum number of in-memory partitions. */
+ public static final int MAX_PARTITIONS_IN_MEMORY_DEFAULT = 10;
+
/** Keep the zookeeper output for debugging? Default is to remove it. */
public static final String KEEP_ZOOKEEPER_DATA =
"giraph.keepZooKeeperData";
Modified: giraph/trunk/src/main/java/org/apache/giraph/graph/GraphMapper.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/graph/GraphMapper.java?rev=1377179&r1=1377178&r2=1377179&view=diff
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/graph/GraphMapper.java (original)
+++ giraph/trunk/src/main/java/org/apache/giraph/graph/GraphMapper.java Sat Aug 25 01:52:52 2012
@@ -477,11 +477,10 @@ public class GraphMapper<I extends Writa
TimedLogger partitionLogger = new TimedLogger(15000, LOG);
int completedPartitions = 0;
for (Partition<I, V, E, M> partition :
- serviceWorker.getPartitionMap().values()) {
+ serviceWorker.getPartitionStore().getPartitions()) {
PartitionStats partitionStats =
new PartitionStats(partition.getId(), 0, 0, 0);
- for (Vertex<I, V, E, M> vertex :
- partition.getVertices()) {
+ for (Vertex<I, V, E, M> vertex : partition.getVertices()) {
// Make sure every vertex has the current
// graphState before computing
vertex.setGraphState(graphState);
@@ -522,8 +521,8 @@ public class GraphMapper<I extends Writa
partitionStatsList.add(partitionStats);
++completedPartitions;
partitionLogger.info("map: Completed " + completedPartitions + " of " +
- serviceWorker.getPartitionMap().size() + " partitions " +
- MemoryUtils.getRuntimeMemoryStats());
+ serviceWorker.getPartitionStore().getNumPartitions() +
+ " partitions " + MemoryUtils.getRuntimeMemoryStats());
}
} while (!serviceWorker.finishSuperstep(partitionStatsList));
if (LOG.isInfoEnabled()) {
Added: giraph/trunk/src/main/java/org/apache/giraph/graph/partition/DiskBackedPartitionStore.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/graph/partition/DiskBackedPartitionStore.java?rev=1377179&view=auto
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/graph/partition/DiskBackedPartitionStore.java (added)
+++ giraph/trunk/src/main/java/org/apache/giraph/graph/partition/DiskBackedPartitionStore.java Sat Aug 25 01:52:52 2012
@@ -0,0 +1,368 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.graph.partition;
+
+import org.apache.giraph.graph.BspUtils;
+import org.apache.giraph.graph.GiraphJob;
+import org.apache.giraph.graph.Vertex;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.log4j.Logger;
+
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Maps;
+
+import java.io.BufferedInputStream;
+import java.io.BufferedOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.util.Collection;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+
+/**
+ * A partition store that can possibly spill to disk.
+ *
+ * @param <I> Vertex id
+ * @param <V> Vertex data
+ * @param <E> Edge data
+ * @param <M> Message data
+ */
+public class DiskBackedPartitionStore<I extends WritableComparable,
+ V extends Writable, E extends Writable, M extends Writable>
+ extends PartitionStore<I, V, E, M> {
+ /** Class logger. */
+ private static final Logger LOG =
+ Logger.getLogger(DiskBackedPartitionStore.class);
+ /** Map of partitions kept in memory. */
+ private final ConcurrentMap<Integer, Partition<I, V, E, M>>
+ inMemoryPartitions = new ConcurrentHashMap<Integer, Partition<I, V, E, M>>();
+ /** Maximum number of partitions to keep in memory. */
+ private int maxInMemoryPartitions;
+ /** Map of partitions kept out-of-core. The values are partition sizes. */
+ private final ConcurrentMap<Integer, Integer> onDiskPartitions =
+ Maps.newConcurrentMap();
+ /** Directory on the local file system for storing out-of-core partitions. */
+ private final String basePath;
+ /** Configuration. */
+ private final Configuration conf;
+ /** Slot for loading out-of-core partitions. */
+ private Partition<I, V, E, M> loadedPartition;
+ /** Locks for accessing and modifying partitions. */
+ private final ConcurrentMap<Integer, Lock> partitionLocks =
+ Maps.newConcurrentMap();
+
+ /**
+  * Build a store from the job configuration: reads the cap on in-memory
+  * partitions and derives the local directory used for spilled partitions.
+  *
+  * @param conf Configuration
+  */
+ public DiskBackedPartitionStore(Configuration conf) {
+   this.conf = conf;
+   int configuredMax = conf.getInt(GiraphJob.MAX_PARTITIONS_IN_MEMORY,
+       GiraphJob.MAX_PARTITIONS_IN_MEMORY_DEFAULT);
+   // Clamp to one: at least one partition must be holdable in memory
+   maxInMemoryPartitions = configuredMax < 1 ? 1 : configuredMax;
+   String jobId = conf.get("mapred.job.id", "Unknown Job");
+   String partitionsDirectory = conf.get(GiraphJob.PARTITIONS_DIRECTORY,
+       GiraphJob.PARTITIONS_DIRECTORY_DEFAULT);
+   // The job id and the directory are concatenated as-is (no separator)
+   basePath = jobId + partitionsDirectory;
+ }
+
+ /**
+  * Compute the local file path used to store a partition out-of-core.
+  * The file is named "partition-" followed by the id, under the base path.
+  *
+  * @param partitionId The partition
+  * @return The path to the given partition
+  */
+ private String getPartitionPath(Integer partitionId) {
+   StringBuilder path = new StringBuilder(basePath);
+   path.append("/partition-").append(partitionId);
+   return path.toString();
+ }
+
+ /**
+  * Register a fresh lock for a partition. The lock is acquired before it is
+  * published, so on success the caller already holds it and must unlock it.
+  * If a lock is already registered for the id, nothing is changed.
+  *
+  * @param partitionId Partition id
+  * @return The newly created (and held) lock, or null if already present
+  */
+ private Lock createLock(Integer partitionId) {
+   Lock candidate = new ReentrantLock(true);
+   candidate.lock();
+   Lock existing = partitionLocks.putIfAbsent(partitionId, candidate);
+   return existing == null ? candidate : null;
+ }
+
+ /**
+  * Look up the lock registered for a partition id.
+  *
+  * @param partitionId Partition id
+  * @return The lock, or null if none is registered for this id
+  */
+ private Lock getLock(Integer partitionId) {
+   Lock registered = partitionLocks.get(partitionId);
+   return registered;
+ }
+
+ /**
+  * Write a partition to disk, one vertex after another, into the file
+  * returned by {@link #getPartitionPath(Integer)}. Parent directories are
+  * created as needed and any previous content of the file is overwritten.
+  *
+  * @param partition The partition object to write
+  * @throws IOException If the file cannot be created or written
+  */
+ private void writePartition(Partition<I, V, E, M> partition)
+   throws IOException {
+   File file = new File(getPartitionPath(partition.getId()));
+   file.getParentFile().mkdirs();
+   file.createNewFile();
+   DataOutputStream outputStream = new DataOutputStream(
+       new BufferedOutputStream(new FileOutputStream(file)));
+   try {
+     for (Vertex<I, V, E, M> vertex : partition.getVertices()) {
+       vertex.write(outputStream);
+     }
+   } finally {
+     // Always release the file handle, even if a vertex fails to serialize
+     outputStream.close();
+   }
+ }
+
+ /**
+  * Read a partition from disk. The number of vertices to read is taken from
+  * the size recorded in {@code onDiskPartitions}. The backing file is
+  * deleted once the partition has been read successfully; on failure the
+  * file is left in place.
+  *
+  * @param partitionId Id of the partition to read
+  * @return The partition object
+  * @throws IOException If the file cannot be opened or read
+  */
+ private Partition<I, V, E, M> readPartition(Integer partitionId)
+   throws IOException {
+   Partition<I, V, E, M> partition = new Partition<I, V, E, M>(conf,
+       partitionId);
+   File file = new File(getPartitionPath(partitionId));
+   DataInputStream inputStream = new DataInputStream(
+       new BufferedInputStream(new FileInputStream(file)));
+   try {
+     int numVertices = onDiskPartitions.get(partitionId);
+     for (int i = 0; i < numVertices; ++i) {
+       Vertex<I, V, E, M> vertex = BspUtils.<I, V, E, M>createVertex(conf);
+       vertex.readFields(inputStream);
+       partition.putVertex(vertex);
+     }
+   } finally {
+     // Always release the file handle, even if deserialization fails
+     inputStream.close();
+   }
+   // Only reached on success: the data now lives in memory
+   file.delete();
+   return partition;
+ }
+
+ /**
+  * Append some vertices to the file of an out-of-core partition. The caller
+  * is responsible for updating the recorded partition size.
+  *
+  * @param partitionId Id of the destination partition
+  * @param vertices Vertices to be added
+  * @throws IOException If the file cannot be opened or written
+  */
+ private void appendVertices(Integer partitionId,
+     Collection<Vertex<I, V, E, M>> vertices)
+   throws IOException {
+   File file = new File(getPartitionPath(partitionId));
+   // Second constructor argument 'true' opens the file in append mode
+   DataOutputStream outputStream = new DataOutputStream(
+       new BufferedOutputStream(new FileOutputStream(file, true)));
+   try {
+     for (Vertex<I, V, E, M> vertex : vertices) {
+       vertex.write(outputStream);
+     }
+   } finally {
+     // Always release the file handle, even if a vertex fails to serialize
+     outputStream.close();
+   }
+ }
+
+ /**
+  * Load an out-of-core partition into the single in-memory slot
+  * ({@code loadedPartition}). If the slot already holds the requested
+  * partition this is a no-op; if it holds a different partition, that one
+  * is first written back to disk and its size recorded.
+  *
+  * @param partitionId Partition id
+  * @throws IllegalStateException If writing the previously loaded partition
+  *         or reading the requested one fails (the I/O error is the cause)
+  */
+ private void loadPartition(Integer partitionId) {
+   if (loadedPartition != null) {
+     if (loadedPartition.getId() == partitionId) {
+       return;
+     }
+     if (LOG.isInfoEnabled()) {
+       LOG.info("loadPartition: moving partition " + loadedPartition.getId() +
+           " out of core");
+     }
+     try {
+       writePartition(loadedPartition);
+       onDiskPartitions.put(loadedPartition.getId(),
+           loadedPartition.getVertices().size());
+       loadedPartition = null;
+     } catch (IOException e) {
+       throw new IllegalStateException("loadPartition: failed writing " +
+           "partition " + loadedPartition.getId() + " to disk", e);
+     }
+   }
+   if (LOG.isInfoEnabled()) {
+     LOG.info("loadPartition: loading partition " + partitionId +
+         " in memory");
+   }
+   try {
+     loadedPartition = readPartition(partitionId);
+   } catch (IOException e) {
+     // Keep the underlying I/O error as the cause so it is not lost
+     throw new IllegalStateException("loadPartition: failed reading " +
+         "partition " + partitionId + " from disk", e);
+   }
+ }
+
+ /**
+  * Add a new partition without acquiring its partition lock. The partition
+  * is kept in memory while {@code inMemoryPartitions.size() + 1} is below
+  * {@code maxInMemoryPartitions}; otherwise it is written to disk and its
+  * vertex count is recorded in {@code onDiskPartitions}.
+  *
+  * @param partition Partition to be added
+  * @throws IllegalStateException If the partition cannot be written to disk
+  *         (the I/O error is the cause)
+  */
+ private void addPartitionNoLock(Partition<I, V, E, M> partition) {
+   // The size check and the insertion must happen atomically
+   synchronized (inMemoryPartitions) {
+     // NOTE(review): the "+ 1" presumably reserves room for the
+     // loadedPartition slot -- confirm.
+     if (inMemoryPartitions.size() + 1 < maxInMemoryPartitions) {
+       inMemoryPartitions.put(partition.getId(), partition);
+
+       return;
+     }
+   }
+   try {
+     writePartition(partition);
+     onDiskPartitions.put(partition.getId(), partition.getVertices().size());
+   } catch (IOException e) {
+     // Keep the underlying I/O error as the cause so it is not lost
+     throw new IllegalStateException("addPartition: failed writing " +
+         "partition " + partition.getId() + " to disk", e);
+   }
+ }
+
+ /**
+  * Add a new partition to the store, in memory if there is room and on disk
+  * otherwise.
+  *
+  * @param partition Partition
+  * @throws IllegalStateException If a partition with the same id already
+  *         exists, or if the partition cannot be written to disk
+  */
+ @Override
+ public void addPartition(Partition<I, V, E, M> partition) {
+   // createLock returns the lock already held, or null if the id is taken
+   Lock lock = createLock(partition.getId());
+   if (lock == null) {
+     throw new IllegalStateException("addPartition: partition " +
+         partition.getId() + " already exists");
+   }
+   try {
+     addPartitionNoLock(partition);
+   } finally {
+     // Release even on failure, so waiters on this lock are not blocked
+     lock.unlock();
+   }
+ }
+
+ /**
+  * Add some vertices to a partition, creating the partition if needed.
+  * In-memory partitions are updated directly. For an out-of-core partition
+  * the vertices go to the loaded copy when it is the one in the loading
+  * slot, and are appended to its file otherwise. If the partition does not
+  * exist yet, an empty one is created and the call is repeated.
+  *
+  * @param partitionId Id of the destination partition
+  * @param vertices Vertices
+  * @throws IllegalStateException If vertices cannot be written to disk
+  */
+ @Override
+ public void addPartitionVertices(Integer partitionId,
+     Collection<Vertex<I, V, E, M>> vertices) {
+   if (inMemoryPartitions.containsKey(partitionId)) {
+     Partition<I, V, E, M> partition = inMemoryPartitions.get(partitionId);
+     partition.putVertices(vertices);
+   } else if (onDiskPartitions.containsKey(partitionId)) {
+     Lock lock = getLock(partitionId);
+     lock.lock();
+     try {
+       if (loadedPartition != null &&
+           loadedPartition.getId() == partitionId) {
+         loadedPartition.putVertices(vertices);
+       } else {
+         try {
+           appendVertices(partitionId, vertices);
+           onDiskPartitions.put(partitionId,
+               onDiskPartitions.get(partitionId) + vertices.size());
+         } catch (IOException e) {
+           throw new IllegalStateException("addPartitionVertices: failed " +
+               "writing vertices to partition " + partitionId + " on disk",
+               e);
+         }
+       }
+     } finally {
+       // Release even on failure, so the partition lock is never leaked
+       lock.unlock();
+     }
+   } else {
+     Lock lock = createLock(partitionId);
+     if (lock != null) {
+       try {
+         addPartitionNoLock(new Partition<I, V, E, M>(conf, partitionId));
+       } finally {
+         // Release even on failure, so waiting threads are not blocked
+         lock.unlock();
+       }
+     } else {
+       // Another thread is already creating the partition,
+       // so we make sure it's done before repeating the call.
+       lock = getLock(partitionId);
+       lock.lock();
+       lock.unlock();
+     }
+     addPartitionVertices(partitionId, vertices);
+   }
+ }
+
+ /**
+  * Get a partition, loading it into the in-memory slot first when it is
+  * currently stored out-of-core.
+  *
+  * @param partitionId Partition id
+  * @return The requested partition
+  * @throws IllegalStateException If the partition is neither in memory nor
+  *         on disk
+  */
+ @Override
+ public Partition<I, V, E, M> getPartition(Integer partitionId) {
+   if (inMemoryPartitions.containsKey(partitionId)) {
+     return inMemoryPartitions.get(partitionId);
+   }
+   if (!onDiskPartitions.containsKey(partitionId)) {
+     throw new IllegalStateException("getPartition: partition " +
+         partitionId + " does not exist");
+   }
+   loadPartition(partitionId);
+   return loadedPartition;
+ }
+
+ @Override
+ public Partition<I, V, E, M> removePartition(Integer partitionId) {
+ partitionLocks.remove(partitionId);
+ if (onDiskPartitions.containsKey(partitionId)) {
+ Partition<I, V, E, M> partition;
+ if (loadedPartition != null && loadedPartition.getId() == partitionId) {
+ partition = loadedPartition;
+ loadedPartition = null;
+ } else {
+ try {
+ partition = readPartition(partitionId);
+ } catch (IOException e) {
+ throw new IllegalStateException("removePartition: failed reading " +
+ "partition " + partitionId + " from disk", e);
+ }
+ }
+ onDiskPartitions.remove(partitionId);
+ return partition;
+ } else {
+ return inMemoryPartitions.remove(partitionId);
+ }
+ }
+
+ @Override
+ public void deletePartition(Integer partitionId) {
+ partitionLocks.remove(partitionId);
+ if (inMemoryPartitions.containsKey(partitionId)) {
+ inMemoryPartitions.remove(partitionId);
+ } else {
+ if (loadedPartition != null && loadedPartition.getId() == partitionId) {
+ loadedPartition = null;
+ } else {
+ File file = new File(getPartitionPath(partitionId));
+ file.delete();
+ }
+ onDiskPartitions.remove(partitionId);
+ }
+ }
+
+ @Override
+ public boolean hasPartition(Integer partitionId) {
+ return partitionLocks.containsKey(partitionId);
+ }
+
+ /**
+  * Return the ids of all stored partitions: in-memory ids first, followed
+  * by the ids of partitions kept out-of-core.
+  *
+  * @return The partition ids
+  */
+ @Override
+ public Iterable<Integer> getPartitionIds() {
+   Iterable<Integer> inMemoryIds = inMemoryPartitions.keySet();
+   Iterable<Integer> onDiskIds = onDiskPartitions.keySet();
+   return Iterables.concat(inMemoryIds, onDiskIds);
+ }
+
+ @Override
+ public int getNumPartitions() {
+ return partitionLocks.size();
+ }
+
+}
Modified: giraph/trunk/src/main/java/org/apache/giraph/graph/partition/HashWorkerPartitioner.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/graph/partition/HashWorkerPartitioner.java?rev=1377179&r1=1377178&r2=1377179&view=diff
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/graph/partition/HashWorkerPartitioner.java (original)
+++ giraph/trunk/src/main/java/org/apache/giraph/graph/partition/HashWorkerPartitioner.java Sat Aug 25 01:52:52 2012
@@ -18,6 +18,10 @@
package org.apache.giraph.graph.partition;
+import org.apache.giraph.graph.WorkerInfo;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
@@ -26,10 +30,6 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
-import org.apache.giraph.graph.WorkerInfo;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.WritableComparable;
-
/**
* Implements hash-based partitioning from the id hash code.
*
@@ -62,7 +62,7 @@ public class HashWorkerPartitioner<I ext
@Override
public Collection<PartitionStats> finalizePartitionStats(
Collection<PartitionStats> workerPartitionStats,
- Map<Integer, Partition<I, V, E, M>> partitionMap) {
+ PartitionStore<I, V, E, M> partitionStore) {
// No modification necessary
return workerPartitionStats;
}
@@ -71,7 +71,7 @@ public class HashWorkerPartitioner<I ext
public PartitionExchange updatePartitionOwners(
WorkerInfo myWorkerInfo,
Collection<? extends PartitionOwner> masterSetPartitionOwners,
- Map<Integer, Partition<I, V, E, M>> partitionMap) {
+ PartitionStore<I, V, E, M> partitionStore) {
synchronized (partitionOwnerList) {
partitionOwnerList.clear();
partitionOwnerList.addAll(masterSetPartitionOwners);
Modified: giraph/trunk/src/main/java/org/apache/giraph/graph/partition/Partition.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/graph/partition/Partition.java?rev=1377179&r1=1377178&r2=1377179&view=diff
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/graph/partition/Partition.java (original)
+++ giraph/trunk/src/main/java/org/apache/giraph/graph/partition/Partition.java Sat Aug 25 01:52:52 2012
@@ -31,7 +31,8 @@ import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.Collection;
-import java.util.Map;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ConcurrentSkipListMap;
/**
* A generic container that stores vertices. Vertex ids will map to exactly
@@ -51,7 +52,7 @@ public class Partition<I extends Writabl
/** Partition id */
private final int id;
/** Vertex map for this range (keyed by index) */
- private final Map<I, Vertex<I, V, E, M>> vertexMap;
+ private final ConcurrentMap<I, Vertex<I, V, E, M>> vertexMap;
/**
* Constructor.
@@ -64,9 +65,9 @@ public class Partition<I extends Writabl
this.id = id;
if (conf.getBoolean(GiraphJob.USE_OUT_OF_CORE_MESSAGES,
GiraphJob.USE_OUT_OF_CORE_MESSAGES_DEFAULT)) {
- vertexMap = Maps.newTreeMap();
+ vertexMap = new ConcurrentSkipListMap<I, Vertex<I, V, E, M>>();
} else {
- vertexMap = Maps.newHashMap();
+ vertexMap = Maps.newConcurrentMap();
}
}
@@ -110,6 +111,17 @@ public class Partition<I extends Writabl
}
/**
+ * Put several vertices in the partition.
+ *
+ * @param vertices Vertices to add
+ */
+ public void putVertices(Collection<Vertex<I, V, E , M>> vertices) {
+ for (Vertex<I, V, E , M> vertex : vertices) {
+ vertexMap.put(vertex.getId(), vertex);
+ }
+ }
+
+ /**
* Get the number of edges in this partition. Computed on the fly.
*
* @return Number of edges.
Added: giraph/trunk/src/main/java/org/apache/giraph/graph/partition/PartitionStore.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/graph/partition/PartitionStore.java?rev=1377179&view=auto
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/graph/partition/PartitionStore.java (added)
+++ giraph/trunk/src/main/java/org/apache/giraph/graph/partition/PartitionStore.java Sat Aug 25 01:52:52 2012
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.graph.partition;
+
+import org.apache.giraph.graph.Vertex;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+
+import com.google.common.base.Function;
+import com.google.common.collect.Iterables;
+
+import java.util.Collection;
+
+/**
+ * Structure that stores partitions for a worker.
+ *
+ * @param <I> Vertex id
+ * @param <V> Vertex data
+ * @param <E> Edge data
+ * @param <M> Message data
+ */
+public abstract class PartitionStore<I extends WritableComparable,
+ V extends Writable, E extends Writable, M extends Writable> {
+
+ /**
+ * Add a new partition to the store.
+ *
+ * @param partition Partition
+ */
+ public abstract void addPartition(Partition<I, V, E, M> partition);
+
+ /**
+ * Add some vertices to a (possibly existing) partition.
+ *
+ * @param partitionId Id of the destination partition
+ * @param vertices Vertices
+ */
+ public abstract void addPartitionVertices(
+ Integer partitionId, Collection<Vertex<I, V, E, M>> vertices);
+
+ /**
+ * Get a partition.
+ *
+ * @param partitionId Partition id
+ * @return The requested partition
+ */
+ public abstract Partition<I, V, E, M> getPartition(Integer partitionId);
+
+ /**
+ * Remove a partition and return it.
+ *
+ * @param partitionId Partition id
+ * @return The removed partition
+ */
+ public abstract Partition<I, V, E, M> removePartition(Integer partitionId);
+
+ /**
+ * Just delete a partition
+ * (more efficient than {@link #removePartition(Integer partitionID)} if the
+ * partition is out of core).
+ *
+ * @param partitionId Partition id
+ */
+ public abstract void deletePartition(Integer partitionId);
+
+ /**
+ * Whether a specific partition is present in the store.
+ *
+ * @param partitionId Partition id
+ * @return True iff the partition is present
+ */
+ public abstract boolean hasPartition(Integer partitionId);
+
+ /**
+ * Return the ids of all the stored partitions as an Iterable.
+ *
+ * @return The partition ids
+ */
+ public abstract Iterable<Integer> getPartitionIds();
+
+ /**
+ * Return the number of stored partitions.
+ *
+ * @return The number of partitions
+ */
+ public abstract int getNumPartitions();
+
+ /**
+ * Whether the partition store is empty.
+ *
+ * @return True iff there are no partitions in the store
+ */
+ public boolean isEmpty() {
+ return getNumPartitions() == 0;
+ }
+
+ /**
+ * Return all the stored partitions as an Iterable.
+ *
+ * @return The partition objects
+ */
+ public Iterable<Partition<I, V, E, M>> getPartitions() {
+ return Iterables.transform(getPartitionIds(),
+ new Function<Integer, Partition<I, V, E, M>>() {
+ @Override
+ public Partition<I, V, E, M> apply(Integer partitionId) {
+ return getPartition(partitionId);
+ }
+ });
+ }
+}
Added: giraph/trunk/src/main/java/org/apache/giraph/graph/partition/SimplePartitionStore.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/graph/partition/SimplePartitionStore.java?rev=1377179&view=auto
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/graph/partition/SimplePartitionStore.java (added)
+++ giraph/trunk/src/main/java/org/apache/giraph/graph/partition/SimplePartitionStore.java Sat Aug 25 01:52:52 2012
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.graph.partition;
+
+import org.apache.giraph.graph.Vertex;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+
+import com.google.common.collect.Maps;
+
+import java.util.Collection;
+import java.util.concurrent.ConcurrentMap;
+
+/**
+ * A simple in-memory partition store.
+ *
+ * @param <I> Vertex id
+ * @param <V> Vertex data
+ * @param <E> Edge data
+ * @param <M> Message data
+ */
+public class SimplePartitionStore<I extends WritableComparable,
+ V extends Writable, E extends Writable, M extends Writable>
+ extends PartitionStore<I, V, E, M> {
+ /** Map of stored partitions. */
+ private final ConcurrentMap<Integer, Partition<I, V, E, M>> partitions =
+ Maps.newConcurrentMap();
+ /** Configuration. */
+ private final Configuration conf;
+
+ /**
+ * Constructor.
+ *
+ * @param conf Configuration
+ */
+ public SimplePartitionStore(Configuration conf) {
+ this.conf = conf;
+ }
+
+ @Override
+ public void addPartition(Partition<I, V, E, M> partition) {
+ if (partitions.putIfAbsent(partition.getId(), partition) != null) {
+ throw new IllegalStateException("addPartition: partition " +
+ partition.getId() + " already exists");
+ }
+ }
+
+ @Override
+ public void addPartitionVertices(Integer partitionId,
+ Collection<Vertex<I, V, E, M>> vertices) {
+ Partition<I, V, E, M> partition = partitions.get(partitionId);
+ if (partition == null) {
+ Partition<I, V, E, M> newPartition = new Partition<I, V, E, M>(conf,
+ partitionId);
+ partition = partitions.putIfAbsent(partitionId, newPartition);
+ if (partition == null) {
+ partition = newPartition;
+ }
+ }
+ partition.putVertices(vertices);
+ }
+
+ @Override
+ public Partition<I, V, E, M> getPartition(Integer partitionId) {
+ return partitions.get(partitionId);
+ }
+
+ @Override
+ public Partition<I, V, E, M> removePartition(Integer partitionId) {
+ return partitions.remove(partitionId);
+ }
+
+ @Override
+ public void deletePartition(Integer partitionId) {
+ partitions.remove(partitionId);
+ }
+
+ @Override
+ public boolean hasPartition(Integer partitionId) {
+ return partitions.containsKey(partitionId);
+ }
+
+ @Override
+ public Iterable<Integer> getPartitionIds() {
+ return partitions.keySet();
+ }
+
+ @Override
+ public int getNumPartitions() {
+ return partitions.size();
+ }
+
+
+}
Modified: giraph/trunk/src/main/java/org/apache/giraph/graph/partition/WorkerGraphPartitioner.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/graph/partition/WorkerGraphPartitioner.java?rev=1377179&r1=1377178&r2=1377179&view=diff
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/graph/partition/WorkerGraphPartitioner.java (original)
+++ giraph/trunk/src/main/java/org/apache/giraph/graph/partition/WorkerGraphPartitioner.java Sat Aug 25 01:52:52 2012
@@ -18,13 +18,12 @@
package org.apache.giraph.graph.partition;
-import java.util.Collection;
-import java.util.Map;
-
import org.apache.giraph.graph.WorkerInfo;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
+import java.util.Collection;
+
/**
* Stores the {@link PartitionOwner} objects from the master and provides the
* mapping of vertex to {@link PartitionOwner}. Also generates the partition
@@ -62,13 +61,13 @@ public interface WorkerGraphPartitioner<
*
* @param workerPartitionStats Stats generated by the infrastructure during
* the superstep
- * @param partitionMap Map of all the partitions owned by this worker
+ * @param partitionStore Partition store for this worker
* (could be used to provide more useful stat information)
* @return Final partition stats
*/
Collection<PartitionStats> finalizePartitionStats(
Collection<PartitionStats> workerPartitionStats,
- Map<Integer, Partition<I, V, E, M>> partitionMap);
+ PartitionStore<I, V, E, M> partitionStore);
/**
* Get the partitions owners and update locally. Returns the partitions
@@ -77,14 +76,14 @@ public interface WorkerGraphPartitioner<
* @param myWorkerInfo Worker info.
* @param masterSetPartitionOwners Master set partition owners, received
* prior to beginning the superstep
- * @param partitionMap Map of all the partitions owned by this worker
+ * @param partitionStore Partition store for this worker
* (can be used to fill the return map of partitions to send)
* @return Information for the partition exchange.
*/
PartitionExchange updatePartitionOwners(
WorkerInfo myWorkerInfo,
Collection<? extends PartitionOwner> masterSetPartitionOwners,
- Map<Integer, Partition<I, V, E, M>> partitionMap);
+ PartitionStore<I, V, E, M> partitionStore);
/**
* Get a collection of the {@link PartitionOwner} objects.
Copied: giraph/trunk/src/test/java/org/apache/giraph/TestMutateGraph.java (from r1375824, giraph/trunk/src/test/java/org/apache/giraph/TestMutateGraphVertex.java)
URL: http://svn.apache.org/viewvc/giraph/trunk/src/test/java/org/apache/giraph/TestMutateGraph.java?p2=giraph/trunk/src/test/java/org/apache/giraph/TestMutateGraph.java&p1=giraph/trunk/src/test/java/org/apache/giraph/TestMutateGraphVertex.java&r1=1375824&r2=1377179&rev=1377179&view=diff
==============================================================================
--- giraph/trunk/src/test/java/org/apache/giraph/TestMutateGraphVertex.java (original)
+++ giraph/trunk/src/test/java/org/apache/giraph/TestMutateGraph.java Sat Aug 25 01:52:52 2012
@@ -18,24 +18,23 @@
package org.apache.giraph;
-import static org.junit.Assert.assertTrue;
-
-import java.io.IOException;
-
import org.apache.giraph.examples.SimpleMutateGraphVertex;
import org.apache.giraph.examples.SimplePageRankVertex.SimplePageRankVertexInputFormat;
import org.apache.giraph.examples.SimplePageRankVertex.SimplePageRankVertexOutputFormat;
import org.apache.giraph.graph.GiraphJob;
-import org.apache.hadoop.fs.Path;
import org.junit.Test;
+import static org.junit.Assert.assertTrue;
+
+import java.io.IOException;
+
/**
* Unit test for graph mutation
*/
-public class TestMutateGraphVertex extends BspCase {
+public class TestMutateGraph extends BspCase {
- public TestMutateGraphVertex() {
- super(TestMutateGraphVertex.class.getName());
+ public TestMutateGraph() {
+ super(TestMutateGraph.class.getName());
}
/**
Modified: giraph/trunk/src/test/java/org/apache/giraph/comm/ConnectionTest.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/test/java/org/apache/giraph/comm/ConnectionTest.java?rev=1377179&r1=1377178&r2=1377179&view=diff
==============================================================================
--- giraph/trunk/src/test/java/org/apache/giraph/comm/ConnectionTest.java (original)
+++ giraph/trunk/src/test/java/org/apache/giraph/comm/ConnectionTest.java Sat Aug 25 01:52:52 2012
@@ -18,26 +18,22 @@
package org.apache.giraph.comm;
-import com.google.common.collect.Sets;
-import java.util.Set;
import org.apache.giraph.comm.messages.SimpleMessageStore;
-import org.apache.giraph.graph.GiraphJob;
import org.apache.giraph.utils.MockUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.Mapper.Context;
-import org.jboss.netty.channel.socket.DefaultSocketChannelConfig;
import org.junit.Test;
-import static org.junit.Assert.assertEquals;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
+import com.google.common.collect.Sets;
+
import java.io.IOException;
import java.net.InetSocketAddress;
-import java.util.ArrayList;
import java.util.Collections;
-import java.util.List;
+import java.util.Set;
/**
* Test the netty connections
@@ -57,7 +53,7 @@ public class ConnectionTest {
ServerData<IntWritable, IntWritable, IntWritable, IntWritable> serverData =
new ServerData<IntWritable, IntWritable, IntWritable, IntWritable>
- (SimpleMessageStore.newFactory(
+ (conf, SimpleMessageStore.newFactory(
MockUtils.mockServiceGetVertexPartitionOwner(1), conf));
NettyServer<IntWritable, IntWritable, IntWritable, IntWritable> server =
new NettyServer<IntWritable, IntWritable, IntWritable, IntWritable>(
@@ -87,7 +83,7 @@ public class ConnectionTest {
ServerData<IntWritable, IntWritable, IntWritable, IntWritable> serverData =
new ServerData<IntWritable, IntWritable, IntWritable, IntWritable>
- (SimpleMessageStore.newFactory(
+ (conf, SimpleMessageStore.newFactory(
MockUtils.mockServiceGetVertexPartitionOwner(1), conf));
NettyServer<IntWritable, IntWritable, IntWritable, IntWritable> server1 =
@@ -132,7 +128,7 @@ public class ConnectionTest {
ServerData<IntWritable, IntWritable, IntWritable, IntWritable> serverData =
new ServerData<IntWritable, IntWritable, IntWritable, IntWritable>
- (SimpleMessageStore.newFactory(
+ (conf, SimpleMessageStore.newFactory(
MockUtils.mockServiceGetVertexPartitionOwner(1), conf));
NettyServer<IntWritable, IntWritable, IntWritable, IntWritable> server =
new NettyServer<IntWritable, IntWritable, IntWritable, IntWritable>(
Modified: giraph/trunk/src/test/java/org/apache/giraph/comm/RequestFailureTest.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/test/java/org/apache/giraph/comm/RequestFailureTest.java?rev=1377179&r1=1377178&r2=1377179&view=diff
==============================================================================
--- giraph/trunk/src/test/java/org/apache/giraph/comm/RequestFailureTest.java (original)
+++ giraph/trunk/src/test/java/org/apache/giraph/comm/RequestFailureTest.java Sat Aug 25 01:52:52 2012
@@ -18,13 +18,6 @@
package org.apache.giraph.comm;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
-import java.io.IOException;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.Map;
-
import org.apache.giraph.comm.messages.SimpleMessageStore;
import org.apache.giraph.graph.EdgeListVertex;
import org.apache.giraph.graph.GiraphJob;
@@ -42,6 +35,14 @@ import static org.junit.Assert.assertEqu
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Map;
+
/**
* Test all the netty failure scenarios
*/
@@ -136,7 +137,7 @@ public class RequestFailureTest {
// Start the service
serverData =
new ServerData<IntWritable, IntWritable, IntWritable, IntWritable>
- (SimpleMessageStore.newFactory(
+ (conf, SimpleMessageStore.newFactory(
MockUtils.mockServiceGetVertexPartitionOwner(1), conf));
server =
new NettyServer<IntWritable, IntWritable, IntWritable, IntWritable>(
@@ -176,7 +177,7 @@ public class RequestFailureTest {
// Start the service
serverData =
new ServerData<IntWritable, IntWritable, IntWritable, IntWritable>
- (SimpleMessageStore.newFactory(
+ (conf, SimpleMessageStore.newFactory(
MockUtils.mockServiceGetVertexPartitionOwner(1), conf));
server =
new NettyServer<IntWritable, IntWritable, IntWritable, IntWritable>(
@@ -216,7 +217,7 @@ public class RequestFailureTest {
// Start the service
serverData =
new ServerData<IntWritable, IntWritable, IntWritable, IntWritable>
- (SimpleMessageStore.newFactory(
+ (conf, SimpleMessageStore.newFactory(
MockUtils.mockServiceGetVertexPartitionOwner(1), conf));
server =
new NettyServer<IntWritable, IntWritable, IntWritable, IntWritable>(
Modified: giraph/trunk/src/test/java/org/apache/giraph/comm/RequestTest.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/test/java/org/apache/giraph/comm/RequestTest.java?rev=1377179&r1=1377178&r2=1377179&view=diff
==============================================================================
--- giraph/trunk/src/test/java/org/apache/giraph/comm/RequestTest.java (original)
+++ giraph/trunk/src/test/java/org/apache/giraph/comm/RequestTest.java Sat Aug 25 01:52:52 2012
@@ -24,6 +24,7 @@ import org.apache.giraph.graph.EdgeListV
import org.apache.giraph.graph.GiraphJob;
import org.apache.giraph.graph.Vertex;
import org.apache.giraph.graph.VertexMutations;
+import org.apache.giraph.graph.partition.PartitionStore;
import org.apache.giraph.utils.MockUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.IntWritable;
@@ -96,7 +97,7 @@ public class RequestTest {
// Start the service
serverData =
new ServerData<IntWritable, IntWritable, IntWritable, IntWritable>
- (SimpleMessageStore.newFactory(
+ (conf, SimpleMessageStore.newFactory(
MockUtils.mockServiceGetVertexPartitionOwner(1), conf));
server =
new NettyServer<IntWritable, IntWritable, IntWritable, IntWritable>(
@@ -135,19 +136,17 @@ public class RequestTest {
server.stop();
// Check the output
- Map<Integer, Collection<Vertex<IntWritable, IntWritable,
- IntWritable, IntWritable>>> partitionVertexMap =
- serverData.getPartitionVertexMap();
- synchronized (partitionVertexMap) {
- assertTrue(partitionVertexMap.containsKey(partitionId));
- int total = 0;
- for (Vertex<IntWritable, IntWritable,
- IntWritable, IntWritable> vertex :
- (partitionVertexMap.get(partitionId))) {
- total += vertex.getId().get();
- }
- assertEquals(total, 45);
+ PartitionStore<IntWritable, IntWritable,
+ IntWritable, IntWritable> partitionStore =
+ serverData.getPartitionStore();
+ assertTrue(partitionStore.hasPartition(partitionId));
+ int total = 0;
+ for (Vertex<IntWritable, IntWritable,
+ IntWritable, IntWritable> vertex :
+ partitionStore.getPartition(partitionId).getVertices()) {
+ total += vertex.getId().get();
}
+ assertEquals(total, 45);
}
@Test
Added: giraph/trunk/src/test/java/org/apache/giraph/graph/partition/TestPartitionStores.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/test/java/org/apache/giraph/graph/partition/TestPartitionStores.java?rev=1377179&view=auto
==============================================================================
--- giraph/trunk/src/test/java/org/apache/giraph/graph/partition/TestPartitionStores.java (added)
+++ giraph/trunk/src/test/java/org/apache/giraph/graph/partition/TestPartitionStores.java Sat Aug 25 01:52:52 2012
@@ -0,0 +1,201 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.graph.partition;
+
+import org.apache.giraph.graph.GiraphJob;
+import org.apache.giraph.graph.IntIntNullIntVertex;
+import org.apache.giraph.graph.Vertex;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Lists;
+
+import java.io.IOException;
+
+/**
+ * Test case for partition stores. The same read/write scenario is run
+ * against {@link SimplePartitionStore} and {@link DiskBackedPartitionStore}.
+ */
+public class TestPartitionStores {
+  /** Minimal concrete vertex whose compute step does nothing. */
+  private static class MyVertex extends IntIntNullIntVertex {
+    @Override
+    public void compute(Iterable<IntWritable> messages) throws IOException {
+      // Intentionally empty: these tests only store and retrieve vertices.
+    }
+  }
+
+  /**
+   * Build the configuration shared by the tests, registering the vertex,
+   * vertex id, vertex value, edge value and message value classes.
+   *
+   * @return Fresh configuration with the graph types set
+   */
+  private static Configuration createConfiguration() {
+    Configuration conf = new Configuration();
+    conf.setClass(GiraphJob.VERTEX_CLASS, MyVertex.class,
+        Vertex.class);
+    conf.setClass(GiraphJob.VERTEX_ID_CLASS, IntWritable.class,
+        WritableComparable.class);
+    conf.setClass(GiraphJob.VERTEX_VALUE_CLASS, IntWritable.class,
+        Writable.class);
+    conf.setClass(GiraphJob.EDGE_VALUE_CLASS, NullWritable.class,
+        Writable.class);
+    conf.setClass(GiraphJob.MESSAGE_VALUE_CLASS, IntWritable.class,
+        Writable.class);
+    return conf;
+  }
+
+  /**
+   * Create a vertex whose id and value are both the given number.
+   *
+   * @param id Number used as vertex id and vertex value
+   * @return Initialized vertex
+   */
+  private static Vertex<IntWritable, IntWritable, NullWritable,
+      IntWritable> createVertex(int id) {
+    Vertex<IntWritable, IntWritable, NullWritable, IntWritable> vertex =
+        new MyVertex();
+    vertex.initialize(new IntWritable(id), new IntWritable(id), null, null);
+    return vertex;
+  }
+
+  /**
+   * Create a partition holding the given vertices.
+   *
+   * @param conf Configuration handed to the partition
+   * @param id Partition id
+   * @param vertices Vertices to put into the partition
+   * @return The populated partition
+   */
+  private Partition<IntWritable, IntWritable, NullWritable,
+      IntWritable> createPartition(Configuration conf, Integer id,
+      Vertex<IntWritable, IntWritable,
+      NullWritable, IntWritable>... vertices) {
+    Partition<IntWritable, IntWritable, NullWritable, IntWritable> partition =
+        new Partition<IntWritable, IntWritable, NullWritable,
+            IntWritable>(conf, id);
+    for (Vertex<IntWritable, IntWritable, NullWritable, IntWritable> v :
+        vertices) {
+      partition.putVertex(v);
+    }
+    return partition;
+  }
+
+  /** Run the read/write scenario against the in-memory store. */
+  @Test
+  public void testSimplePartitionStore() {
+    Configuration conf = createConfiguration();
+
+    PartitionStore<IntWritable, IntWritable, NullWritable, IntWritable>
+        partitionStore = new SimplePartitionStore<IntWritable, IntWritable,
+        NullWritable, IntWritable>(conf);
+    testReadWrite(partitionStore, conf);
+  }
+
+  /**
+   * Run the read/write scenario against the disk-backed store, first with
+   * MAX_PARTITIONS_IN_MEMORY set to 1 and then, on a new store, set to 2.
+   */
+  @Test
+  public void testDiskBackedPartitionStore() {
+    Configuration conf = createConfiguration();
+    conf.setBoolean(GiraphJob.USE_OUT_OF_CORE_GRAPH, true);
+    conf.setInt(GiraphJob.MAX_PARTITIONS_IN_MEMORY, 1);
+
+    PartitionStore<IntWritable, IntWritable, NullWritable, IntWritable>
+        partitionStore = new DiskBackedPartitionStore<IntWritable,
+        IntWritable, NullWritable, IntWritable>(conf);
+    testReadWrite(partitionStore, conf);
+
+    conf.setInt(GiraphJob.MAX_PARTITIONS_IN_MEMORY, 2);
+    partitionStore = new DiskBackedPartitionStore<IntWritable,
+        IntWritable, NullWritable, IntWritable>(conf);
+    testReadWrite(partitionStore, conf);
+  }
+
+  /**
+   * Exercise add, get, remove and delete on a partition store and check
+   * the resulting partition and vertex counts.
+   *
+   * @param partitionStore Store under test, expected to start out empty
+   * @param conf Configuration used to create the partitions
+   */
+  public void testReadWrite(PartitionStore<IntWritable, IntWritable,
+      NullWritable, IntWritable> partitionStore, Configuration conf) {
+    Vertex<IntWritable, IntWritable, NullWritable, IntWritable> v1 =
+        createVertex(1);
+    Vertex<IntWritable, IntWritable, NullWritable, IntWritable> v2 =
+        createVertex(2);
+    Vertex<IntWritable, IntWritable, NullWritable, IntWritable> v3 =
+        createVertex(3);
+    Vertex<IntWritable, IntWritable, NullWritable, IntWritable> v4 =
+        createVertex(4);
+    Vertex<IntWritable, IntWritable, NullWritable, IntWritable> v5 =
+        createVertex(5);
+    Vertex<IntWritable, IntWritable, NullWritable, IntWritable> v6 =
+        createVertex(6);
+    Vertex<IntWritable, IntWritable, NullWritable, IntWritable> v7 =
+        createVertex(7);
+
+    // Partitions 1-3 are added whole; partition 4 only ever receives
+    // vertices, so the store is expected to create it on demand.
+    partitionStore.addPartition(createPartition(conf, 1, v1, v2));
+    partitionStore.addPartition(createPartition(conf, 2, v3));
+    partitionStore.addPartitionVertices(2, Lists.newArrayList(v4));
+    partitionStore.addPartition(createPartition(conf, 3, v5));
+    partitionStore.addPartitionVertices(1, Lists.newArrayList(v6));
+    partitionStore.addPartitionVertices(4, Lists.newArrayList(v7));
+
+    Partition<IntWritable, IntWritable, NullWritable, IntWritable> partition1 =
+        partitionStore.getPartition(1);
+    Partition<IntWritable, IntWritable, NullWritable, IntWritable> partition2 =
+        partitionStore.getPartition(2);
+    Partition<IntWritable, IntWritable, NullWritable,
+        IntWritable> partition3 = partitionStore.removePartition(3);
+    Partition<IntWritable, IntWritable, NullWritable, IntWritable> partition4 =
+        partitionStore.getPartition(4);
+
+    // Partition 3 was removed above, leaving partitions 1, 2 and 4.
+    assertEquals(3, partitionStore.getNumPartitions());
+    assertEquals(3, Iterables.size(partitionStore.getPartitionIds()));
+    assertEquals(3, Iterables.size(partitionStore.getPartitions()));
+    assertTrue(partitionStore.hasPartition(1));
+    assertTrue(partitionStore.hasPartition(2));
+    assertFalse(partitionStore.hasPartition(3));
+    assertTrue(partitionStore.hasPartition(4));
+    assertEquals(3, partition1.getVertices().size());
+    assertEquals(2, partition2.getVertices().size());
+    assertEquals(1, partition3.getVertices().size());
+    assertEquals(1, partition4.getVertices().size());
+
+    partitionStore.deletePartition(2);
+
+    assertEquals(2, partitionStore.getNumPartitions());
+  }
+}