You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@giraph.apache.org by ed...@apache.org on 2016/05/20 22:14:26 UTC
[2/5] git commit: updated refs/heads/trunk to 6256a76
http://git-wip-us.apache.org/repos/asf/giraph/blob/6256a761/giraph-core/src/main/java/org/apache/giraph/ooc/data/MetaPartitionManager.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/ooc/data/MetaPartitionManager.java b/giraph-core/src/main/java/org/apache/giraph/ooc/data/MetaPartitionManager.java
index 6c08dfd..784d578 100644
--- a/giraph-core/src/main/java/org/apache/giraph/ooc/data/MetaPartitionManager.java
+++ b/giraph-core/src/main/java/org/apache/giraph/ooc/data/MetaPartitionManager.java
@@ -20,6 +20,7 @@ package org.apache.giraph.ooc.data;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
+import org.apache.giraph.bsp.BspService;
import org.apache.giraph.ooc.OutOfCoreEngine;
import org.apache.giraph.worker.BspServiceWorker;
import org.apache.log4j.Logger;
@@ -27,7 +28,6 @@ import org.apache.log4j.Logger;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
-import java.util.Map;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.ConcurrentMap;
@@ -59,11 +59,11 @@ public class MetaPartitionManager {
/** Number of in-memory partitions */
private final AtomicInteger numInMemoryPartitions = new AtomicInteger(0);
- /** Map of partitions to their meta information */
+ /** Map (dictionary) of partitions to their meta information */
private final ConcurrentMap<Integer, MetaPartition> partitions =
Maps.newConcurrentMap();
- /** List of partitions assigned to each IO threads */
- private final List<PerThreadPartitionStatus> perThreadPartitions;
+ /** Reverse dictionaries of partitions assigned to each IO thread */
+ private final List<MetaPartitionDictionary> perThreadPartitionDictionary;
/** For each IO thread, set of partition ids that are on-disk and have
* 'large enough' vertex/edge buffers to be offloaded on disk
*/
@@ -95,11 +95,11 @@ public class MetaPartitionManager {
* @param oocEngine out-of-core engine
*/
public MetaPartitionManager(int numIOThreads, OutOfCoreEngine oocEngine) {
- perThreadPartitions = new ArrayList<>(numIOThreads);
+ perThreadPartitionDictionary = new ArrayList<>(numIOThreads);
perThreadVertexEdgeBuffers = new ArrayList<>(numIOThreads);
perThreadMessageBuffers = new ArrayList<>(numIOThreads);
for (int i = 0; i < numIOThreads; ++i) {
- perThreadPartitions.add(new PerThreadPartitionStatus());
+ perThreadPartitionDictionary.add(new MetaPartitionDictionary());
perThreadMessageBuffers.add(Sets.<Integer>newConcurrentHashSet());
perThreadVertexEdgeBuffers.add(Sets.<Integer>newConcurrentHashSet());
}
@@ -156,11 +156,7 @@ public class MetaPartitionManager {
if (temp == null) {
int ownerThread = oocEngine.getIOScheduler()
.getOwnerThreadId(partitionId);
- Set<MetaPartition> partitionSet =
- perThreadPartitions.get(ownerThread).getInMemoryProcessed();
- synchronized (partitionSet) {
- partitionSet.add(meta);
- }
+ perThreadPartitionDictionary.get(ownerThread).addPartition(meta);
numInMemoryPartitions.getAndIncrement();
}
}
@@ -173,6 +169,8 @@ public class MetaPartitionManager {
*/
public void removePartition(Integer partitionId) {
MetaPartition meta = partitions.remove(partitionId);
+ int ownerThread = oocEngine.getIOScheduler().getOwnerThreadId(partitionId);
+ synchronized (meta) { perThreadPartitionDictionary.get(ownerThread).removePartition(meta); }
checkState(!meta.isOnDisk());
numInMemoryPartitions.getAndDecrement();
}
@@ -216,20 +214,46 @@ public class MetaPartitionManager {
* @return id of the partition to offload on disk
*/
public Integer getOffloadPartitionId(int threadId) {
- Set<MetaPartition> partitionSet = perThreadPartitions.get(threadId)
- .getInMemoryProcessed();
- synchronized (partitionSet) {
- MetaPartition meta = peekFromSet(partitionSet);
- if (meta != null) {
- return meta.getPartitionId();
- }
- }
- partitionSet = perThreadPartitions.get(threadId).getInMemoryUnprocessed();
- synchronized (partitionSet) {
- MetaPartition meta = peekFromSet(partitionSet);
- if (meta != null) {
- return meta.getPartitionId();
- }
+ MetaPartition meta = perThreadPartitionDictionary.get(threadId).lookup(
+ ProcessingState.PROCESSED,
+ StorageState.IN_MEM,
+ StorageState.IN_MEM,
+ null);
+ if (meta != null) {
+ return meta.getPartitionId();
+ }
+ meta = perThreadPartitionDictionary.get(threadId).lookup(
+ ProcessingState.PROCESSED,
+ StorageState.IN_MEM,
+ null,
+ null);
+ if (meta != null) {
+ return meta.getPartitionId();
+ }
+ meta = perThreadPartitionDictionary.get(threadId).lookup(
+ ProcessingState.PROCESSED,
+ null,
+ StorageState.IN_MEM,
+ null);
+ if (meta != null) {
+ return meta.getPartitionId();
+ }
+
+ meta = perThreadPartitionDictionary.get(threadId).lookup(
+ ProcessingState.UNPROCESSED,
+ StorageState.IN_MEM,
+ null,
+ null);
+ if (meta != null) {
+ return meta.getPartitionId();
+ }
+ meta = perThreadPartitionDictionary.get(threadId).lookup(
+ ProcessingState.UNPROCESSED,
+ null,
+ StorageState.IN_MEM,
+ null);
+ if (meta != null) {
+ return meta.getPartitionId();
}
return null;
}
@@ -241,8 +265,7 @@ public class MetaPartitionManager {
* @return id of the partition to offload its vertex/edge buffers on disk
*/
public Integer getOffloadPartitionBufferId(int threadId) {
- if (oocEngine.getServiceWorker().getSuperstep() ==
- BspServiceWorker.INPUT_SUPERSTEP) {
+ if (oocEngine.getSuperstep() == BspServiceWorker.INPUT_SUPERSTEP) {
Integer partitionId =
popFromSet(perThreadVertexEdgeBuffers.get(threadId));
if (partitionId == null) {
@@ -270,8 +293,7 @@ public class MetaPartitionManager {
* @return id of the partition to offload its message buffer on disk
*/
public Integer getOffloadMessageBufferId(int threadId) {
- if (oocEngine.getServiceWorker().getSuperstep() !=
- BspServiceWorker.INPUT_SUPERSTEP) {
+ if (oocEngine.getSuperstep() != BspServiceWorker.INPUT_SUPERSTEP) {
Integer partitionId =
popFromSet(perThreadMessageBuffers.get(threadId));
if (partitionId == null) {
@@ -297,53 +319,115 @@ public class MetaPartitionManager {
* @return id of the partition to offload its message on disk
*/
public Integer getOffloadMessageId(int threadId) {
- Set<MetaPartition> partitionSet = perThreadPartitions.get(threadId)
- .getInDiskProcessed();
- synchronized (partitionSet) {
- for (MetaPartition meta : partitionSet) {
- if (meta.getIncomingMessagesState() == StorageState.IN_MEM) {
- return meta.getPartitionId();
- }
- }
- }
- partitionSet = perThreadPartitions.get(threadId).getInDiskUnprocessed();
- synchronized (partitionSet) {
- for (MetaPartition meta : partitionSet) {
- if (meta.getIncomingMessagesState() == StorageState.IN_MEM) {
- return meta.getPartitionId();
- }
- }
+ if (oocEngine.getSuperstep() == BspService.INPUT_SUPERSTEP) {
+ return null;
+ }
+ MetaPartition meta = perThreadPartitionDictionary.get(threadId).lookup(
+ ProcessingState.PROCESSED,
+ StorageState.ON_DISK,
+ null,
+ StorageState.IN_MEM);
+ if (meta != null) {
+ return meta.getPartitionId();
+ }
+
+ meta = perThreadPartitionDictionary.get(threadId).lookup(
+ ProcessingState.UNPROCESSED,
+ StorageState.ON_DISK,
+ null,
+ StorageState.IN_MEM);
+ if (meta != null) {
+ return meta.getPartitionId();
}
return null;
}
/**
- * Get id of a partition to prefetch its data to memory
+ * Get id of a partition to load its data to memory, or null if there is none
*
* @param threadId id of the thread who is going to load the partition data
* @return id of the partition to load its data to memory
*/
- public Integer getPrefetchPartitionId(int threadId) {
- Set<MetaPartition> partitionSet =
- perThreadPartitions.get(threadId).getInDiskUnprocessed();
- synchronized (partitionSet) {
- MetaPartition meta = peekFromSet(partitionSet);
- return (meta != null) ? meta.getPartitionId() : null;
+ public Integer getLoadPartitionId(int threadId) {
+ MetaPartition meta = perThreadPartitionDictionary.get(threadId).lookup(
+ ProcessingState.UNPROCESSED,
+ StorageState.IN_MEM,
+ StorageState.ON_DISK,
+ null);
+ if (meta != null) {
+ return meta.getPartitionId();
+ }
+
+ meta = perThreadPartitionDictionary.get(threadId).lookup(
+ ProcessingState.UNPROCESSED,
+ StorageState.ON_DISK,
+ null,
+ null);
+ if (meta != null) {
+ return meta.getPartitionId();
+ }
+
+ meta = perThreadPartitionDictionary.get(threadId).lookup(
+ ProcessingState.PROCESSED,
+ StorageState.ON_DISK,
+ null,
+ null);
+ if (meta != null) {
+ return meta.getPartitionId();
+ }
+
+ meta = perThreadPartitionDictionary.get(threadId).lookup(
+ ProcessingState.PROCESSED,
+ null,
+ StorageState.ON_DISK,
+ null);
+ if (meta != null) {
+ return meta.getPartitionId();
}
+ return null;
}
/**
- * Mark a partition inaccessible to IO and compute threads
+ * Mark a partition as 'IN_PROCESS', re-indexing it in its owner's dictionary
*
* @param partitionId id of the partition to mark
*/
- public void makePartitionInaccessible(int partitionId) {
+ public void markPartitionAsInProcess(int partitionId) {
MetaPartition meta = partitions.get(partitionId);
- perThreadPartitions.get(oocEngine.getIOScheduler()
- .getOwnerThreadId(partitionId))
- .remove(meta);
+ int ownerThread = oocEngine.getIOScheduler().getOwnerThreadId(partitionId);
synchronized (meta) {
+ perThreadPartitionDictionary.get(ownerThread).removePartition(meta);
meta.setProcessingState(ProcessingState.IN_PROCESS);
+ perThreadPartitionDictionary.get(ownerThread).addPartition(meta);
+ }
+ }
+
+ /**
+ * Whether there is any processed partition stored in memory (excluding those
+ * that are prefetched to execute in the next superstep).
+ *
+ * @return true iff there is any processed partition in memory
+ */
+ public boolean hasProcessedOnMemory() {
+ for (MetaPartitionDictionary dictionary : perThreadPartitionDictionary) {
+ if (dictionary.hasProcessedOnMemory()) {
+ return true;
+ }
+ }
+ return false;
+ }
+
+ /**
+ * Whether a partition is *processed* in the current iteration cycle over
+ * partitions.
+ *
+ * @param partitionId id of the partition to check
+ * @return true iff processing the given partition is done
+ */
+ public boolean isPartitionProcessed(Integer partitionId) {
+ MetaPartition meta = partitions.get(partitionId);
+ synchronized (meta) {
+ return meta.getProcessingState() == ProcessingState.PROCESSED;
}
}
@@ -354,14 +438,11 @@ public class MetaPartitionManager {
*/
public void setPartitionIsProcessed(int partitionId) {
MetaPartition meta = partitions.get(partitionId);
+ int ownerThread = oocEngine.getIOScheduler().getOwnerThreadId(partitionId);
synchronized (meta) {
+ perThreadPartitionDictionary.get(ownerThread).removePartition(meta);
meta.setProcessingState(ProcessingState.PROCESSED);
- }
- Set<MetaPartition> partitionSet = perThreadPartitions
- .get(oocEngine.getIOScheduler().getOwnerThreadId(partitionId))
- .getInMemoryProcessed();
- synchronized (partitionSet) {
- partitionSet.add(meta);
+ perThreadPartitionDictionary.get(ownerThread).addPartition(meta);
}
numPartitionsProcessed.getAndIncrement();
}
@@ -378,7 +459,7 @@ public class MetaPartitionManager {
MetaPartition meta = partitions.get(partitionId);
synchronized (meta) {
boolean shouldLoad = meta.getPartitionState() == StorageState.ON_DISK;
- if (superstep == oocEngine.getServiceWorker().getSuperstep()) {
+ if (superstep == oocEngine.getSuperstep()) {
shouldLoad |= meta.getCurrentMessagesState() == StorageState.ON_DISK;
} else {
shouldLoad |= meta.getIncomingMessagesState() == StorageState.ON_DISK;
@@ -391,25 +472,27 @@ public class MetaPartitionManager {
* Notify this meta store that load of a partition for a specific superstep
* is completed
*
- * @param partitionId id of a the partition that load is completed
+ * @param partitionId id of the partition for which the load is completed
* @param superstep superstep in which the partition is loaded for
*/
public void doneLoadingPartition(int partitionId, long superstep) {
MetaPartition meta = partitions.get(partitionId);
numInMemoryPartitions.getAndIncrement();
int owner = oocEngine.getIOScheduler().getOwnerThreadId(partitionId);
- boolean removed = perThreadPartitions.get(owner)
- .remove(meta, StorageState.ON_DISK);
- if (removed || meta.getProcessingState() == ProcessingState.IN_PROCESS) {
- synchronized (meta) {
- meta.setPartitionState(StorageState.IN_MEM);
- if (superstep == oocEngine.getServiceWorker().getSuperstep()) {
- meta.setCurrentMessagesState(StorageState.IN_MEM);
- } else {
- meta.setIncomingMessagesState(StorageState.IN_MEM);
- }
+ synchronized (meta) {
+ perThreadPartitionDictionary.get(owner).removePartition(meta);
+ meta.setPartitionState(StorageState.IN_MEM);
+ if (superstep == oocEngine.getSuperstep()) {
+ meta.setCurrentMessagesState(StorageState.IN_MEM);
+ } else {
+ meta.setIncomingMessagesState(StorageState.IN_MEM);
+ }
+ // Loading an already-PROCESSED partition is a prefetch for the next
+ // superstep; count it so hasProcessedOnMemory() can exclude it
+ if (meta.getProcessingState() == ProcessingState.PROCESSED) {
+ perThreadPartitionDictionary.get(owner).increaseNumPrefetch();
}
- perThreadPartitions.get(owner).add(meta, StorageState.IN_MEM);
+ perThreadPartitionDictionary.get(owner).addPartition(meta);
}
}
@@ -422,9 +505,13 @@ public class MetaPartitionManager {
*/
public boolean startOffloadingMessages(int partitionId) {
MetaPartition meta = partitions.get(partitionId);
+ int ownerThread =
+ oocEngine.getIOScheduler().getOwnerThreadId(partitionId);
synchronized (meta) {
if (meta.getIncomingMessagesState() == StorageState.IN_MEM) {
+ perThreadPartitionDictionary.get(ownerThread).removePartition(meta);
meta.setIncomingMessagesState(StorageState.IN_TRANSIT);
+ perThreadPartitionDictionary.get(ownerThread).addPartition(meta);
return true;
} else {
return false;
@@ -441,8 +528,12 @@ public class MetaPartitionManager {
*/
public void doneOffloadingMessages(int partitionId) {
MetaPartition meta = partitions.get(partitionId);
+ int ownerThread =
+ oocEngine.getIOScheduler().getOwnerThreadId(partitionId);
synchronized (meta) {
+ perThreadPartitionDictionary.get(ownerThread).removePartition(meta);
meta.setIncomingMessagesState(StorageState.ON_DISK);
+ perThreadPartitionDictionary.get(ownerThread).addPartition(meta);
}
}
@@ -478,17 +569,18 @@ public class MetaPartitionManager {
public boolean startOffloadingPartition(int partitionId) {
MetaPartition meta = partitions.get(partitionId);
int owner = oocEngine.getIOScheduler().getOwnerThreadId(partitionId);
- boolean removed = perThreadPartitions.get(owner)
- .remove(meta, StorageState.IN_MEM);
- if (removed) {
- synchronized (meta) {
+ synchronized (meta) {
+ if (meta.getProcessingState() != ProcessingState.IN_PROCESS &&
+ (meta.getPartitionState() == StorageState.IN_MEM ||
+ meta.getCurrentMessagesState() == StorageState.IN_MEM)) {
+ perThreadPartitionDictionary.get(owner).removePartition(meta);
meta.setPartitionState(StorageState.IN_TRANSIT);
meta.setCurrentMessagesState(StorageState.IN_TRANSIT);
+ perThreadPartitionDictionary.get(owner).addPartition(meta);
+ return true;
+ } else {
+ return false;
}
- perThreadPartitions.get(owner).add(meta, StorageState.IN_TRANSIT);
- return true;
- } else {
- return false;
}
}
@@ -502,14 +594,11 @@ public class MetaPartitionManager {
numInMemoryPartitions.getAndDecrement();
MetaPartition meta = partitions.get(partitionId);
int owner = oocEngine.getIOScheduler().getOwnerThreadId(partitionId);
- boolean removed = perThreadPartitions.get(owner)
- .remove(meta, StorageState.IN_TRANSIT);
- if (removed) {
- synchronized (meta) {
- meta.setPartitionState(StorageState.ON_DISK);
- meta.setCurrentMessagesState(StorageState.ON_DISK);
- }
- perThreadPartitions.get(owner).add(meta, StorageState.ON_DISK);
+ synchronized (meta) {
+ perThreadPartitionDictionary.get(owner).removePartition(meta);
+ meta.setPartitionState(StorageState.ON_DISK);
+ meta.setCurrentMessagesState(StorageState.ON_DISK);
+ perThreadPartitionDictionary.get(owner).addPartition(meta);
}
}
@@ -517,15 +606,17 @@ public class MetaPartitionManager {
* Reset the meta store for a new iteration cycle over all partitions.
* Note: this is not thread-safe and should be called from a single thread.
*/
- public void resetPartition() {
+ public void resetPartitions() {
for (MetaPartition meta : partitions.values()) {
+ int owner =
+ oocEngine.getIOScheduler().getOwnerThreadId(meta.getPartitionId());
+ perThreadPartitionDictionary.get(owner).removePartition(meta);
meta.resetPartition();
+ perThreadPartitionDictionary.get(owner).addPartition(meta);
}
- int numPartition = 0;
- for (PerThreadPartitionStatus status : perThreadPartitions) {
- numPartition += status.reset();
+ for (MetaPartitionDictionary dictionary : perThreadPartitionDictionary) {
+ dictionary.reset();
}
- checkState(numPartition == partitions.size());
numPartitionsProcessed.set(0);
}
@@ -535,41 +626,22 @@ public class MetaPartitionManager {
*/
public void resetMessages() {
for (MetaPartition meta : partitions.values()) {
+ int owner =
+ oocEngine.getIOScheduler().getOwnerThreadId(meta.getPartitionId());
+ perThreadPartitionDictionary.get(owner).removePartition(meta);
meta.resetMessages();
- }
- // After swapping incoming messages and current messages, it may be the case
- // that a partition has data in memory (partitionState == IN_MEM), but now
- // its current messages are on disk (currentMessageState == ON_DISK). So, we
- // have to mark the partition as ON_DISK, and load its messages once it is
- // about to be processed.
- for (PerThreadPartitionStatus status : perThreadPartitions) {
- Set<MetaPartition> partitionSet = status.getInMemoryUnprocessed();
- Iterator<MetaPartition> it = partitionSet.iterator();
- while (it.hasNext()) {
- MetaPartition meta = it.next();
- if (meta.getCurrentMessagesState() == StorageState.ON_DISK) {
- it.remove();
- status.getInDiskUnprocessed().add(meta);
- numInMemoryPartitions.getAndDecrement();
- }
- }
- partitionSet = status.getInMemoryProcessed();
- it = partitionSet.iterator();
- while (it.hasNext()) {
- MetaPartition meta = it.next();
- if (meta.getCurrentMessagesState() == StorageState.ON_DISK) {
- it.remove();
- status.getInDiskProcessed().add(meta);
- numInMemoryPartitions.getAndDecrement();
- }
+ if (meta.getPartitionState() == StorageState.IN_MEM &&
+ meta.getCurrentMessagesState() == StorageState.ON_DISK) {
+ numInMemoryPartitions.getAndDecrement();
}
+ perThreadPartitionDictionary.get(owner).addPartition(meta);
}
}
/**
* Return the id of an unprocessed partition in memory. If all partitions are
* processed, return an appropriate 'finisher signal'. If there are
- * unprocessed partitions, but none are is memory, return null.
+ * unprocessed partitions, but none are in memory, return null.
*
* @return id of the partition to be processed next.
*/
@@ -577,20 +649,40 @@ public class MetaPartitionManager {
if (numPartitionsProcessed.get() >= partitions.size()) {
return NO_PARTITION_TO_PROCESS;
}
- int numThreads = perThreadPartitions.size();
+ int numThreads = perThreadPartitionDictionary.size();
int index = randomGenerator.nextInt(numThreads);
int startIndex = index;
+ MetaPartition meta;
do {
- Set<MetaPartition> partitionSet =
- perThreadPartitions.get(index).getInMemoryUnprocessed();
- MetaPartition meta;
- synchronized (partitionSet) {
- meta = popFromSet(partitionSet);
- }
- if (meta != null) {
- synchronized (meta) {
- meta.setProcessingState(ProcessingState.IN_PROCESS);
- return meta.getPartitionId();
+ // We first look up a partition in the reverse dictionary. If there is a
+ // partition with the given properties, we then check whether we can
+ // return it as the next partition to process. If we cannot, there may
+ // still be other partitions in the dictionary, so we will continue
+ // looping through all of them. If all the partitions with our desired
+ // properties has been examined, we will break the loop.
+ while (true) {
+ meta = perThreadPartitionDictionary.get(index).lookup(
+ ProcessingState.UNPROCESSED,
+ StorageState.IN_MEM,
+ StorageState.IN_MEM,
+ null);
+ if (meta != null) {
+ // Here we should check if the 'meta' still has the same property as
+ // when it was looked up in the dictionary. There may be a case where
+ // meta changes from the time it is looked up until the moment the
+ // synchronize block is granted to progress.
+ synchronized (meta) {
+ if (meta.getProcessingState() == ProcessingState.UNPROCESSED &&
+ meta.getPartitionState() == StorageState.IN_MEM &&
+ meta.getCurrentMessagesState() == StorageState.IN_MEM) {
+ perThreadPartitionDictionary.get(index).removePartition(meta);
+ meta.setProcessingState(ProcessingState.IN_PROCESS);
+ perThreadPartitionDictionary.get(index).addPartition(meta);
+ return meta.getPartitionId();
+ }
+ }
+ } else {
+ break;
}
}
index = (index + 1) % numThreads;
@@ -607,7 +699,9 @@ public class MetaPartitionManager {
*/
public boolean isPartitionOnDisk(int partitionId) {
MetaPartition meta = partitions.get(partitionId);
- return meta.isOnDisk();
+ synchronized (meta) {
+ return meta.isOnDisk();
+ }
}
/**
@@ -623,18 +717,19 @@ public class MetaPartitionManager {
/** Storage state of partition data */
private StorageState partitionState;
/** Processing state of a partition */
- private volatile ProcessingState processingState;
+ private ProcessingState processingState;
/**
* Constructor
*
* @param partitionId id of the partition
*/
- MetaPartition(int partitionId) {
+ public MetaPartition(int partitionId) {
this.partitionId = partitionId;
- this.processingState = ProcessingState.PROCESSED;
+ this.processingState = ProcessingState.UNPROCESSED;
this.partitionState = StorageState.IN_MEM;
this.currentMessagesState = StorageState.IN_MEM;
+ this.incomingMessagesState = StorageState.IN_MEM;
}
@Override
@@ -649,65 +744,30 @@ public class MetaPartitionManager {
return sb.toString();
}
- /**
- * Get id of the partition
- *
- * @return id of the partition
- */
public int getPartitionId() {
return partitionId;
}
- /**
- * Get storage state of incoming messages of the partition
- *
- * @return storage state of incoming messages
- */
public StorageState getIncomingMessagesState() {
return incomingMessagesState;
}
- /**
- * Set storage state of incoming messages of the partition
- *
- * @param incomingMessagesState storage state of incoming messages
- */
public void setIncomingMessagesState(StorageState incomingMessagesState) {
this.incomingMessagesState = incomingMessagesState;
}
- /**
- * Get storage state of current messages of the partition
- *
- * @return storage state of current messages
- */
public StorageState getCurrentMessagesState() {
return currentMessagesState;
}
- /**
- * Set storage state of current messages of the partition
- *
- * @param currentMessagesState storage state of current messages
- */
public void setCurrentMessagesState(StorageState currentMessagesState) {
this.currentMessagesState = currentMessagesState;
}
- /**
- * Get storage state of the partition
- *
- * @return storage state of the partition
- */
public StorageState getPartitionState() {
return partitionState;
}
- /**
- * Set storage state of the partition
- *
- * @param state storage state of the partition
- */
public void setPartitionState(StorageState state) {
this.partitionState = state;
}
@@ -748,200 +808,167 @@ public class MetaPartitionManager {
}
/**
- * Representation of partitions' state per IO thread
+ * Class representing reverse dictionary for partitions. The main operation
+ * of the reverse dictionary is to look up a partition with certain
+ * properties. The responsibility of keeping the dictionary consistent
+ * when a partition's properties change is on the code that changes them.
+ * One can simply remove a partition from the dictionary, change the property
+ * (or properties), and then add the partition to the dictionary.
*/
- private static class PerThreadPartitionStatus {
+ private static class MetaPartitionDictionary {
/**
- * Contains partitions that has been processed in the current iteration
- * cycle, and are not in use by any thread.
+ * Sets of partitions for each possible combination of properties. Each
+ * partition can have 4 properties, and each property can have any of 3
+ * different values. The properties are as follows (in the order in which
+ * they are used as the dimensions of the following 4-D array):
+ * - processing status (PROCESSED, UNPROCESSED, or IN_PROCESS)
+ * - partition storage status (IN_MEM, IN_TRANSIT, ON_DISK)
+ * - current messages storage status (IN_MEM, IN_TRANSIT, ON_DISK)
+ * - incoming messages storage status (IN_MEM, IN_TRANSIT, ON_DISK)
*/
- private Map<StorageState, Set<MetaPartition>>
- processedPartitions = Maps.newConcurrentMap();
+ private final Set<MetaPartition>[][][][] partitions =
+ (Set<MetaPartition>[][][][]) new Set<?>[3][3][3][3];
/**
- * Contains partitions that has *NOT* been processed in the current
- * iteration cycle, and are not in use by any thread.
+ * Number of partitions that have been prefetched to be computed in the
+ * next superstep
*/
- private Map<StorageState, Set<MetaPartition>>
- unprocessedPartitions = Maps.newConcurrentMap();
+ private final AtomicInteger numPrefetch = new AtomicInteger(0);
/**
* Constructor
*/
- public PerThreadPartitionStatus() {
- processedPartitions.put(StorageState.IN_MEM,
- Sets.<MetaPartition>newLinkedHashSet());
- processedPartitions.put(StorageState.ON_DISK,
- Sets.<MetaPartition>newLinkedHashSet());
- processedPartitions.put(StorageState.IN_TRANSIT,
- Sets.<MetaPartition>newLinkedHashSet());
-
- unprocessedPartitions.put(StorageState.IN_MEM,
- Sets.<MetaPartition>newLinkedHashSet());
- unprocessedPartitions.put(StorageState.ON_DISK,
- Sets.<MetaPartition>newLinkedHashSet());
- unprocessedPartitions.put(StorageState.IN_TRANSIT,
- Sets.<MetaPartition>newLinkedHashSet());
- }
-
- @Override
- public String toString() {
- StringBuffer sb = new StringBuffer();
- sb.append("\nProcessed Partitions: " + processedPartitions + "; ");
- sb.append("\nUnprocessedPartitions: " + unprocessedPartitions);
- return sb.toString();
- }
-
- /**
- * Get set of partitions that are in memory and are processed
- *
- * @return set of partition that are in memory are are processed
- */
- public Set<MetaPartition> getInMemoryProcessed() {
- return processedPartitions.get(StorageState.IN_MEM);
- }
-
- /**
- * Get set of partitions that are in memory are are not processed
- *
- * @return set of partitions that are in memory and are not processed
- */
- public Set<MetaPartition> getInMemoryUnprocessed() {
- return unprocessedPartitions.get(StorageState.IN_MEM);
+ public MetaPartitionDictionary() {
+ for (int i = 0; i < 3; ++i) {
+ for (int j = 0; j < 3; ++j) {
+ for (int k = 0; k < 3; ++k) {
+ for (int t = 0; t < 3; ++t) {
+ partitions[i][j][k][t] = Sets.newLinkedHashSet();
+ }
+ }
+ }
+ }
}
/**
- * Get set of partitions that are on disk and are processed
+ * Get the partition set associated with the property combination that a given
+ * partition has
*
- * @return set of partitions that are on disk and are processed
+ * @param meta meta partition containing properties of a partition
+ * @return partition set with the same property combination as the given
+ * meta partition
*/
- public Set<MetaPartition> getInDiskProcessed() {
- return processedPartitions.get(StorageState.ON_DISK);
+ private Set<MetaPartition> getSet(MetaPartition meta) {
+ return partitions[meta.getProcessingState().ordinal()]
+ [meta.getPartitionState().ordinal()]
+ [meta.getCurrentMessagesState().ordinal()]
+ [meta.getIncomingMessagesState().ordinal()];
}
/**
- * Get set of partitions that are on disk and are not processed
+ * Add a partition to the dictionary
*
- * @return set of partitions that are on disk and are not processed
+ * @param meta meta information of the partition to add
*/
- public Set<MetaPartition> getInDiskUnprocessed() {
- return unprocessedPartitions.get(StorageState.ON_DISK);
+ public void addPartition(MetaPartition meta) {
+ Set<MetaPartition> partitionSet = getSet(meta);
+ synchronized (partitionSet) {
+ partitionSet.add(meta);
+ }
}
/**
- * Remove a partition from meta information
+ * Remove a partition from the dictionary
*
- * @param meta meta-information of a partition to be removed
+ * @param meta meta information of the partition to remove
*/
- public void remove(MetaPartition meta) {
- Set<MetaPartition> partitionSet;
- partitionSet = processedPartitions.get(StorageState.IN_MEM);
- synchronized (partitionSet) {
- if (partitionSet.remove(meta)) {
- return;
- }
- }
- partitionSet = unprocessedPartitions.get(StorageState.IN_MEM);
- synchronized (partitionSet) {
- if (partitionSet.remove(meta)) {
- return;
- }
- }
- partitionSet = processedPartitions.get(StorageState.IN_TRANSIT);
- synchronized (partitionSet) {
- if (partitionSet.remove(meta)) {
- return;
- }
- }
- partitionSet = unprocessedPartitions.get(StorageState.IN_TRANSIT);
- synchronized (partitionSet) {
- if (partitionSet.remove(meta)) {
- return;
- }
- }
- partitionSet = processedPartitions.get(StorageState.ON_DISK);
- synchronized (partitionSet) {
- if (partitionSet.remove(meta)) {
- return;
- }
- }
- partitionSet = unprocessedPartitions.get(StorageState.ON_DISK);
+ public void removePartition(MetaPartition meta) {
+ Set<MetaPartition> partitionSet = getSet(meta);
synchronized (partitionSet) {
partitionSet.remove(meta);
}
}
/**
- * Reset meta-information for the next iteration cycle over all partitions
+ * Look up a partition with the given properties. One can use a wildcard as
+ * a property in lookup operation (by passing null as the property).
*
- * @return total number of partitions kept for this thread
+ * @param processingState processing state property
+ * @param partitionStorageState partition storage property
+ * @param currentMessagesState current messages storage property
+ * @param incomingMessagesState incoming messages storage property
+ * @return a meta partition in the dictionary with the given combination of
+ * properties. If there is no such partition, return null
*/
- public int reset() {
- checkState(unprocessedPartitions.get(StorageState.IN_MEM).size() == 0);
- checkState(unprocessedPartitions.get(StorageState.IN_TRANSIT).size() ==
- 0);
- checkState(unprocessedPartitions.get(StorageState.ON_DISK).size() == 0);
- unprocessedPartitions.clear();
- unprocessedPartitions.putAll(processedPartitions);
- processedPartitions.clear();
- processedPartitions.put(StorageState.IN_MEM,
- Sets.<MetaPartition>newLinkedHashSet());
- processedPartitions.put(StorageState.ON_DISK,
- Sets.<MetaPartition>newLinkedHashSet());
- processedPartitions.put(StorageState.IN_TRANSIT,
- Sets.<MetaPartition>newLinkedHashSet());
- return unprocessedPartitions.get(StorageState.IN_MEM).size() +
- unprocessedPartitions.get(StorageState.IN_TRANSIT).size() +
- unprocessedPartitions.get(StorageState.ON_DISK).size();
+ public MetaPartition lookup(ProcessingState processingState,
+ StorageState partitionStorageState,
+ StorageState currentMessagesState,
+ StorageState incomingMessagesState) {
+ int iStart =
+ (processingState == null) ? 0 : processingState.ordinal();
+ int iEnd =
+ (processingState == null) ? 3 : (processingState.ordinal() + 1);
+ int jStart =
+ (partitionStorageState == null) ? 0 : partitionStorageState.ordinal();
+ int jEnd = (partitionStorageState == null) ? 3 :
+ (partitionStorageState.ordinal() + 1);
+ int kStart =
+ (currentMessagesState == null) ? 0 : currentMessagesState.ordinal();
+ int kEnd = (currentMessagesState == null) ? 3 :
+ (currentMessagesState.ordinal() + 1);
+ int tStart =
+ (incomingMessagesState == null) ? 0 : incomingMessagesState.ordinal();
+ int tEnd = (incomingMessagesState == null) ? 3 :
+ (incomingMessagesState.ordinal() + 1);
+ for (int i = iStart; i < iEnd; ++i) {
+ for (int j = jStart; j < jEnd; ++j) {
+ for (int k = kStart; k < kEnd; ++k) {
+ for (int t = tStart; t < tEnd; ++t) {
+ Set<MetaPartition> partitionSet = partitions[i][j][k][t];
+ synchronized (partitionSet) {
+ MetaPartition meta = peekFromSet(partitionSet);
+ if (meta != null) {
+ return meta;
+ }
+ }
+ }
+ }
+ }
+ }
+ return null;
}
/**
- * Remove a partition from partition set of a given state
+ * Whether there is an in-memory partition that is processed already,
+ * excluding those partitions that are prefetched
*
- * @param meta meta partition to remove
- * @param state state from which the partition should be removed
- * @return true iff the partition is actually removed
+ * @return true if there is a processed in-memory partition
*/
- public boolean remove(MetaPartition meta, StorageState state) {
- boolean removed = false;
- Set<MetaPartition> partitionSet = null;
- if (meta.getProcessingState() == ProcessingState.UNPROCESSED) {
- partitionSet = unprocessedPartitions.get(state);
- } else if (meta.getProcessingState() == ProcessingState.PROCESSED) {
- partitionSet = processedPartitions.get(state);
- } else {
- LOG.info("remove: partition " + meta.getPartitionId() + " is " +
- "already being processed! This should happen only if partition " +
- "removal is done before start of an iteration over all partitions");
- }
- if (partitionSet != null) {
- synchronized (partitionSet) {
- removed = partitionSet.remove(meta);
+ public boolean hasProcessedOnMemory() {
+ int count = 0;
+ for (int i = 0; i < 3; ++i) {
+ for (int j = 0; j < 3; ++j) {
+ Set<MetaPartition> partitionSet =
+ partitions[ProcessingState.PROCESSED.ordinal()]
+ [StorageState.IN_MEM.ordinal()][i][j];
+ synchronized (partitionSet) {
+ count += partitionSet.size();
+ }
}
}
- return removed;
+ return count - numPrefetch.get() != 0;
+ }
+
+ /** Increase the number of prefetched partitions by 1 */
+ public void increaseNumPrefetch() {
+ numPrefetch.getAndIncrement();
}
/**
- * Add a partition to partition set of a given state
- *
- * @param meta meta partition to add
- * @param state state to which the partition should be added
+ * Reset the dictionary preparing it for the next iteration cycle over
+ * partitions
*/
- public void add(MetaPartition meta, StorageState state) {
- Set<MetaPartition> partitionSet = null;
- if (meta.getProcessingState() == ProcessingState.UNPROCESSED) {
- partitionSet = unprocessedPartitions.get(state);
- } else if (meta.getProcessingState() == ProcessingState.PROCESSED) {
- partitionSet = processedPartitions.get(state);
- } else {
- LOG.info("add: partition " + meta.getPartitionId() + " is already " +
- "being processed!");
- }
- if (partitionSet != null) {
- synchronized (partitionSet) {
- partitionSet.add(meta);
- }
- }
+ public void reset() {
+ numPrefetch.set(0);
}
}
}
http://git-wip-us.apache.org/repos/asf/giraph/blob/6256a761/giraph-core/src/main/java/org/apache/giraph/ooc/data/OutOfCoreDataManager.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/ooc/data/OutOfCoreDataManager.java b/giraph-core/src/main/java/org/apache/giraph/ooc/data/OutOfCoreDataManager.java
index 7d97e51..325850c 100644
--- a/giraph-core/src/main/java/org/apache/giraph/ooc/data/OutOfCoreDataManager.java
+++ b/giraph-core/src/main/java/org/apache/giraph/ooc/data/OutOfCoreDataManager.java
@@ -181,18 +181,22 @@ public abstract class OutOfCoreDataManager<T> {
/**
* Loads and assembles all data for a given partition, and put it into the
- * data store.
+ * data store. Returns the number of bytes transferred from disk to memory in
+ * the loading process.
*
* @param partitionId id of the partition to load ana assemble all data for
* @param basePath path to load the data from
+ * @return number of bytes loaded from disk to memory
* @throws IOException
*/
- public void loadPartitionData(int partitionId, String basePath)
+ public long loadPartitionData(int partitionId, String basePath)
throws IOException {
+ long numBytes = 0;
ReadWriteLock rwLock = getPartitionLock(partitionId);
rwLock.writeLock().lock();
if (hasPartitionDataOnDisk.contains(partitionId)) {
- loadInMemoryPartitionData(partitionId, getPath(basePath, partitionId));
+ numBytes += loadInMemoryPartitionData(partitionId,
+ getPath(basePath, partitionId));
hasPartitionDataOnDisk.remove(partitionId);
// Loading raw data buffers from disk if there is any and applying those
// to already loaded in-memory data.
@@ -213,6 +217,7 @@ public abstract class OutOfCoreDataManager<T> {
addEntryToImMemoryPartitionData(partitionId, entry);
}
dis.close();
+ numBytes += file.length();
checkState(file.delete(), "loadPartitionData: failed to delete %s.",
file.getAbsoluteFile());
}
@@ -225,38 +230,44 @@ public abstract class OutOfCoreDataManager<T> {
}
}
rwLock.writeLock().unlock();
+ return numBytes;
}
/**
- * Offloads partition data of a given partition in the data store to disk
+ * Offloads partition data of a given partition in the data store to disk, and
+ * returns the number of bytes offloaded from memory to disk.
*
* @param partitionId id of the partition to offload its data
* @param basePath path to offload the data to
+ * @return number of bytes offloaded from memory to disk
* @throws IOException
*/
@edu.umd.cs.findbugs.annotations.SuppressWarnings(
"UL_UNRELEASED_LOCK_EXCEPTION_PATH")
- public void offloadPartitionData(int partitionId, String basePath)
+ public long offloadPartitionData(int partitionId, String basePath)
throws IOException {
ReadWriteLock rwLock = getPartitionLock(partitionId);
rwLock.writeLock().lock();
hasPartitionDataOnDisk.add(partitionId);
rwLock.writeLock().unlock();
- offloadInMemoryPartitionData(partitionId, getPath(basePath, partitionId));
+ return offloadInMemoryPartitionData(partitionId,
+ getPath(basePath, partitionId));
}
/**
- * Offloads raw data buffers of a given partition to disk
+ * Offloads raw data buffers of a given partition to disk, and returns the
+ * number of bytes offloaded from memory to disk.
*
* @param partitionId id of the partition to offload its raw data buffers
* @param basePath path to offload the data to
+ * @return number of bytes offloaded from memory to disk
* @throws IOException
*/
- public void offloadBuffers(int partitionId, String basePath)
+ public long offloadBuffers(int partitionId, String basePath)
throws IOException {
Pair<Integer, List<T>> pair = dataBuffers.get(partitionId);
if (pair == null || pair.getLeft() < minBufferSizeToOffload) {
- return;
+ return 0;
}
ReadWriteLock rwLock = getPartitionLock(partitionId);
rwLock.writeLock().lock();
@@ -272,6 +283,7 @@ public abstract class OutOfCoreDataManager<T> {
writeEntry(entry, dos);
}
dos.close();
+ long numBytes = dos.size();
int numBuffers = pair.getRight().size();
Integer oldNumBuffersOnDisk =
numDataBuffersOnDisk.putIfAbsent(partitionId, numBuffers);
@@ -279,6 +291,7 @@ public abstract class OutOfCoreDataManager<T> {
numDataBuffersOnDisk.replace(partitionId,
oldNumBuffersOnDisk + numBuffers);
}
+ return numBytes;
}
/**
@@ -345,24 +358,27 @@ public abstract class OutOfCoreDataManager<T> {
protected abstract T readNextEntry(DataInput in) throws IOException;
/**
- * Loads data of a partition into data store.
+ * Loads data of a partition into data store. Returns number of bytes loaded.
*
* @param partitionId id of the partition to load its data
* @param path path from which data should be loaded
+ * @return number of bytes loaded from disk to memory
* @throws IOException
*/
- protected abstract void loadInMemoryPartitionData(int partitionId,
+ protected abstract long loadInMemoryPartitionData(int partitionId,
String path)
throws IOException;
/**
- * Offloads data of a partition in data store to disk.
+ * Offloads data of a partition in the data store to disk. Returns the number
+ * of bytes offloaded to disk.
*
* @param partitionId id of the partition to offload to disk
* @param path path to which data should be offloaded
+ * @return number of bytes offloaded from memory to disk
* @throws IOException
*/
- protected abstract void offloadInMemoryPartitionData(int partitionId,
+ protected abstract long offloadInMemoryPartitionData(int partitionId,
String path)
throws IOException;
http://git-wip-us.apache.org/repos/asf/giraph/blob/6256a761/giraph-core/src/main/java/org/apache/giraph/ooc/io/IOCommand.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/ooc/io/IOCommand.java b/giraph-core/src/main/java/org/apache/giraph/ooc/io/IOCommand.java
index eb6d2c9..e84ad29 100644
--- a/giraph-core/src/main/java/org/apache/giraph/ooc/io/IOCommand.java
+++ b/giraph-core/src/main/java/org/apache/giraph/ooc/io/IOCommand.java
@@ -27,10 +27,32 @@ import java.io.IOException;
* out-of-core mechanism.
*/
public abstract class IOCommand {
+ /** Type of IO command */
+ public enum IOCommandType {
+ /** Loading a partition */
+ LOAD_PARTITION,
+ /** Storing a partition */
+ STORE_PARTITION,
+ /** Storing incoming messages of a partition */
+ STORE_MESSAGE,
+ /**
+ * Storing raw data buffers (partition or message data) of a currently
+ * out-of-core partition
+ */
+ STORE_BUFFER,
+ /** Doing nothing regarding IO */
+ WAIT
+ }
+
/** Id of the partition involved for the IO */
protected final int partitionId;
/** Out-of-core engine */
protected final OutOfCoreEngine oocEngine;
+ /**
+ * Number of bytes transferred to/from memory (loaded/stored) during the
+ * execution of the command
+ */
+ protected long numBytesTransferred;
/**
* Constructor
@@ -41,10 +63,11 @@ public abstract class IOCommand {
public IOCommand(OutOfCoreEngine oocEngine, int partitionId) {
this.oocEngine = oocEngine;
this.partitionId = partitionId;
+ this.numBytesTransferred = 0;
}
/**
- * GEt the id of the partition involved in the IO
+ * Get the id of the partition involved in the IO
*
* @return id of the partition
*/
@@ -54,12 +77,30 @@ public abstract class IOCommand {
/**
* Execute (load/store of data) the IO command, and change the data stores
- * appropriately based on the data loaded/stored.
+ * appropriately based on the data loaded/stored. Return true iff the command
+ * was actually executed (for load/store: whether data was loaded/stored).
*
* @param basePath the base path (prefix) to the files/folders IO command
* should read/write data from/to
+ * @return whether the command is actually executed
* @throws IOException
*/
- public abstract void execute(String basePath) throws IOException;
+ public abstract boolean execute(String basePath) throws IOException;
+
+ /**
+ * Get the type of the command.
+ *
+ * @return type of the command
+ */
+ public abstract IOCommandType getType();
+
+ /**
+ * Get the number of bytes transferred (loaded/stored from/to disk).
+ *
+ * @return number of bytes transferred during the execution of the command
+ */
+ public long bytesTransferred() {
+ return numBytesTransferred;
+ }
}
http://git-wip-us.apache.org/repos/asf/giraph/blob/6256a761/giraph-core/src/main/java/org/apache/giraph/ooc/io/LoadPartitionIOCommand.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/ooc/io/LoadPartitionIOCommand.java b/giraph-core/src/main/java/org/apache/giraph/ooc/io/LoadPartitionIOCommand.java
index c28a0da..ce24fe2 100644
--- a/giraph-core/src/main/java/org/apache/giraph/ooc/io/LoadPartitionIOCommand.java
+++ b/giraph-core/src/main/java/org/apache/giraph/ooc/io/LoadPartitionIOCommand.java
@@ -54,19 +54,22 @@ public class LoadPartitionIOCommand extends IOCommand {
}
@Override
- public void execute(String basePath) throws IOException {
+ public boolean execute(String basePath) throws IOException {
+ boolean executed = false;
if (oocEngine.getMetaPartitionManager()
.startLoadingPartition(partitionId, superstep)) {
- long currentSuperstep = oocEngine.getServiceWorker().getSuperstep();
+ long currentSuperstep = oocEngine.getSuperstep();
DiskBackedPartitionStore partitionStore =
(DiskBackedPartitionStore)
oocEngine.getServerData().getPartitionStore();
- partitionStore.loadPartitionData(partitionId, basePath);
+ numBytesTransferred +=
+ partitionStore.loadPartitionData(partitionId, basePath);
if (currentSuperstep == BspService.INPUT_SUPERSTEP &&
superstep == currentSuperstep) {
DiskBackedEdgeStore edgeStore =
(DiskBackedEdgeStore) oocEngine.getServerData().getEdgeStore();
- edgeStore.loadPartitionData(partitionId, basePath);
+ numBytesTransferred +=
+ edgeStore.loadPartitionData(partitionId, basePath);
}
MessageStore messageStore;
if (currentSuperstep == superstep) {
@@ -76,12 +79,19 @@ public class LoadPartitionIOCommand extends IOCommand {
messageStore = oocEngine.getServerData().getIncomingMessageStore();
}
if (messageStore != null) {
- ((DiskBackedMessageStore) messageStore)
+ numBytesTransferred += ((DiskBackedMessageStore) messageStore)
.loadPartitionData(partitionId, basePath);
}
oocEngine.getMetaPartitionManager()
.doneLoadingPartition(partitionId, superstep);
+ executed = true;
}
+ return executed;
+ }
+
+ @Override
+ public IOCommandType getType() {
+ return IOCommandType.LOAD_PARTITION;
}
@Override
http://git-wip-us.apache.org/repos/asf/giraph/blob/6256a761/giraph-core/src/main/java/org/apache/giraph/ooc/io/StoreDataBufferIOCommand.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/ooc/io/StoreDataBufferIOCommand.java b/giraph-core/src/main/java/org/apache/giraph/ooc/io/StoreDataBufferIOCommand.java
index 41a0682..f1769dd 100644
--- a/giraph-core/src/main/java/org/apache/giraph/ooc/io/StoreDataBufferIOCommand.java
+++ b/giraph-core/src/main/java/org/apache/giraph/ooc/io/StoreDataBufferIOCommand.java
@@ -54,7 +54,8 @@ public class StoreDataBufferIOCommand extends IOCommand {
}
@Override
- public void execute(String basePath) throws IOException {
+ public boolean execute(String basePath) throws IOException {
+ boolean executed = false;
if (oocEngine.getMetaPartitionManager()
.startOffloadingBuffer(partitionId)) {
switch (type) {
@@ -62,23 +63,32 @@ public class StoreDataBufferIOCommand extends IOCommand {
DiskBackedPartitionStore partitionStore =
(DiskBackedPartitionStore)
oocEngine.getServerData().getPartitionStore();
- partitionStore.offloadBuffers(partitionId, basePath);
+ numBytesTransferred +=
+ partitionStore.offloadBuffers(partitionId, basePath);
DiskBackedEdgeStore edgeStore =
(DiskBackedEdgeStore) oocEngine.getServerData().getEdgeStore();
- edgeStore.offloadBuffers(partitionId, basePath);
+ numBytesTransferred += edgeStore.offloadBuffers(partitionId, basePath);
break;
case MESSAGE:
DiskBackedMessageStore messageStore =
(DiskBackedMessageStore)
oocEngine.getServerData().getIncomingMessageStore();
- messageStore.offloadBuffers(partitionId, basePath);
+ numBytesTransferred +=
+ messageStore.offloadBuffers(partitionId, basePath);
break;
default:
throw new IllegalStateException("execute: requested data buffer type " +
"does not exist!");
}
oocEngine.getMetaPartitionManager().doneOffloadingBuffer(partitionId);
+ executed = true;
}
+ return executed;
+ }
+
+ @Override
+ public IOCommandType getType() {
+ return IOCommandType.STORE_BUFFER;
}
@Override
http://git-wip-us.apache.org/repos/asf/giraph/blob/6256a761/giraph-core/src/main/java/org/apache/giraph/ooc/io/StoreIncomingMessageIOCommand.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/ooc/io/StoreIncomingMessageIOCommand.java b/giraph-core/src/main/java/org/apache/giraph/ooc/io/StoreIncomingMessageIOCommand.java
index 9c1c0a2..c9d8829 100644
--- a/giraph-core/src/main/java/org/apache/giraph/ooc/io/StoreIncomingMessageIOCommand.java
+++ b/giraph-core/src/main/java/org/apache/giraph/ooc/io/StoreIncomingMessageIOCommand.java
@@ -41,16 +41,25 @@ public class StoreIncomingMessageIOCommand extends IOCommand {
}
@Override
- public void execute(String basePath) throws IOException {
+ public boolean execute(String basePath) throws IOException {
+ boolean executed = false;
if (oocEngine.getMetaPartitionManager()
.startOffloadingMessages(partitionId)) {
DiskBackedMessageStore messageStore =
(DiskBackedMessageStore)
oocEngine.getServerData().getIncomingMessageStore();
checkState(messageStore != null);
- messageStore.offloadPartitionData(partitionId, basePath);
+ numBytesTransferred +=
+ messageStore.offloadPartitionData(partitionId, basePath);
oocEngine.getMetaPartitionManager().doneOffloadingMessages(partitionId);
+ executed = true;
}
+ return executed;
+ }
+
+ @Override
+ public IOCommandType getType() {
+ return IOCommandType.STORE_MESSAGE;
}
@Override
http://git-wip-us.apache.org/repos/asf/giraph/blob/6256a761/giraph-core/src/main/java/org/apache/giraph/ooc/io/StorePartitionIOCommand.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/ooc/io/StorePartitionIOCommand.java b/giraph-core/src/main/java/org/apache/giraph/ooc/io/StorePartitionIOCommand.java
index 77955dc..797ac9d 100644
--- a/giraph-core/src/main/java/org/apache/giraph/ooc/io/StorePartitionIOCommand.java
+++ b/giraph-core/src/main/java/org/apache/giraph/ooc/io/StorePartitionIOCommand.java
@@ -44,29 +44,38 @@ public class StorePartitionIOCommand extends IOCommand {
}
@Override
- public void execute(String basePath) throws IOException {
+ public boolean execute(String basePath) throws IOException {
+ boolean executed = false;
if (oocEngine.getMetaPartitionManager()
.startOffloadingPartition(partitionId)) {
DiskBackedPartitionStore partitionStore =
(DiskBackedPartitionStore)
oocEngine.getServerData().getPartitionStore();
- partitionStore.offloadPartitionData(partitionId, basePath);
- if (oocEngine.getServiceWorker().getSuperstep() !=
- BspService.INPUT_SUPERSTEP) {
+ numBytesTransferred +=
+ partitionStore.offloadPartitionData(partitionId, basePath);
+ if (oocEngine.getSuperstep() != BspService.INPUT_SUPERSTEP) {
MessageStore messageStore =
oocEngine.getServerData().getCurrentMessageStore();
if (messageStore != null) {
- ((DiskBackedMessageStore) messageStore)
+ numBytesTransferred += ((DiskBackedMessageStore) messageStore)
.offloadPartitionData(partitionId, basePath);
}
} else {
DiskBackedEdgeStore edgeStore =
(DiskBackedEdgeStore)
oocEngine.getServerData().getEdgeStore();
- edgeStore.offloadPartitionData(partitionId, basePath);
+ numBytesTransferred +=
+ edgeStore.offloadPartitionData(partitionId, basePath);
}
oocEngine.getMetaPartitionManager().doneOffloadingPartition(partitionId);
+ executed = true;
}
+ return executed;
+ }
+
+ @Override
+ public IOCommandType getType() {
+ return IOCommandType.STORE_PARTITION;
}
@Override
http://git-wip-us.apache.org/repos/asf/giraph/blob/6256a761/giraph-core/src/main/java/org/apache/giraph/ooc/io/WaitIOCommand.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/ooc/io/WaitIOCommand.java b/giraph-core/src/main/java/org/apache/giraph/ooc/io/WaitIOCommand.java
index b6e0546..74e72eb 100644
--- a/giraph-core/src/main/java/org/apache/giraph/ooc/io/WaitIOCommand.java
+++ b/giraph-core/src/main/java/org/apache/giraph/ooc/io/WaitIOCommand.java
@@ -42,13 +42,19 @@ public class WaitIOCommand extends IOCommand {
}
@Override
- public void execute(String basePath) throws IOException {
+ public boolean execute(String basePath) throws IOException {
try {
TimeUnit.MILLISECONDS.sleep(waitDuration);
} catch (InterruptedException e) {
throw new IllegalStateException("execute: caught InterruptedException " +
"while IO thread is waiting!");
}
+ return true;
+ }
+
+ @Override
+ public IOCommandType getType() {
+ return IOCommandType.WAIT;
}
@Override
http://git-wip-us.apache.org/repos/asf/giraph/blob/6256a761/giraph-core/src/main/java/org/apache/giraph/utils/AdjustableSemaphore.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/utils/AdjustableSemaphore.java b/giraph-core/src/main/java/org/apache/giraph/utils/AdjustableSemaphore.java
index 09c1bdf..43a92a8 100644
--- a/giraph-core/src/main/java/org/apache/giraph/utils/AdjustableSemaphore.java
+++ b/giraph-core/src/main/java/org/apache/giraph/utils/AdjustableSemaphore.java
@@ -38,6 +38,12 @@ public final class AdjustableSemaphore extends Semaphore {
maxPermits = permits;
}
+ @edu.umd.cs.findbugs.annotations.SuppressWarnings(
+ "UG_SYNC_SET_UNSYNC_GET")
+ public int getMaxPermits() {
+ return maxPermits;
+ }
+
/**
* Adjusts the maximum number of available permits.
*
http://git-wip-us.apache.org/repos/asf/giraph/blob/6256a761/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java b/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java
index d29e46d..bf48ea8 100644
--- a/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java
+++ b/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java
@@ -75,6 +75,7 @@ import org.apache.giraph.metrics.GiraphTimerContext;
import org.apache.giraph.metrics.ResetSuperstepMetricsObserver;
import org.apache.giraph.metrics.SuperstepMetricsRegistry;
import org.apache.giraph.metrics.WorkerSuperstepMetrics;
+import org.apache.giraph.ooc.OutOfCoreEngine;
import org.apache.giraph.partition.Partition;
import org.apache.giraph.partition.PartitionExchange;
import org.apache.giraph.partition.PartitionOwner;
@@ -213,6 +214,10 @@ public class BspServiceWorker<I extends WritableComparable,
workerClient = new NettyWorkerClient<I, V, E>(context, conf, this,
graphTaskManager.createUncaughtExceptionHandler());
workerServer.setFlowControl(workerClient.getFlowControl());
+ OutOfCoreEngine oocEngine = workerServer.getServerData().getOocEngine();
+ if (oocEngine != null) {
+ oocEngine.setFlowControl(workerClient.getFlowControl());
+ }
workerAggregatorRequestProcessor =
new NettyWorkerAggregatorRequestProcessor(getContext(), conf, this);
http://git-wip-us.apache.org/repos/asf/giraph/blob/6256a761/giraph-core/src/main/java/org/apache/giraph/worker/EdgeInputSplitsCallable.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/worker/EdgeInputSplitsCallable.java b/giraph-core/src/main/java/org/apache/giraph/worker/EdgeInputSplitsCallable.java
index b7f1eb6..87583ed 100644
--- a/giraph-core/src/main/java/org/apache/giraph/worker/EdgeInputSplitsCallable.java
+++ b/giraph-core/src/main/java/org/apache/giraph/worker/EdgeInputSplitsCallable.java
@@ -27,6 +27,7 @@ import org.apache.giraph.io.EdgeInputFormat;
import org.apache.giraph.io.EdgeReader;
import org.apache.giraph.io.filters.EdgeInputFilter;
import org.apache.giraph.io.InputType;
+import org.apache.giraph.ooc.OutOfCoreEngine;
import org.apache.giraph.utils.LoggerUtils;
import org.apache.giraph.utils.MemoryUtils;
import org.apache.hadoop.io.Writable;
@@ -154,7 +155,16 @@ public class EdgeInputSplitsCallable<I extends WritableComparable,
long inputSplitEdgesLoaded = 0;
long inputSplitEdgesFiltered = 0;
+ int count = 0;
+ OutOfCoreEngine oocEngine = bspServiceWorker.getServerData().getOocEngine();
while (edgeReader.nextEdge()) {
+ // If out-of-core mechanism is used, check whether this thread
+ // can stay active or it should temporarily suspend and stop
+ // processing and generating more data for the moment.
+ if (oocEngine != null &&
+ (++count & OutOfCoreEngine.CHECK_IN_INTERVAL) == 0) {
+ oocEngine.activeThreadCheckIn();
+ }
I sourceId = edgeReader.getCurrentSourceId();
Edge<I, E> readerEdge = edgeReader.getCurrentEdge();
if (sourceId == null) {
http://git-wip-us.apache.org/repos/asf/giraph/blob/6256a761/giraph-core/src/main/java/org/apache/giraph/worker/InputSplitsCallable.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/worker/InputSplitsCallable.java b/giraph-core/src/main/java/org/apache/giraph/worker/InputSplitsCallable.java
index 92b23bd..40a3bb0 100644
--- a/giraph-core/src/main/java/org/apache/giraph/worker/InputSplitsCallable.java
+++ b/giraph-core/src/main/java/org/apache/giraph/worker/InputSplitsCallable.java
@@ -23,6 +23,7 @@ import java.io.DataInputStream;
import java.io.IOException;
import java.util.concurrent.Callable;
+import org.apache.giraph.bsp.CentralizedServiceWorker;
import org.apache.giraph.comm.WorkerClientRequestProcessor;
import org.apache.giraph.comm.netty.NettyWorkerClientRequestProcessor;
import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
@@ -33,6 +34,7 @@ import org.apache.giraph.metrics.GiraphMetrics;
import org.apache.giraph.metrics.GiraphMetricsRegistry;
import org.apache.giraph.metrics.MeterDesc;
import org.apache.giraph.metrics.MetricNames;
+import org.apache.giraph.ooc.OutOfCoreEngine;
import org.apache.giraph.time.SystemTime;
import org.apache.giraph.time.Time;
import org.apache.giraph.time.Times;
@@ -78,8 +80,9 @@ public abstract class InputSplitsCallable<I extends WritableComparable,
private final long startNanos = TIME.getNanoseconds();
/** Whether to prioritize local input splits. */
private final boolean useLocality;
+ /** Service worker */
+ private final CentralizedServiceWorker<I, V, E> serviceWorker;
- // CHECKSTYLE: stop ParameterNumberCheck
/**
* Constructor.
*
@@ -101,8 +104,8 @@ public abstract class InputSplitsCallable<I extends WritableComparable,
this.useLocality = configuration.useInputSplitLocality();
this.splitsHandler = splitsHandler;
this.configuration = configuration;
+ this.serviceWorker = bspServiceWorker;
}
- // CHECKSTYLE: resume ParameterNumberCheck
/**
* Get input format
@@ -208,13 +211,26 @@ public abstract class InputSplitsCallable<I extends WritableComparable,
byte[] serializedInputSplit;
int inputSplitsProcessed = 0;
try {
+ OutOfCoreEngine oocEngine = serviceWorker.getServerData().getOocEngine();
+ if (oocEngine != null) {
+ oocEngine.processingThreadStart();
+ }
while ((serializedInputSplit =
splitsHandler.reserveInputSplit(getInputType())) != null) {
+ // If out-of-core mechanism is used, check whether this thread
+ // can stay active or it should temporarily suspend and stop
+ // processing and generating more data for the moment.
+ if (oocEngine != null) {
+ oocEngine.activeThreadCheckIn();
+ }
vertexEdgeCount = vertexEdgeCount.incrVertexEdgeCount(
loadInputSplit(serializedInputSplit));
context.progress();
++inputSplitsProcessed;
}
+ if (oocEngine != null) {
+ oocEngine.processingThreadFinish();
+ }
} catch (InterruptedException e) {
throw new IllegalStateException("call: InterruptedException", e);
} catch (IOException e) {
http://git-wip-us.apache.org/repos/asf/giraph/blob/6256a761/giraph-core/src/main/java/org/apache/giraph/worker/VertexInputSplitsCallable.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/worker/VertexInputSplitsCallable.java b/giraph-core/src/main/java/org/apache/giraph/worker/VertexInputSplitsCallable.java
index 540a6b4..a3f1300 100644
--- a/giraph-core/src/main/java/org/apache/giraph/worker/VertexInputSplitsCallable.java
+++ b/giraph-core/src/main/java/org/apache/giraph/worker/VertexInputSplitsCallable.java
@@ -31,6 +31,7 @@ import org.apache.giraph.io.VertexReader;
import org.apache.giraph.io.filters.VertexInputFilter;
import org.apache.giraph.mapping.translate.TranslateEdge;
import org.apache.giraph.io.InputType;
+import org.apache.giraph.ooc.OutOfCoreEngine;
import org.apache.giraph.partition.PartitionOwner;
import org.apache.giraph.utils.LoggerUtils;
import org.apache.giraph.utils.MemoryUtils;
@@ -168,7 +169,16 @@ public class VertexInputSplitsCallable<I extends WritableComparable,
long edgesSinceLastUpdate = 0;
long inputSplitEdgesLoaded = 0;
+ int count = 0;
+ OutOfCoreEngine oocEngine = bspServiceWorker.getServerData().getOocEngine();
while (vertexReader.nextVertex()) {
+ // If out-of-core mechanism is used, check whether this thread
+ // can stay active or it should temporarily suspend and stop
+ // processing and generating more data for the moment.
+ if (oocEngine != null &&
+ (++count & OutOfCoreEngine.CHECK_IN_INTERVAL) == 0) {
+ oocEngine.activeThreadCheckIn();
+ }
Vertex<I, V, E> readerVertex = vertexReader.getCurrentVertex();
if (readerVertex.getId() == null) {
throw new IllegalArgumentException(
http://git-wip-us.apache.org/repos/asf/giraph/blob/6256a761/giraph-core/src/test/java/org/apache/giraph/partition/TestPartitionStores.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/test/java/org/apache/giraph/partition/TestPartitionStores.java b/giraph-core/src/test/java/org/apache/giraph/partition/TestPartitionStores.java
index 7893940..3bb35eb 100644
--- a/giraph-core/src/test/java/org/apache/giraph/partition/TestPartitionStores.java
+++ b/giraph-core/src/test/java/org/apache/giraph/partition/TestPartitionStores.java
@@ -24,6 +24,7 @@ import org.apache.commons.io.FileUtils;
import org.apache.giraph.bsp.BspService;
import org.apache.giraph.bsp.CentralizedServiceWorker;
import org.apache.giraph.comm.ServerData;
+import org.apache.giraph.comm.netty.NettyClient;
import org.apache.giraph.conf.GiraphConfiguration;
import org.apache.giraph.conf.GiraphConstants;
import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
@@ -279,13 +280,14 @@ public class TestPartitionStores {
GiraphConstants.STATIC_GRAPH.set(conf, true);
testMultiThreaded();
}
-/*
+
@Test
public void testDiskBackedPartitionStoreAdaptiveOOC() throws Exception {
GiraphConstants.STATIC_GRAPH.set(conf, true);
+ NettyClient.LIMIT_OPEN_REQUESTS_PER_WORKER.set(conf, true);
testMultiThreaded();
}
-*/
+
private void testMultiThreaded() throws Exception {
final AtomicInteger vertexCounter = new AtomicInteger(0);
ExecutorService pool = Executors.newFixedThreadPool(NUM_OF_THREADS);
http://git-wip-us.apache.org/repos/asf/giraph/blob/6256a761/giraph-examples/src/test/java/org/apache/giraph/TestOutOfCore.java
----------------------------------------------------------------------
diff --git a/giraph-examples/src/test/java/org/apache/giraph/TestOutOfCore.java b/giraph-examples/src/test/java/org/apache/giraph/TestOutOfCore.java
index 6fdfc75..397605d 100644
--- a/giraph-examples/src/test/java/org/apache/giraph/TestOutOfCore.java
+++ b/giraph-examples/src/test/java/org/apache/giraph/TestOutOfCore.java
@@ -18,6 +18,7 @@
package org.apache.giraph;
+import org.apache.giraph.comm.netty.NettyClient;
import org.apache.giraph.conf.GiraphConfiguration;
import org.apache.giraph.conf.GiraphConstants;
import org.apache.giraph.examples.GeneratedVertexReader;
@@ -62,12 +63,13 @@ public class TestOutOfCore extends BspCase {
SimplePageRankComputation.SimplePageRankWorkerContext.class);
conf.setMasterComputeClass(
SimplePageRankComputation.SimplePageRankMasterCompute.class);
+ GiraphConstants.METRICS_ENABLE.set(conf, true);
GiraphConstants.USER_PARTITION_COUNT.set(conf, NUM_PARTITIONS);
GiraphConstants.USE_OUT_OF_CORE_GRAPH.set(conf, true);
+ NettyClient.LIMIT_OPEN_REQUESTS_PER_WORKER.set(conf, true);
GiraphConstants.MAX_PARTITIONS_IN_MEMORY.set(conf, NUM_PARTITIONS_IN_MEMORY);
GiraphConstants.NUM_COMPUTE_THREADS.set(conf, 8);
GiraphConstants.NUM_INPUT_THREADS.set(conf, 8);
- GiraphConstants.NUM_OOC_THREADS.set(conf, 4);
GiraphConstants.NUM_OUTPUT_THREADS.set(conf, 8);
GiraphConstants.PARTITIONS_DIRECTORY.set(conf, "disk0,disk1,disk2");
GiraphJob job = prepareJob(getCallingMethodName(), conf,