You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@giraph.apache.org by ma...@apache.org on 2016/07/12 18:38:31 UTC
git commit: updated refs/heads/trunk to b0262f8
Repository: giraph
Updated Branches:
refs/heads/trunk b51ecd27c -> b0262f8c8
[GIRAPH-1089] Fix a bug in out-of-core infrastructure
Summary: This diff fixes a bug in the out-of-core infrastructure that caused the user requirement (maximum number of partitions in memory) for the fixed out-of-core strategy to be violated. The cause of the problem was the unclear definition of in-memory partitions. In this diff, we distinguish partitions that are entirely in memory from those that are only partially in memory.
Test Plan:
mvn clean verify
Reviewers: dionysis.logothetis, sergey.edunov, maja.kabiljo
Reviewed By: maja.kabiljo
Differential Revision: https://reviews.facebook.net/D60573
Project: http://git-wip-us.apache.org/repos/asf/giraph/repo
Commit: http://git-wip-us.apache.org/repos/asf/giraph/commit/b0262f8c
Tree: http://git-wip-us.apache.org/repos/asf/giraph/tree/b0262f8c
Diff: http://git-wip-us.apache.org/repos/asf/giraph/diff/b0262f8c
Branch: refs/heads/trunk
Commit: b0262f8c81c352c0cf3ac11e1e98646aa9587944
Parents: b51ecd2
Author: Hassan Eslami <he...@fb.com>
Authored: Tue Jul 12 11:33:38 2016 -0700
Committer: Maja Kabiljo <ma...@fb.com>
Committed: Tue Jul 12 11:33:38 2016 -0700
----------------------------------------------------------------------
.../org/apache/giraph/ooc/OutOfCoreEngine.java | 2 +-
.../apache/giraph/ooc/OutOfCoreIOCallable.java | 4 +-
.../apache/giraph/ooc/OutOfCoreIOScheduler.java | 4 +-
.../giraph/ooc/data/MetaPartitionManager.java | 202 ++++++++++++++++---
.../ooc/persistence/LocalDiskDataAccessor.java | 4 -
.../ooc/policy/FixedPartitionsOracle.java | 15 +-
.../giraph/partition/TestPartitionStores.java | 18 +-
.../java/org/apache/giraph/TestOutOfCore.java | 49 +++--
8 files changed, 230 insertions(+), 68 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/giraph/blob/b0262f8c/giraph-core/src/main/java/org/apache/giraph/ooc/OutOfCoreEngine.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/ooc/OutOfCoreEngine.java b/giraph-core/src/main/java/org/apache/giraph/ooc/OutOfCoreEngine.java
index d5bfd4f..65399b2 100644
--- a/giraph-core/src/main/java/org/apache/giraph/ooc/OutOfCoreEngine.java
+++ b/giraph-core/src/main/java/org/apache/giraph/ooc/OutOfCoreEngine.java
@@ -491,7 +491,7 @@ public class OutOfCoreEngine implements ResetSuperstepMetricsObserver {
superstepMetrics.getGauge(GRAPH_PERCENTAGE_IN_MEMORY, new Gauge<Double>() {
@Override
public Double value() {
- return metaPartitionManager.getLowestGraphFractionInMemory() * 100;
+ return metaPartitionManager.getGraphFractionInMemory() * 100;
}
});
}
http://git-wip-us.apache.org/repos/asf/giraph/blob/b0262f8c/giraph-core/src/main/java/org/apache/giraph/ooc/OutOfCoreIOCallable.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/ooc/OutOfCoreIOCallable.java b/giraph-core/src/main/java/org/apache/giraph/ooc/OutOfCoreIOCallable.java
index 829ad80..c21be95 100644
--- a/giraph-core/src/main/java/org/apache/giraph/ooc/OutOfCoreIOCallable.java
+++ b/giraph-core/src/main/java/org/apache/giraph/ooc/OutOfCoreIOCallable.java
@@ -76,7 +76,7 @@ public class OutOfCoreIOCallable implements Callable<Void>,
while (true) {
oocEngine.getSuperstepLock().readLock().lock();
IOCommand command = oocEngine.getIOScheduler().getNextIOCommand(diskId);
- if (LOG.isInfoEnabled()) {
+ if (LOG.isInfoEnabled() && !(command instanceof WaitIOCommand)) {
LOG.info("call: thread " + diskId + "'s next IO command is: " +
command);
}
@@ -101,7 +101,7 @@ public class OutOfCoreIOCallable implements Callable<Void>,
timeInGC = oocEngine.getServiceWorker().getGraphTaskManager()
.getSuperstepGCTime() - timeInGC;
bytes = command.bytesTransferred();
- if (LOG.isInfoEnabled()) {
+ if (LOG.isInfoEnabled() && !(command instanceof WaitIOCommand)) {
LOG.info("call: thread " + diskId + "'s command " + command +
" completed: bytes= " + bytes + ", duration=" + duration + ", " +
"bandwidth=" + String.format("%.2f", (double) bytes / duration *
http://git-wip-us.apache.org/repos/asf/giraph/blob/b0262f8c/giraph-core/src/main/java/org/apache/giraph/ooc/OutOfCoreIOScheduler.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/ooc/OutOfCoreIOScheduler.java b/giraph-core/src/main/java/org/apache/giraph/ooc/OutOfCoreIOScheduler.java
index 906607d..3dc1019 100644
--- a/giraph-core/src/main/java/org/apache/giraph/ooc/OutOfCoreIOScheduler.java
+++ b/giraph-core/src/main/java/org/apache/giraph/ooc/OutOfCoreIOScheduler.java
@@ -102,8 +102,8 @@ public class OutOfCoreIOScheduler {
}
OutOfCoreOracle.IOAction[] actions =
oocEngine.getOracle().getNextIOActions();
- if (LOG.isInfoEnabled()) {
- LOG.info("getNextIOCommand: actions are " + Arrays.toString(actions));
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("getNextIOCommand: actions are " + Arrays.toString(actions));
}
// Check whether there are any urgent outstanding load requests
if (!threadLoadCommandQueue.get(threadId).isEmpty()) {
http://git-wip-us.apache.org/repos/asf/giraph/blob/b0262f8c/giraph-core/src/main/java/org/apache/giraph/ooc/data/MetaPartitionManager.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/ooc/data/MetaPartitionManager.java b/giraph-core/src/main/java/org/apache/giraph/ooc/data/MetaPartitionManager.java
index 3075829..173b451 100644
--- a/giraph-core/src/main/java/org/apache/giraph/ooc/data/MetaPartitionManager.java
+++ b/giraph-core/src/main/java/org/apache/giraph/ooc/data/MetaPartitionManager.java
@@ -54,13 +54,31 @@ public class MetaPartitionManager {
/** Different storage states for data */
private enum StorageState { IN_MEM, ON_DISK, IN_TRANSIT };
/**
+ * Different storage states for a partition as a whole (i.e. the partition
+ * and its current messages)
+ */
+ private enum PartitionStorageState
+ /**
+ * Either both partition and its current messages are in memory, or both
+ * are on disk, or one part is on disk and the other part is in memory.
+ */
+ { FULLY_IN_MEM, PARTIALLY_IN_MEM, FULLY_ON_DISK };
+ /**
* Different processing states for partitions. Processing states are reset
* at the beginning of each iteration cycle over partitions.
*/
private enum ProcessingState { PROCESSED, UNPROCESSED, IN_PROCESS };
- /** Number of in-memory partitions */
+ /**
+ * Number of partitions in-memory (partition and current messages in memory)
+ */
private final AtomicInteger numInMemoryPartitions = new AtomicInteger(0);
+ /**
+ * Number of partitions that are partially in-memory (either partition or its
+ * current messages is in memory and the other part is not)
+ */
+ private final AtomicInteger numPartiallyInMemoryPartitions =
+ new AtomicInteger(0);
/** Map (dictionary) of partitions to their meta information */
private final ConcurrentMap<Integer, MetaPartition> partitions =
Maps.newConcurrentMap();
@@ -136,8 +154,6 @@ public class MetaPartitionManager {
}
/**
- * Get number of partitions in memory
- *
* @return number of partitions in memory
*/
public int getNumInMemoryPartitions() {
@@ -145,6 +161,13 @@ public class MetaPartitionManager {
}
/**
+ * @return number of partitions that are partially in memory
+ */
+ public int getNumPartiallyInMemoryPartitions() {
+ return numPartiallyInMemoryPartitions.get();
+ }
+
+ /**
* Get total number of partitions
*
* @return total number of partitions
@@ -153,8 +176,16 @@ public class MetaPartitionManager {
return partitions.size();
}
- public double getLowestGraphFractionInMemory() {
- return lowestGraphFractionInMemory.get();
+ /**
+ * Since the statistics are based on estimates, we assume each partial
+ * partition is taking about half of the full partition in terms of memory
+ * footprint.
+ *
+ * @return estimate of fraction of graph in memory
+ */
+ public double getGraphFractionInMemory() {
+ return (getNumInMemoryPartitions() +
+ getNumPartiallyInMemoryPartitions() / 2.0) / getNumPartitions();
}
/**
@@ -162,8 +193,7 @@ public class MetaPartitionManager {
* information in one of the counters.
*/
private synchronized void updateGraphFractionInMemory() {
- double graphInMemory =
- (double) getNumInMemoryPartitions() / getNumPartitions();
+ double graphInMemory = getGraphFractionInMemory();
if (graphInMemory < lowestGraphFractionInMemory.get()) {
lowestGraphFractionInMemory.set(graphInMemory);
WorkerProgress.get().updateLowestGraphPercentageInMemory(
@@ -172,6 +202,26 @@ public class MetaPartitionManager {
}
/**
+ * Update the book-keeping about number of in-memory partitions and partially
+ * in-memory partitions with regard to the storage status of the partition and
+ * its current messages before and after an update to its status.
+ *
+ * @param stateBefore the storage state of the partition and its current
+ * messages before an update
+ * @param stateAfter the storage state of the partition and its current
+ * messages after an update
+ */
+ private void updateCounters(PartitionStorageState stateBefore,
+ PartitionStorageState stateAfter) {
+ numInMemoryPartitions.getAndAdd(
+ ((stateAfter == PartitionStorageState.FULLY_IN_MEM) ? 1 : 0) -
+ ((stateBefore == PartitionStorageState.FULLY_IN_MEM) ? 1 : 0));
+ numPartiallyInMemoryPartitions.getAndAdd(
+ ((stateAfter == PartitionStorageState.PARTIALLY_IN_MEM) ? 1 : 0) -
+ ((stateBefore == PartitionStorageState.PARTIALLY_IN_MEM) ? 1 : 0));
+ }
+
+ /**
* Whether a given partition is available
*
* @param partitionId id of the partition to check if this worker owns it
@@ -266,49 +316,63 @@ public class MetaPartitionManager {
}
/**
- * Get id of a partition to offload on disk
+ * Get id of a partition to offload to disk. Prioritize offloading processed
+ * partitions over unprocessed partitions. Also, prioritize offloading
+ * partitions partially in memory over partitions fully in memory.
*
* @param threadId id of the thread who is going to store the partition on
* disk
* @return id of the partition to offload on disk
*/
public Integer getOffloadPartitionId(int threadId) {
+ // First, look for a processed partition partially on disk
MetaPartition meta = perThreadPartitionDictionary.get(threadId).lookup(
ProcessingState.PROCESSED,
StorageState.IN_MEM,
- StorageState.IN_MEM,
+ StorageState.ON_DISK,
null);
if (meta != null) {
return meta.getPartitionId();
}
meta = perThreadPartitionDictionary.get(threadId).lookup(
ProcessingState.PROCESSED,
+ StorageState.ON_DISK,
StorageState.IN_MEM,
- null,
null);
if (meta != null) {
return meta.getPartitionId();
}
+ // Second, look for a processed partition entirely in memory
meta = perThreadPartitionDictionary.get(threadId).lookup(
ProcessingState.PROCESSED,
- null,
+ StorageState.IN_MEM,
StorageState.IN_MEM,
null);
if (meta != null) {
return meta.getPartitionId();
}
+ // Third, look for an unprocessed partition partially on disk
meta = perThreadPartitionDictionary.get(threadId).lookup(
ProcessingState.UNPROCESSED,
StorageState.IN_MEM,
- null,
+ StorageState.ON_DISK,
null);
if (meta != null) {
return meta.getPartitionId();
}
meta = perThreadPartitionDictionary.get(threadId).lookup(
ProcessingState.UNPROCESSED,
- null,
+ StorageState.ON_DISK,
+ StorageState.IN_MEM,
+ null);
+ if (meta != null) {
+ return meta.getPartitionId();
+ }
+ // Fourth, look for an unprocessed partition entirely in memory
+ meta = perThreadPartitionDictionary.get(threadId).lookup(
+ ProcessingState.UNPROCESSED,
+ StorageState.IN_MEM,
StorageState.IN_MEM,
null);
if (meta != null) {
@@ -371,7 +435,11 @@ public class MetaPartitionManager {
}
/**
- * Get id of a partition to offload its incoming message on disk
+ * Get id of a partition to offload its incoming message on disk. Prioritize
+ * offloading messages of partitions already on disk, and then partitions
+ * in-transit, over partitions in-memory. Also, prioritize processed
+ * partitions over unprocessed (processed partitions would go on disk with
+ * more chances than unprocessed partitions)
*
* @param threadId id of the thread who is going to store the incoming
* messages on disk
@@ -389,7 +457,14 @@ public class MetaPartitionManager {
if (meta != null) {
return meta.getPartitionId();
}
-
+ meta = perThreadPartitionDictionary.get(threadId).lookup(
+ ProcessingState.PROCESSED,
+ StorageState.IN_TRANSIT,
+ null,
+ StorageState.IN_MEM);
+ if (meta != null) {
+ return meta.getPartitionId();
+ }
meta = perThreadPartitionDictionary.get(threadId).lookup(
ProcessingState.UNPROCESSED,
StorageState.ON_DISK,
@@ -398,16 +473,27 @@ public class MetaPartitionManager {
if (meta != null) {
return meta.getPartitionId();
}
+ meta = perThreadPartitionDictionary.get(threadId).lookup(
+ ProcessingState.UNPROCESSED,
+ StorageState.IN_TRANSIT,
+ null,
+ StorageState.IN_MEM);
+ if (meta != null) {
+ return meta.getPartitionId();
+ }
return null;
}
/**
- * Get id of a partition to load its data to memory
+ * Get id of a partition to load its data to memory. Prioritize loading an
+ * unprocessed partition over loading processed partition. Also, prioritize
+ * loading a partition partially in memory over partitions entirely on disk.
*
* @param threadId id of the thread who is going to load the partition data
* @return id of the partition to load its data to memory
*/
public Integer getLoadPartitionId(int threadId) {
+ // First, look for an unprocessed partition partially in memory
MetaPartition meta = perThreadPartitionDictionary.get(threadId).lookup(
ProcessingState.UNPROCESSED,
StorageState.IN_MEM,
@@ -420,29 +506,51 @@ public class MetaPartitionManager {
meta = perThreadPartitionDictionary.get(threadId).lookup(
ProcessingState.UNPROCESSED,
StorageState.ON_DISK,
- null,
+ StorageState.IN_MEM,
null);
if (meta != null) {
return meta.getPartitionId();
}
+ // Second, look for an unprocessed partition entirely on disk
+ meta = perThreadPartitionDictionary.get(threadId).lookup(
+ ProcessingState.UNPROCESSED,
+ StorageState.ON_DISK,
+ StorageState.ON_DISK,
+ null);
+ if (meta != null) {
+ return meta.getPartitionId();
+ }
+
+ // Third, look for a processed partition partially in memory
meta = perThreadPartitionDictionary.get(threadId).lookup(
ProcessingState.PROCESSED,
+ StorageState.IN_MEM,
StorageState.ON_DISK,
- null,
null);
if (meta != null) {
- meta.getPartitionId();
+ return meta.getPartitionId();
}
meta = perThreadPartitionDictionary.get(threadId).lookup(
ProcessingState.PROCESSED,
- null,
StorageState.ON_DISK,
+ StorageState.IN_MEM,
null);
if (meta != null) {
- meta.getPartitionId();
+ return meta.getPartitionId();
}
+
+ // Fourth, look for a processed partition entirely on disk
+ meta = perThreadPartitionDictionary.get(threadId).lookup(
+ ProcessingState.PROCESSED,
+ StorageState.ON_DISK,
+ StorageState.ON_DISK,
+ null);
+ if (meta != null) {
+ return meta.getPartitionId();
+ }
+
return null;
}
@@ -536,9 +644,9 @@ public class MetaPartitionManager {
*/
public void doneLoadingPartition(int partitionId, long superstep) {
MetaPartition meta = partitions.get(partitionId);
- numInMemoryPartitions.getAndIncrement();
int owner = getOwnerThreadId(partitionId);
synchronized (meta) {
+ PartitionStorageState stateBefore = meta.getPartitionStorageState();
perThreadPartitionDictionary.get(owner).removePartition(meta);
meta.setPartitionState(StorageState.IN_MEM);
if (superstep == oocEngine.getSuperstep()) {
@@ -546,6 +654,8 @@ public class MetaPartitionManager {
} else {
meta.setIncomingMessagesState(StorageState.IN_MEM);
}
+ PartitionStorageState stateAfter = meta.getPartitionStorageState();
+ updateCounters(stateBefore, stateAfter);
// Check whether load was to prefetch a partition from disk to memory for
// the next superstep
if (meta.getProcessingState() == ProcessingState.PROCESSED) {
@@ -553,6 +663,7 @@ public class MetaPartitionManager {
}
perThreadPartitionDictionary.get(owner).addPartition(meta);
}
+ updateGraphFractionInMemory();
}
/**
@@ -631,8 +742,16 @@ public class MetaPartitionManager {
(meta.getPartitionState() == StorageState.IN_MEM ||
meta.getCurrentMessagesState() == StorageState.IN_MEM)) {
perThreadPartitionDictionary.get(owner).removePartition(meta);
- meta.setPartitionState(StorageState.IN_TRANSIT);
- meta.setCurrentMessagesState(StorageState.IN_TRANSIT);
+ // We may only need to offload either partition or current messages of
+ // that partition to disk. So, if either of the components (partition
+ // or its current messages) is already on disk, we should not update its
+ // metadata.
+ if (meta.getPartitionState() != StorageState.ON_DISK) {
+ meta.setPartitionState(StorageState.IN_TRANSIT);
+ }
+ if (meta.getCurrentMessagesState() != StorageState.ON_DISK) {
+ meta.setCurrentMessagesState(StorageState.IN_TRANSIT);
+ }
perThreadPartitionDictionary.get(owner).addPartition(meta);
return true;
} else {
@@ -648,16 +767,23 @@ public class MetaPartitionManager {
* @param partitionId id of the partition that its data is offloaded
*/
public void doneOffloadingPartition(int partitionId) {
- numInMemoryPartitions.getAndDecrement();
- updateGraphFractionInMemory();
MetaPartition meta = partitions.get(partitionId);
int owner = getOwnerThreadId(partitionId);
synchronized (meta) {
+ // We either offload both partition and its messages to disk, or we only
+ // offload one of the components.
+ if (meta.getCurrentMessagesState() == StorageState.IN_TRANSIT &&
+ meta.getPartitionState() == StorageState.IN_TRANSIT) {
+ numInMemoryPartitions.getAndDecrement();
+ } else {
+ numPartiallyInMemoryPartitions.getAndDecrement();
+ }
perThreadPartitionDictionary.get(owner).removePartition(meta);
meta.setPartitionState(StorageState.ON_DISK);
meta.setCurrentMessagesState(StorageState.ON_DISK);
perThreadPartitionDictionary.get(owner).addPartition(meta);
}
+ updateGraphFractionInMemory();
}
/**
@@ -675,8 +801,6 @@ public class MetaPartitionManager {
dictionary.reset();
}
numPartitionsProcessed.set(0);
- lowestGraphFractionInMemory.set((double) getNumInMemoryPartitions() /
- getNumPartitions());
}
/**
@@ -687,11 +811,10 @@ public class MetaPartitionManager {
for (MetaPartition meta : partitions.values()) {
int owner = getOwnerThreadId(meta.getPartitionId());
perThreadPartitionDictionary.get(owner).removePartition(meta);
+ PartitionStorageState stateBefore = meta.getPartitionStorageState();
meta.resetMessages();
- if (meta.getPartitionState() == StorageState.IN_MEM &&
- meta.getCurrentMessagesState() == StorageState.ON_DISK) {
- numInMemoryPartitions.getAndDecrement();
- }
+ PartitionStorageState stateAfter = meta.getPartitionStorageState();
+ updateCounters(stateBefore, stateAfter);
perThreadPartitionDictionary.get(owner).addPartition(meta);
}
}
@@ -863,6 +986,21 @@ public class MetaPartitionManager {
currentMessagesState = incomingMessagesState;
incomingMessagesState = StorageState.IN_MEM;
}
+
+ /**
+ * @return the state of the partition and its current messages as a whole
+ */
+ public PartitionStorageState getPartitionStorageState() {
+ if (partitionState == StorageState.ON_DISK &&
+ currentMessagesState == StorageState.ON_DISK) {
+ return PartitionStorageState.FULLY_ON_DISK;
+ } else if (partitionState == StorageState.IN_MEM &&
+ currentMessagesState == StorageState.IN_MEM) {
+ return PartitionStorageState.FULLY_IN_MEM;
+ } else {
+ return PartitionStorageState.PARTIALLY_IN_MEM;
+ }
+ }
}
/**
http://git-wip-us.apache.org/repos/asf/giraph/blob/b0262f8c/giraph-core/src/main/java/org/apache/giraph/ooc/persistence/LocalDiskDataAccessor.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/ooc/persistence/LocalDiskDataAccessor.java b/giraph-core/src/main/java/org/apache/giraph/ooc/persistence/LocalDiskDataAccessor.java
index 2e42906..8efa9de 100644
--- a/giraph-core/src/main/java/org/apache/giraph/ooc/persistence/LocalDiskDataAccessor.java
+++ b/giraph-core/src/main/java/org/apache/giraph/ooc/persistence/LocalDiskDataAccessor.java
@@ -168,8 +168,6 @@ public class LocalDiskDataAccessor implements OutOfCoreDataAccessor {
LocalDiskDataInputWrapper(String fileName, byte[] buffer)
throws IOException {
file = new File(fileName);
- LOG.info("LocalDiskDataInputWrapper: obtaining a data input from local " +
- "file " + file.getAbsolutePath());
if (LOG.isDebugEnabled()) {
LOG.debug("LocalDiskDataInputWrapper: obtaining a data input from " +
"local file " + file.getAbsolutePath());
@@ -216,8 +214,6 @@ public class LocalDiskDataAccessor implements OutOfCoreDataAccessor {
LocalDiskDataOutputWrapper(String fileName, boolean shouldAppend,
byte[] buffer) throws IOException {
file = new File(fileName);
- LOG.info("LocalDiskDataOutputWrapper: obtaining a data output from " +
- "local file " + file.getAbsolutePath());
if (LOG.isDebugEnabled()) {
LOG.debug("LocalDiskDataOutputWrapper: obtaining a data output from " +
"local file " + file.getAbsolutePath());
http://git-wip-us.apache.org/repos/asf/giraph/blob/b0262f8c/giraph-core/src/main/java/org/apache/giraph/ooc/policy/FixedPartitionsOracle.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/ooc/policy/FixedPartitionsOracle.java b/giraph-core/src/main/java/org/apache/giraph/ooc/policy/FixedPartitionsOracle.java
index ffc5f7f..002dc85 100644
--- a/giraph-core/src/main/java/org/apache/giraph/ooc/policy/FixedPartitionsOracle.java
+++ b/giraph-core/src/main/java/org/apache/giraph/ooc/policy/FixedPartitionsOracle.java
@@ -29,6 +29,8 @@ import org.apache.log4j.Logger;
import java.util.concurrent.atomic.AtomicInteger;
+import static com.google.common.base.Preconditions.checkState;
+
/** Oracle for fixed out-of-core mechanism */
public class FixedPartitionsOracle implements OutOfCoreOracle {
/** Class logger */
@@ -63,11 +65,16 @@ public class FixedPartitionsOracle implements OutOfCoreOracle {
public IOAction[] getNextIOActions() {
int numPartitionsInMemory =
oocEngine.getMetaPartitionManager().getNumInMemoryPartitions();
- if (LOG.isInfoEnabled()) {
- LOG.info("getNextIOActions: calling with " + numPartitionsInMemory +
- " partitions in memory, " + deltaNumPartitionsInMemory.get() +
- " to be loaded");
+ int numPartialPartitionsInMemory =
+ oocEngine.getMetaPartitionManager().getNumPartiallyInMemoryPartitions();
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("getNextIOActions: calling with " + numPartitionsInMemory +
+ " partitions entirely in memory and " + numPartialPartitionsInMemory +
+ " partitions partially in memory, " +
+ deltaNumPartitionsInMemory.get() + " to be loaded");
}
+ checkState(numPartitionsInMemory >= 0);
+ checkState(numPartialPartitionsInMemory >= 0);
int numPartitions =
numPartitionsInMemory + deltaNumPartitionsInMemory.get();
// Fixed out-of-core policy:
http://git-wip-us.apache.org/repos/asf/giraph/blob/b0262f8c/giraph-core/src/test/java/org/apache/giraph/partition/TestPartitionStores.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/test/java/org/apache/giraph/partition/TestPartitionStores.java b/giraph-core/src/test/java/org/apache/giraph/partition/TestPartitionStores.java
index a7451bc..1e4593b 100644
--- a/giraph-core/src/test/java/org/apache/giraph/partition/TestPartitionStores.java
+++ b/giraph-core/src/test/java/org/apache/giraph/partition/TestPartitionStores.java
@@ -164,12 +164,12 @@ public class TestPartitionStores {
serviceWorker = Mockito.mock(CentralizedServiceWorker.class);
Mockito.when(serviceWorker.getSuperstep()).thenReturn(
BspService.INPUT_SUPERSTEP);
+ GraphTaskManager<IntWritable, IntWritable, NullWritable>
+ graphTaskManager = Mockito.mock(GraphTaskManager.class);
+ Mockito.when(serviceWorker.getGraphTaskManager()).thenReturn(graphTaskManager);
ServerData<IntWritable, IntWritable, NullWritable>
serverData = new ServerData<>(serviceWorker, conf, context);
Mockito.when(serviceWorker.getServerData()).thenReturn(serverData);
- GraphTaskManager<IntWritable, IntWritable, NullWritable>
- graphTaskManager = new GraphTaskManager<>(context);
- Mockito.when(serviceWorker.getGraphTaskManager()).thenReturn(graphTaskManager);
DiskBackedPartitionStore<IntWritable, IntWritable, NullWritable>
partitionStore =
@@ -193,12 +193,12 @@ public class TestPartitionStores {
serviceWorker = Mockito.mock(CentralizedServiceWorker.class);
Mockito.when(serviceWorker.getSuperstep()).thenReturn(
BspService.INPUT_SUPERSTEP);
+ GraphTaskManager<IntWritable, IntWritable, NullWritable>
+ graphTaskManager = Mockito.mock(GraphTaskManager.class);
+ Mockito.when(serviceWorker.getGraphTaskManager()).thenReturn(graphTaskManager);
ServerData<IntWritable, IntWritable, NullWritable>
serverData = new ServerData<>(serviceWorker, conf, context);
Mockito.when(serviceWorker.getServerData()).thenReturn(serverData);
- GraphTaskManager<IntWritable, IntWritable, NullWritable>
- graphTaskManager = new GraphTaskManager<>(context);
- Mockito.when(serviceWorker.getGraphTaskManager()).thenReturn(graphTaskManager);
DiskBackedPartitionStore<IntWritable, IntWritable, NullWritable>
partitionStore =
@@ -311,12 +311,12 @@ public class TestPartitionStores {
Mockito.when(serviceWorker.getSuperstep()).thenReturn(
BspService.INPUT_SUPERSTEP);
+ GraphTaskManager<IntWritable, IntWritable, NullWritable>
+ graphTaskManager = Mockito.mock(GraphTaskManager.class);
+ Mockito.when(serviceWorker.getGraphTaskManager()).thenReturn(graphTaskManager);
ServerData<IntWritable, IntWritable, NullWritable>
serverData = new ServerData<>(serviceWorker, conf, context);
Mockito.when(serviceWorker.getServerData()).thenReturn(serverData);
- GraphTaskManager<IntWritable, IntWritable, NullWritable>
- graphTaskManager = new GraphTaskManager<>(context);
- Mockito.when(serviceWorker.getGraphTaskManager()).thenReturn(graphTaskManager);
DiskBackedPartitionStore<IntWritable, IntWritable, NullWritable>
store =
http://git-wip-us.apache.org/repos/asf/giraph/blob/b0262f8c/giraph-examples/src/test/java/org/apache/giraph/TestOutOfCore.java
----------------------------------------------------------------------
diff --git a/giraph-examples/src/test/java/org/apache/giraph/TestOutOfCore.java b/giraph-examples/src/test/java/org/apache/giraph/TestOutOfCore.java
index 397605d..e497541 100644
--- a/giraph-examples/src/test/java/org/apache/giraph/TestOutOfCore.java
+++ b/giraph-examples/src/test/java/org/apache/giraph/TestOutOfCore.java
@@ -27,6 +27,9 @@ import org.apache.giraph.examples.SimplePageRankComputation.SimplePageRankVertex
import org.apache.giraph.examples.SimplePageRankComputation.SimplePageRankVertexOutputFormat;
import org.apache.giraph.job.GiraphJob;
+import org.apache.giraph.ooc.OutOfCoreIOScheduler;
+import org.apache.giraph.ooc.persistence.InMemoryDataAccessor;
+import org.junit.Before;
import org.junit.Test;
import java.io.IOException;
@@ -38,24 +41,17 @@ import static org.junit.Assert.assertTrue;
* Unit test for out-of-core mechanism
*/
public class TestOutOfCore extends BspCase {
- final static int NUM_PARTITIONS = 32;
- final static int NUM_PARTITIONS_IN_MEMORY = 16;
+ private final static int NUM_PARTITIONS = 400;
+ private final static int NUM_PARTITIONS_IN_MEMORY = 8;
+ private GiraphConfiguration conf;
public TestOutOfCore() {
super(TestOutOfCore.class.getName());
}
- /**
- * Run a job that tests the fixed out-of-core mechanism
- *
- * @throws IOException
- * @throws ClassNotFoundException
- * @throws InterruptedException
- */
- @Test
- public void testOutOfCore()
- throws IOException, InterruptedException, ClassNotFoundException {
- GiraphConfiguration conf = new GiraphConfiguration();
+ @Before
+ public void prepareTest() {
+ conf = new GiraphConfiguration();
conf.setComputationClass(SimplePageRankComputation.class);
conf.setVertexInputFormatClass(SimplePageRankVertexInputFormat.class);
conf.setVertexOutputFormatClass(SimplePageRankVertexOutputFormat.class);
@@ -66,12 +62,37 @@ public class TestOutOfCore extends BspCase {
GiraphConstants.METRICS_ENABLE.set(conf, true);
GiraphConstants.USER_PARTITION_COUNT.set(conf, NUM_PARTITIONS);
GiraphConstants.USE_OUT_OF_CORE_GRAPH.set(conf, true);
- NettyClient.LIMIT_OPEN_REQUESTS_PER_WORKER.set(conf, true);
GiraphConstants.MAX_PARTITIONS_IN_MEMORY.set(conf, NUM_PARTITIONS_IN_MEMORY);
+ OutOfCoreIOScheduler.OOC_WAIT_INTERVAL.set(conf, 10);
GiraphConstants.NUM_COMPUTE_THREADS.set(conf, 8);
GiraphConstants.NUM_INPUT_THREADS.set(conf, 8);
GiraphConstants.NUM_OUTPUT_THREADS.set(conf, 8);
+ }
+
+ @Test
+ public void testOutOfCoreInMemoryAccessor()
+ throws IOException, InterruptedException, ClassNotFoundException {
+ GiraphConstants.OUT_OF_CORE_DATA_ACCESSOR.set(conf, InMemoryDataAccessor.class);
+ GiraphConstants.NUM_OUT_OF_CORE_THREADS.set(conf, 8);
+ runTest();
+ }
+
+ @Test
+ public void testOutOfCoreLocalDiskAccessor()
+ throws IOException, InterruptedException, ClassNotFoundException {
GiraphConstants.PARTITIONS_DIRECTORY.set(conf, "disk0,disk1,disk2");
+ runTest();
+ }
+
+ /**
+ * Run a job with fixed out-of-core policy and verify the result
+ *
+ * @throws IOException
+ * @throws InterruptedException
+ * @throws ClassNotFoundException
+ */
+ private void runTest()
+ throws IOException, InterruptedException, ClassNotFoundException {
GiraphJob job = prepareJob(getCallingMethodName(), conf,
getTempPath(getCallingMethodName()));
// Overwrite the number of vertices set in BspCase