You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@giraph.apache.org by ma...@apache.org on 2016/06/28 00:05:26 UTC
[1/4] git commit: updated refs/heads/trunk to 3793c9e
Repository: giraph
Updated Branches:
refs/heads/trunk 8eb1f763d -> 3793c9ef6
http://git-wip-us.apache.org/repos/asf/giraph/blob/3793c9ef/giraph-core/src/test/java/org/apache/giraph/partition/TestPartitionStores.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/test/java/org/apache/giraph/partition/TestPartitionStores.java b/giraph-core/src/test/java/org/apache/giraph/partition/TestPartitionStores.java
index 3bb35eb..a7451bc 100644
--- a/giraph-core/src/test/java/org/apache/giraph/partition/TestPartitionStores.java
+++ b/giraph-core/src/test/java/org/apache/giraph/partition/TestPartitionStores.java
@@ -30,6 +30,7 @@ import org.apache.giraph.conf.GiraphConstants;
import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
import org.apache.giraph.edge.EdgeFactory;
import org.apache.giraph.graph.BasicComputation;
+import org.apache.giraph.graph.GraphTaskManager;
import org.apache.giraph.graph.Vertex;
import org.apache.giraph.io.formats.IdWithValueTextOutputFormat;
import org.apache.giraph.io.formats.IntIntNullTextVertexInputFormat;
@@ -166,6 +167,9 @@ public class TestPartitionStores {
ServerData<IntWritable, IntWritable, NullWritable>
serverData = new ServerData<>(serviceWorker, conf, context);
Mockito.when(serviceWorker.getServerData()).thenReturn(serverData);
+ GraphTaskManager<IntWritable, IntWritable, NullWritable>
+ graphTaskManager = new GraphTaskManager<>(context);
+ Mockito.when(serviceWorker.getGraphTaskManager()).thenReturn(graphTaskManager);
DiskBackedPartitionStore<IntWritable, IntWritable, NullWritable>
partitionStore =
@@ -192,6 +196,9 @@ public class TestPartitionStores {
ServerData<IntWritable, IntWritable, NullWritable>
serverData = new ServerData<>(serviceWorker, conf, context);
Mockito.when(serviceWorker.getServerData()).thenReturn(serverData);
+ GraphTaskManager<IntWritable, IntWritable, NullWritable>
+ graphTaskManager = new GraphTaskManager<>(context);
+ Mockito.when(serviceWorker.getGraphTaskManager()).thenReturn(graphTaskManager);
DiskBackedPartitionStore<IntWritable, IntWritable, NullWritable>
partitionStore =
@@ -307,6 +314,9 @@ public class TestPartitionStores {
ServerData<IntWritable, IntWritable, NullWritable>
serverData = new ServerData<>(serviceWorker, conf, context);
Mockito.when(serviceWorker.getServerData()).thenReturn(serverData);
+ GraphTaskManager<IntWritable, IntWritable, NullWritable>
+ graphTaskManager = new GraphTaskManager<>(context);
+ Mockito.when(serviceWorker.getGraphTaskManager()).thenReturn(graphTaskManager);
DiskBackedPartitionStore<IntWritable, IntWritable, NullWritable>
store =
http://git-wip-us.apache.org/repos/asf/giraph/blob/3793c9ef/giraph-rexster/giraph-rexster-io/src/main/java/org/apache/giraph/rexster/io/RexsterVertexOutputFormat.java
----------------------------------------------------------------------
diff --git a/giraph-rexster/giraph-rexster-io/src/main/java/org/apache/giraph/rexster/io/RexsterVertexOutputFormat.java b/giraph-rexster/giraph-rexster-io/src/main/java/org/apache/giraph/rexster/io/RexsterVertexOutputFormat.java
index 9af00e0..0e8d83e 100644
--- a/giraph-rexster/giraph-rexster-io/src/main/java/org/apache/giraph/rexster/io/RexsterVertexOutputFormat.java
+++ b/giraph-rexster/giraph-rexster-io/src/main/java/org/apache/giraph/rexster/io/RexsterVertexOutputFormat.java
@@ -186,7 +186,7 @@ public class RexsterVertexOutputFormat<I extends WritableComparable,
String id = context.getTaskAttemptID().toString();
String zkBasePath = ZooKeeperManager.getBasePath(getConf()) +
BspService.BASE_DIR + "/" +
- getConf().get("mapred.job.id", "Unknown Job");
+ getConf().getJobId();
prepareBarrier(zkBasePath);
enterBarrier(zkBasePath, id);
checkBarrier(zkBasePath, context);
[4/4] git commit: updated refs/heads/trunk to 3793c9e
Posted by ma...@apache.org.
Decouple out-of-core persistence infrastructure from out-of-core computation
Summary:
This diff proposes the following:
- The persistence layer is decoupled from the out-of-core infrastructure, so one can simply implement different data accessors for various persistence resources. This diff implements the persistence layer for reading from and writing to the local file system.
- Previously, out-of-core data were indexed by string literals. For more flexibility, data are now addressed through a data-indexing mechanism in which a chain of indices identifies a particular piece of data.
- With different data accessor implementations, there may now be more emphasis on using multiple IO threads, and it is important that these IO threads are load-balanced. This diff therefore changes the mechanism that assigns partitions to IO threads.
- All the improvements from Kryo-based (de)serialization and RandomAccessFile (introduced in D59277) are included in this diff, all in one place.
Test Plan:
mvn clean verify
out-of-core snapshot test passes
Reviewers: dionysis.logothetis, maja.kabiljo, sergey.edunov
Differential Revision: https://reviews.facebook.net/D59691
Project: http://git-wip-us.apache.org/repos/asf/giraph/repo
Commit: http://git-wip-us.apache.org/repos/asf/giraph/commit/3793c9ef
Tree: http://git-wip-us.apache.org/repos/asf/giraph/tree/3793c9ef
Diff: http://git-wip-us.apache.org/repos/asf/giraph/diff/3793c9ef
Branch: refs/heads/trunk
Commit: 3793c9ef69993bf4b180b6f6b15bb2a5edde5530
Parents: 8eb1f76
Author: Hassan Eslami <he...@fb.com>
Authored: Mon Jun 27 14:13:29 2016 -0700
Committer: Maja Kabiljo <ma...@fb.com>
Committed: Mon Jun 27 14:13:34 2016 -0700
----------------------------------------------------------------------
.gitignore | 2 +-
.../java/org/apache/giraph/bsp/BspService.java | 2 +-
.../java/org/apache/giraph/comm/ServerData.java | 6 +-
.../apache/giraph/conf/GiraphConfiguration.java | 9 -
.../org/apache/giraph/conf/GiraphConstants.java | 27 +-
.../ImmutableClassesGiraphConfiguration.java | 19 +
.../apache/giraph/graph/GraphTaskManager.java | 2 +-
.../giraph/ooc/FixedPartitionsOracle.java | 139 ------
.../org/apache/giraph/ooc/OutOfCoreEngine.java | 32 +-
.../apache/giraph/ooc/OutOfCoreIOCallable.java | 25 +-
.../giraph/ooc/OutOfCoreIOCallableFactory.java | 64 +--
.../apache/giraph/ooc/OutOfCoreIOScheduler.java | 31 +-
.../giraph/ooc/OutOfCoreIOStatistics.java | 2 +-
.../org/apache/giraph/ooc/OutOfCoreOracle.java | 131 ------
.../giraph/ooc/SimpleGCMonitoringOracle.java | 355 ---------------
.../apache/giraph/ooc/ThresholdBasedOracle.java | 364 ----------------
.../apache/giraph/ooc/command/IOCommand.java | 104 +++++
.../ooc/command/LoadPartitionIOCommand.java | 102 +++++
.../ooc/command/StoreDataBufferIOCommand.java | 99 +++++
.../command/StoreIncomingMessageIOCommand.java | 69 +++
.../ooc/command/StorePartitionIOCommand.java | 85 ++++
.../giraph/ooc/command/WaitIOCommand.java | 64 +++
.../apache/giraph/ooc/command/package-info.java | 21 +
.../giraph/ooc/data/DiskBackedDataStore.java | 432 +++++++++++++++++++
.../giraph/ooc/data/DiskBackedEdgeStore.java | 92 ++--
.../giraph/ooc/data/DiskBackedMessageStore.java | 96 ++---
.../ooc/data/DiskBackedPartitionStore.java | 181 +++-----
.../giraph/ooc/data/MetaPartitionManager.java | 59 ++-
.../giraph/ooc/data/OutOfCoreDataManager.java | 401 -----------------
.../org/apache/giraph/ooc/io/IOCommand.java | 106 -----
.../giraph/ooc/io/LoadPartitionIOCommand.java | 102 -----
.../giraph/ooc/io/StoreDataBufferIOCommand.java | 99 -----
.../ooc/io/StoreIncomingMessageIOCommand.java | 69 ---
.../giraph/ooc/io/StorePartitionIOCommand.java | 85 ----
.../org/apache/giraph/ooc/io/WaitIOCommand.java | 64 ---
.../org/apache/giraph/ooc/io/package-info.java | 21 -
.../giraph/ooc/persistence/DataIndex.java | 198 +++++++++
.../ooc/persistence/LocalDiskDataAccessor.java | 252 +++++++++++
.../ooc/persistence/OutOfCoreDataAccessor.java | 115 +++++
.../giraph/ooc/persistence/package-info.java | 22 +
.../ooc/policy/FixedPartitionsOracle.java | 140 ++++++
.../giraph/ooc/policy/OutOfCoreOracle.java | 135 ++++++
.../ooc/policy/SimpleGCMonitoringOracle.java | 357 +++++++++++++++
.../giraph/ooc/policy/ThresholdBasedOracle.java | 365 ++++++++++++++++
.../apache/giraph/ooc/policy/package-info.java | 21 +
.../org/apache/giraph/zk/ZooKeeperManager.java | 2 +-
.../giraph/partition/TestPartitionStores.java | 10 +
.../rexster/io/RexsterVertexOutputFormat.java | 2 +-
48 files changed, 2871 insertions(+), 2309 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/giraph/blob/3793c9ef/.gitignore
----------------------------------------------------------------------
diff --git a/.gitignore b/.gitignore
index 3ae52e2..03acf8e 100644
--- a/.gitignore
+++ b/.gitignore
@@ -7,7 +7,7 @@
# Build files:
*.class
target
-Unknown Job*
+UnknownJob*
failed-profile.txt
# IntelliJ IDEA files:
http://git-wip-us.apache.org/repos/asf/giraph/blob/3793c9ef/giraph-core/src/main/java/org/apache/giraph/bsp/BspService.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/bsp/BspService.java b/giraph-core/src/main/java/org/apache/giraph/bsp/BspService.java
index fc0fa95..9545a25 100644
--- a/giraph-core/src/main/java/org/apache/giraph/bsp/BspService.java
+++ b/giraph-core/src/main/java/org/apache/giraph/bsp/BspService.java
@@ -241,7 +241,7 @@ public abstract class BspService<I extends WritableComparable,
this.context = context;
this.graphTaskManager = graphTaskManager;
this.conf = graphTaskManager.getConf();
- this.jobId = conf.get("mapred.job.id", "Unknown Job");
+ this.jobId = conf.getJobId();
this.taskPartition = conf.getTaskPartition();
this.restartedSuperstep = conf.getLong(
GiraphConstants.RESTART_SUPERSTEP, UNSET_SUPERSTEP);
http://git-wip-us.apache.org/repos/asf/giraph/blob/3793c9ef/giraph-core/src/main/java/org/apache/giraph/comm/ServerData.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/ServerData.java b/giraph-core/src/main/java/org/apache/giraph/comm/ServerData.java
index 4156d8c..e926b6c 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/ServerData.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/ServerData.java
@@ -150,7 +150,7 @@ public class ServerData<I extends WritableComparable,
oocEngine = new OutOfCoreEngine(conf, service);
partitionStore =
new DiskBackedPartitionStore<I, V, E>(inMemoryPartitionStore,
- conf, context, service, oocEngine);
+ conf, context, oocEngine);
edgeStore =
new DiskBackedEdgeStore<I, V, E>(inMemoryEdgeStore, conf, oocEngine);
} else {
@@ -268,7 +268,7 @@ public class ServerData<I extends WritableComparable,
nextCurrentMessageStore = messageStore;
} else {
nextCurrentMessageStore = new DiskBackedMessageStore<>(
- conf, messageStore,
+ conf, oocEngine, messageStore,
conf.getIncomingMessageClasses().useMessageCombiner(),
serviceWorker.getSuperstep());
}
@@ -280,7 +280,7 @@ public class ServerData<I extends WritableComparable,
nextIncomingMessageStore = messageStore;
} else {
nextIncomingMessageStore = new DiskBackedMessageStore<>(
- conf, messageStore,
+ conf, oocEngine, messageStore,
conf.getOutgoingMessageClasses().useMessageCombiner(),
serviceWorker.getSuperstep() + 1);
}
http://git-wip-us.apache.org/repos/asf/giraph/blob/3793c9ef/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConfiguration.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConfiguration.java b/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConfiguration.java
index 7f1cb2b..4164c3a 100644
--- a/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConfiguration.java
+++ b/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConfiguration.java
@@ -1146,15 +1146,6 @@ public class GiraphConfiguration extends Configuration
}
/**
- * Whether the application with change or not the graph topology.
- *
- * @return true if the graph is static, false otherwise.
- */
- public boolean isStaticGraph() {
- return STATIC_GRAPH.isTrue(this);
- }
-
- /**
* Get the output directory to write YourKit snapshots to
*
* @param context Map context
http://git-wip-us.apache.org/repos/asf/giraph/blob/3793c9ef/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java b/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java
index c592a12..ee67bed 100644
--- a/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java
+++ b/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java
@@ -71,8 +71,10 @@ import org.apache.giraph.mapping.translate.TranslateEdge;
import org.apache.giraph.master.DefaultMasterCompute;
import org.apache.giraph.master.MasterCompute;
import org.apache.giraph.master.MasterObserver;
-import org.apache.giraph.ooc.OutOfCoreOracle;
-import org.apache.giraph.ooc.ThresholdBasedOracle;
+import org.apache.giraph.ooc.persistence.OutOfCoreDataAccessor;
+import org.apache.giraph.ooc.persistence.LocalDiskDataAccessor;
+import org.apache.giraph.ooc.policy.OutOfCoreOracle;
+import org.apache.giraph.ooc.policy.ThresholdBasedOracle;
import org.apache.giraph.partition.GraphPartitionerFactory;
import org.apache.giraph.partition.HashPartitionerFactory;
import org.apache.giraph.partition.Partition;
@@ -949,11 +951,32 @@ public interface GiraphConstants {
"Comma-separated list of directories in the local filesystem for " +
"out-of-core partitions.");
+ /**
+ * Number of IO threads used in the out-of-core mechanism. If the local disk
+ * is used to spill data and read it back, this number should equal the
+ * number of available disks on each machine. In that case, one should use
+ * giraph.partitionsDirectory to specify directories mounted on different
+ * disks.
+ */
+ IntConfOption NUM_OUT_OF_CORE_THREADS =
+ new IntConfOption("giraph.numOutOfCoreThreads", 1, "Number of IO " +
+ "threads used in out-of-core mechanism. If using local disk to " +
+ "spill data, this should be equal to the number of available " +
+ "disks. In such case, use giraph.partitionsDirectory to specify " +
+ "mount points on different disks.");
+
/** Enable out-of-core graph. */
BooleanConfOption USE_OUT_OF_CORE_GRAPH =
new BooleanConfOption("giraph.useOutOfCoreGraph", false,
"Enable out-of-core graph.");
+ /** Data accessor resource/object */
+ ClassConfOption<OutOfCoreDataAccessor> OUT_OF_CORE_DATA_ACCESSOR =
+ ClassConfOption.create("giraph.outOfCoreDataAccessor",
+ LocalDiskDataAccessor.class, OutOfCoreDataAccessor.class,
+ "Data accessor used in out-of-core computation (local-disk, " +
+ "in-memory, HDFS, etc.)");
+
/**
* Out-of-core oracle that is to be used for adaptive out-of-core engine. If
* the `MAX_PARTITIONS_IN_MEMORY` is already set, this will be over-written
http://git-wip-us.apache.org/repos/asf/giraph/blob/3793c9ef/giraph-core/src/main/java/org/apache/giraph/conf/ImmutableClassesGiraphConfiguration.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/conf/ImmutableClassesGiraphConfiguration.java b/giraph-core/src/main/java/org/apache/giraph/conf/ImmutableClassesGiraphConfiguration.java
index b9ecf2d..1b79cba 100644
--- a/giraph-core/src/main/java/org/apache/giraph/conf/ImmutableClassesGiraphConfiguration.java
+++ b/giraph-core/src/main/java/org/apache/giraph/conf/ImmutableClassesGiraphConfiguration.java
@@ -128,6 +128,8 @@ public class ImmutableClassesGiraphConfiguration<I extends WritableComparable,
* extended data input/output classes for messages
*/
private final boolean useBigDataIOForMessages;
+ /** Is the graph static (meaning there is no mutation)? */
+ private final boolean isStaticGraph;
/**
* Constructor. Takes the configuration and then gets the classes out of
@@ -144,6 +146,7 @@ public class ImmutableClassesGiraphConfiguration<I extends WritableComparable,
GiraphConstants.GRAPH_TYPE_LANGUAGES, conf);
valueNeedsWrappers = PerGraphTypeBoolean.readFromConf(
GiraphConstants.GRAPH_TYPES_NEEDS_WRAPPERS, conf);
+ isStaticGraph = GiraphConstants.STATIC_GRAPH.get(this);
valueFactories = new ValueFactories<I, V, E>(this);
}
@@ -1326,4 +1329,20 @@ public class ImmutableClassesGiraphConfiguration<I extends WritableComparable,
return null;
}
}
+
+ /**
+ * Whether the application will change the graph topology or not.
+ *
+ * @return true if the graph is static, false otherwise.
+ */
+ public boolean isStaticGraph() {
+ return isStaticGraph;
+ }
+
+ /**
+ * @return job id
+ */
+ public String getJobId() {
+ return get("mapred.job.id", "UnknownJob");
+ }
}
http://git-wip-us.apache.org/repos/asf/giraph/blob/3793c9ef/giraph-core/src/main/java/org/apache/giraph/graph/GraphTaskManager.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/graph/GraphTaskManager.java b/giraph-core/src/main/java/org/apache/giraph/graph/GraphTaskManager.java
index 725d327..a1d8522 100644
--- a/giraph-core/src/main/java/org/apache/giraph/graph/GraphTaskManager.java
+++ b/giraph-core/src/main/java/org/apache/giraph/graph/GraphTaskManager.java
@@ -1054,7 +1054,7 @@ end[PURE_YARN]*/
* @return Time spent in GC recorder by the GC listener
*/
public long getSuperstepGCTime() {
- return gcTimeMetric.count();
+ return (gcTimeMetric == null) ? 0 : gcTimeMetric.count();
}
/**
http://git-wip-us.apache.org/repos/asf/giraph/blob/3793c9ef/giraph-core/src/main/java/org/apache/giraph/ooc/FixedPartitionsOracle.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/ooc/FixedPartitionsOracle.java b/giraph-core/src/main/java/org/apache/giraph/ooc/FixedPartitionsOracle.java
deleted file mode 100644
index f7badcb..0000000
--- a/giraph-core/src/main/java/org/apache/giraph/ooc/FixedPartitionsOracle.java
+++ /dev/null
@@ -1,139 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.giraph.ooc;
-
-import com.sun.management.GarbageCollectionNotificationInfo;
-import org.apache.giraph.conf.GiraphConstants;
-import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
-import org.apache.giraph.ooc.io.IOCommand;
-import org.apache.giraph.ooc.io.LoadPartitionIOCommand;
-import org.apache.giraph.ooc.io.StorePartitionIOCommand;
-import org.apache.log4j.Logger;
-
-import java.util.concurrent.atomic.AtomicInteger;
-
-/** Oracle for fixed out-of-core mechanism */
-public class FixedPartitionsOracle implements OutOfCoreOracle {
- /** Class logger */
- private static final Logger LOG =
- Logger.getLogger(FixedPartitionsOracle.class);
- /** Maximum number of partitions to be kept in memory */
- private final int maxPartitionsInMemory;
- /**
- * Number of partitions to be added (loaded) or removed (stored) to/from
- * memory. Each outstanding load partition counts +1 and each outstanding
- * store partition counts -1 toward this counter.
- */
- private final AtomicInteger deltaNumPartitionsInMemory =
- new AtomicInteger(0);
- /** Out-of-core engine */
- private final OutOfCoreEngine oocEngine;
-
- /**
- * Constructor
- *
- * @param conf configuration
- * @param oocEngine out-of-core engine
- */
- public FixedPartitionsOracle(ImmutableClassesGiraphConfiguration conf,
- OutOfCoreEngine oocEngine) {
- this.maxPartitionsInMemory =
- GiraphConstants.MAX_PARTITIONS_IN_MEMORY.get(conf);
- this.oocEngine = oocEngine;
- }
-
- @Override
- public IOAction[] getNextIOActions() {
- int numPartitionsInMemory =
- oocEngine.getMetaPartitionManager().getNumInMemoryPartitions();
- if (LOG.isInfoEnabled()) {
- LOG.info("getNextIOActions: calling with " + numPartitionsInMemory +
- " partitions in memory, " + deltaNumPartitionsInMemory.get() +
- " to be loaded");
- }
- int numPartitions =
- numPartitionsInMemory + deltaNumPartitionsInMemory.get();
- // Fixed out-of-core policy:
- // - if the number of partitions in memory is less than the max number of
- // partitions in memory, we should load a partition to memory. This
- // basically means we are prefetching partition's data either for the
- // current superstep, or for the next superstep.
- // - if the number of partitions in memory is equal to the the max number
- // of partitions in memory, we do a 'soft store', meaning, we store
- // processed partition to disk only if there is an unprocessed partition
- // on disk. This basically makes room for unprocessed partitions on disk
- // to be prefetched.
- // - if the number of partitions in memory is more than the max number of
- // partitions in memory, we do a 'hard store', meaning we store a
- // partition to disk, regardless of its processing state.
- if (numPartitions < maxPartitionsInMemory) {
- return new IOAction[]{
- IOAction.LOAD_PARTITION,
- IOAction.STORE_MESSAGES_AND_BUFFERS};
- } else if (numPartitions > maxPartitionsInMemory) {
- LOG.warn("getNextIOActions: number of partitions in memory passed the " +
- "specified threshold!");
- return new IOAction[]{
- IOAction.STORE_PARTITION,
- IOAction.STORE_MESSAGES_AND_BUFFERS};
- } else {
- return new IOAction[]{
- IOAction.STORE_MESSAGES_AND_BUFFERS,
- IOAction.LOAD_TO_SWAP_PARTITION};
- }
- }
-
- @Override
- public boolean approve(IOCommand command) {
- int numPartitionsInMemory = oocEngine.getMetaPartitionManager()
- .getNumInMemoryPartitions();
- // If loading a partition result in having more partition in memory, the
- // command should be denied. Also, if number of partitions in memory is
- // already less than the max number of partitions, any command for storing
- // a partition should be denied.
- if (command instanceof LoadPartitionIOCommand &&
- numPartitionsInMemory + deltaNumPartitionsInMemory.getAndIncrement() >
- maxPartitionsInMemory) {
- deltaNumPartitionsInMemory.getAndDecrement();
- return false;
-
- } else if (command instanceof StorePartitionIOCommand &&
- numPartitionsInMemory + deltaNumPartitionsInMemory.getAndDecrement() <
- maxPartitionsInMemory) {
- deltaNumPartitionsInMemory.getAndIncrement();
- return false;
- }
- return true;
- }
-
- @Override
- public void commandCompleted(IOCommand command) {
- if (command instanceof LoadPartitionIOCommand) {
- deltaNumPartitionsInMemory.getAndDecrement();
- } else if (command instanceof StorePartitionIOCommand) {
- deltaNumPartitionsInMemory.getAndIncrement();
- }
- }
-
- @Override
- public void gcCompleted(GarbageCollectionNotificationInfo gcInfo) { }
-
- @Override
- public void shutdown() { }
-}
http://git-wip-us.apache.org/repos/asf/giraph/blob/3793c9ef/giraph-core/src/main/java/org/apache/giraph/ooc/OutOfCoreEngine.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/ooc/OutOfCoreEngine.java b/giraph-core/src/main/java/org/apache/giraph/ooc/OutOfCoreEngine.java
index 2037abe..3187468 100644
--- a/giraph-core/src/main/java/org/apache/giraph/ooc/OutOfCoreEngine.java
+++ b/giraph-core/src/main/java/org/apache/giraph/ooc/OutOfCoreEngine.java
@@ -30,8 +30,11 @@ import org.apache.giraph.metrics.GiraphMetrics;
import org.apache.giraph.metrics.ResetSuperstepMetricsObserver;
import org.apache.giraph.metrics.SuperstepMetricsRegistry;
import org.apache.giraph.ooc.data.MetaPartitionManager;
-import org.apache.giraph.ooc.io.IOCommand;
-import org.apache.giraph.ooc.io.LoadPartitionIOCommand;
+import org.apache.giraph.ooc.command.IOCommand;
+import org.apache.giraph.ooc.command.LoadPartitionIOCommand;
+import org.apache.giraph.ooc.persistence.OutOfCoreDataAccessor;
+import org.apache.giraph.ooc.policy.FixedPartitionsOracle;
+import org.apache.giraph.ooc.policy.OutOfCoreOracle;
import org.apache.giraph.utils.AdjustableSemaphore;
import org.apache.giraph.worker.BspServiceWorker;
import org.apache.log4j.Logger;
@@ -87,6 +90,8 @@ public class OutOfCoreEngine implements ResetSuperstepMetricsObserver {
* with out-of-core operations (actual IO operations).
*/
private final ReadWriteLock superstepLock = new ReentrantReadWriteLock();
+ /** Data accessor object (DAO) used as persistence layer in out-of-core */
+ private final OutOfCoreDataAccessor dataAccessor;
/** Callable factory for IO threads */
private final OutOfCoreIOCallableFactory oocIOCallableFactory;
/**
@@ -149,9 +154,20 @@ public class OutOfCoreEngine implements ResetSuperstepMetricsObserver {
public OutOfCoreEngine(ImmutableClassesGiraphConfiguration<?, ?, ?> conf,
CentralizedServiceWorker<?, ?, ?> service) {
this.service = service;
- this.oocIOCallableFactory = new OutOfCoreIOCallableFactory(conf, this);
- /* How many disk (i.e. IO threads) do we have? */
- int numIOThreads = oocIOCallableFactory.getNumDisks();
+ Class<? extends OutOfCoreDataAccessor> accessorClass =
+ GiraphConstants.OUT_OF_CORE_DATA_ACCESSOR.get(conf);
+ try {
+ Constructor<?> constructor = accessorClass.getConstructor(
+ ImmutableClassesGiraphConfiguration.class);
+ this.dataAccessor = (OutOfCoreDataAccessor) constructor.newInstance(conf);
+ } catch (NoSuchMethodException | InstantiationException |
+ InvocationTargetException | IllegalAccessException e) {
+ throw new IllegalStateException("OutOfCoreEngine: caught exception " +
+ "while creating the data accessor instance!", e);
+ }
+ int numIOThreads = dataAccessor.getNumAccessorThreads();
+ this.oocIOCallableFactory =
+ new OutOfCoreIOCallableFactory(this, numIOThreads);
this.ioScheduler = new OutOfCoreIOScheduler(conf, this, numIOThreads);
this.metaPartitionManager = new MetaPartitionManager(numIOThreads, this);
this.statistics = new OutOfCoreIOStatistics(conf, numIOThreads);
@@ -188,6 +204,7 @@ public class OutOfCoreEngine implements ResetSuperstepMetricsObserver {
* Initialize/Start the out-of-core engine.
*/
public void initialize() {
+ dataAccessor.initialize();
oocIOCallableFactory.createCallable();
}
@@ -201,6 +218,7 @@ public class OutOfCoreEngine implements ResetSuperstepMetricsObserver {
}
ioScheduler.shutdown();
oocIOCallableFactory.shutdown();
+ dataAccessor.shutdown();
}
/**
@@ -500,4 +518,8 @@ public class OutOfCoreEngine implements ResetSuperstepMetricsObserver {
public void setFlowControl(FlowControl flowControl) {
this.flowControl = flowControl;
}
+
+ public OutOfCoreDataAccessor getDataAccessor() {
+ return dataAccessor;
+ }
}
http://git-wip-us.apache.org/repos/asf/giraph/blob/3793c9ef/giraph-core/src/main/java/org/apache/giraph/ooc/OutOfCoreIOCallable.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/ooc/OutOfCoreIOCallable.java b/giraph-core/src/main/java/org/apache/giraph/ooc/OutOfCoreIOCallable.java
index 962bd6a..bea3994 100644
--- a/giraph-core/src/main/java/org/apache/giraph/ooc/OutOfCoreIOCallable.java
+++ b/giraph-core/src/main/java/org/apache/giraph/ooc/OutOfCoreIOCallable.java
@@ -23,9 +23,9 @@ import com.yammer.metrics.core.Histogram;
import org.apache.giraph.metrics.GiraphMetrics;
import org.apache.giraph.metrics.ResetSuperstepMetricsObserver;
import org.apache.giraph.metrics.SuperstepMetricsRegistry;
-import org.apache.giraph.ooc.io.IOCommand;
-import org.apache.giraph.ooc.io.LoadPartitionIOCommand;
-import org.apache.giraph.ooc.io.WaitIOCommand;
+import org.apache.giraph.ooc.command.IOCommand;
+import org.apache.giraph.ooc.command.LoadPartitionIOCommand;
+import org.apache.giraph.ooc.command.WaitIOCommand;
import org.apache.log4j.Logger;
import java.util.concurrent.Callable;
@@ -47,8 +47,6 @@ public class OutOfCoreIOCallable implements Callable<Void>,
private static final Logger LOG = Logger.getLogger(OutOfCoreIOCallable.class);
/** Out-of-core engine */
private final OutOfCoreEngine oocEngine;
- /** Base path that this thread will write to/read from */
- private final String basePath;
/** Thread id/Disk id */
private final int diskId;
/** How many bytes of data is read from disk */
@@ -64,13 +62,10 @@ public class OutOfCoreIOCallable implements Callable<Void>,
* Constructor
*
* @param oocEngine out-of-core engine
- * @param basePath base path this thread will be using
* @param diskId thread id/disk id
*/
- public OutOfCoreIOCallable(OutOfCoreEngine oocEngine, String basePath,
- int diskId) {
+ public OutOfCoreIOCallable(OutOfCoreEngine oocEngine, int diskId) {
this.oocEngine = oocEngine;
- this.basePath = basePath;
this.diskId = diskId;
newSuperstep(GiraphMetrics.get().perSuperstep());
GiraphMetrics.get().addSuperstepResetObserver(this);
@@ -98,15 +93,23 @@ public class OutOfCoreIOCallable implements Callable<Void>,
long bytes;
// CHECKSTYLE: stop IllegalCatch
try {
+ long timeInGC = oocEngine.getServiceWorker().getGraphTaskManager()
+ .getSuperstepGCTime();
long startTime = System.currentTimeMillis();
- commandExecuted = command.execute(basePath);
+ commandExecuted = command.execute();
duration = System.currentTimeMillis() - startTime;
+ timeInGC = oocEngine.getServiceWorker().getGraphTaskManager()
+ .getSuperstepGCTime() - timeInGC;
bytes = command.bytesTransferred();
if (LOG.isInfoEnabled()) {
LOG.info("call: thread " + diskId + "'s command " + command +
" completed: bytes= " + bytes + ", duration=" + duration + ", " +
"bandwidth=" + String.format("%.2f", (double) bytes / duration *
- 1000 / 1024 / 1024));
+ 1000 / 1024 / 1024) +
+ ((command instanceof WaitIOCommand) ? "" :
+ (", bandwidth (excluding GC time)=" + String.format("%.2f",
+ (double) bytes / (duration - timeInGC) *
+ 1000 / 1024 / 1024))));
}
} catch (Exception e) {
oocEngine.failTheJob();
http://git-wip-us.apache.org/repos/asf/giraph/blob/3793c9ef/giraph-core/src/main/java/org/apache/giraph/ooc/OutOfCoreIOCallableFactory.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/ooc/OutOfCoreIOCallableFactory.java b/giraph-core/src/main/java/org/apache/giraph/ooc/OutOfCoreIOCallableFactory.java
index d4fea22..6aeb196 100644
--- a/giraph-core/src/main/java/org/apache/giraph/ooc/OutOfCoreIOCallableFactory.java
+++ b/giraph-core/src/main/java/org/apache/giraph/ooc/OutOfCoreIOCallableFactory.java
@@ -18,13 +18,11 @@
package org.apache.giraph.ooc;
-import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
import org.apache.giraph.utils.CallableFactory;
import org.apache.giraph.utils.LogStacktraceCallable;
import org.apache.giraph.utils.ThreadUtils;
import org.apache.log4j.Logger;
-import java.io.File;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
@@ -35,9 +33,6 @@ import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
-import static com.google.common.base.Preconditions.checkState;
-import static org.apache.giraph.conf.GiraphConstants.PARTITIONS_DIRECTORY;
-
/**
* Factory class to create IO threads for out-of-core engine.
*/
@@ -49,37 +44,22 @@ public class OutOfCoreIOCallableFactory {
private final OutOfCoreEngine oocEngine;
/** Result of IO threads at the end of the computation */
private final List<Future> results;
- /** How many disks (i.e. IO threads) do we have? */
- private int numDisks;
- /** Path prefix for different disks */
- private final String[] basePaths;
+ /** Number of threads used for IO operations */
+ private final int numIOThreads;
/** Executor service for IO threads */
private ExecutorService outOfCoreIOExecutor;
+
/**
* Constructor
*
- * @param conf Configuration
* @param oocEngine Out-of-core engine
+ * @param numIOThreads Number of IO threads used
*/
- public OutOfCoreIOCallableFactory(
- ImmutableClassesGiraphConfiguration<?, ?, ?> conf,
- OutOfCoreEngine oocEngine) {
+ public OutOfCoreIOCallableFactory(OutOfCoreEngine oocEngine,
+ int numIOThreads) {
this.oocEngine = oocEngine;
- this.results = new ArrayList<>();
- // Take advantage of multiple disks
- String[] userPaths = PARTITIONS_DIRECTORY.getArray(conf);
- this.numDisks = userPaths.length;
- this.basePaths = new String[numDisks];
- int ptr = 0;
- for (String path : userPaths) {
- File file = new File(path);
- if (!file.exists()) {
- checkState(file.mkdirs(), "OutOfCoreIOCallableFactory: cannot create " +
- "directory " + file.getAbsolutePath());
- }
- basePaths[ptr] = path + "/" + conf.get("mapred.job.id", "Unknown Job");
- ptr++;
- }
+ this.numIOThreads = numIOThreads;
+ this.results = new ArrayList<>(numIOThreads);
}
/**
@@ -90,11 +70,10 @@ public class OutOfCoreIOCallableFactory {
new CallableFactory<Void>() {
@Override
public Callable<Void> newCallable(int callableId) {
- return new OutOfCoreIOCallable(oocEngine, basePaths[callableId],
- callableId);
+ return new OutOfCoreIOCallable(oocEngine, callableId);
}
};
- outOfCoreIOExecutor = new ThreadPoolExecutor(numDisks, numDisks, 0L,
+ outOfCoreIOExecutor = new ThreadPoolExecutor(numIOThreads, numIOThreads, 0L,
TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(),
ThreadUtils.createThreadFactory("ooc-io-%d")) {
@Override
@@ -120,7 +99,7 @@ public class OutOfCoreIOCallableFactory {
}
};
- for (int i = 0; i < numDisks; ++i) {
+ for (int i = 0; i < numIOThreads; ++i) {
Future<Void> future = outOfCoreIOExecutor.submit(
new LogStacktraceCallable<>(
outOfCoreIOCallableFactory.newCallable(i)));
@@ -131,15 +110,6 @@ public class OutOfCoreIOCallableFactory {
}
/**
- * How many disks do we have?
- *
- * @return number of disks (IO threads)
- */
- public int getNumDisks() {
- return numDisks;
- }
-
- /**
* Check whether all IO threads terminated gracefully.
*/
public void shutdown() {
@@ -156,7 +126,7 @@ public class OutOfCoreIOCallableFactory {
"InterruptedException while waiting for IO threads to finish");
}
}
- for (int i = 0; i < numDisks; ++i) {
+ for (int i = 0; i < numIOThreads; ++i) {
try {
// Check whether the tread terminated gracefully
results.get(i).get();
@@ -170,15 +140,5 @@ public class OutOfCoreIOCallableFactory {
throw new IllegalStateException(e);
}
}
- for (String path : basePaths) {
- File file = new File(path).getParentFile();
- for (String subFileName : file.list()) {
- File subFile = new File(file.getPath(), subFileName);
- checkState(subFile.delete(), "shutdown: cannot delete file %s",
- subFile.getAbsoluteFile());
- }
- checkState(file.delete(), "shutdown: cannot delete directory %s",
- file.getAbsoluteFile());
- }
}
}
http://git-wip-us.apache.org/repos/asf/giraph/blob/3793c9ef/giraph-core/src/main/java/org/apache/giraph/ooc/OutOfCoreIOScheduler.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/ooc/OutOfCoreIOScheduler.java b/giraph-core/src/main/java/org/apache/giraph/ooc/OutOfCoreIOScheduler.java
index 6428c30..906607d 100644
--- a/giraph-core/src/main/java/org/apache/giraph/ooc/OutOfCoreIOScheduler.java
+++ b/giraph-core/src/main/java/org/apache/giraph/ooc/OutOfCoreIOScheduler.java
@@ -18,15 +18,15 @@
package org.apache.giraph.ooc;
-import com.google.common.hash.Hashing;
import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
import org.apache.giraph.conf.IntConfOption;
-import org.apache.giraph.ooc.io.IOCommand;
-import org.apache.giraph.ooc.io.LoadPartitionIOCommand;
-import org.apache.giraph.ooc.io.StoreDataBufferIOCommand;
-import org.apache.giraph.ooc.io.StoreIncomingMessageIOCommand;
-import org.apache.giraph.ooc.io.StorePartitionIOCommand;
-import org.apache.giraph.ooc.io.WaitIOCommand;
+import org.apache.giraph.ooc.command.IOCommand;
+import org.apache.giraph.ooc.command.LoadPartitionIOCommand;
+import org.apache.giraph.ooc.command.StoreDataBufferIOCommand;
+import org.apache.giraph.ooc.command.StoreIncomingMessageIOCommand;
+import org.apache.giraph.ooc.command.StorePartitionIOCommand;
+import org.apache.giraph.ooc.command.WaitIOCommand;
+import org.apache.giraph.ooc.policy.OutOfCoreOracle;
import org.apache.log4j.Logger;
import java.util.ArrayList;
@@ -57,8 +57,6 @@ public class OutOfCoreIOScheduler {
private final OutOfCoreEngine oocEngine;
/** How much an IO thread should wait if there is no IO command */
private final int waitInterval;
- /** How many disks (i.e. IO threads) do we have? */
- private final int numDisks;
/**
* Queue of IO commands for loading partitions to memory. Load commands are
* urgent and should be done once loading data is a viable IO command.
@@ -77,7 +75,6 @@ public class OutOfCoreIOScheduler {
OutOfCoreIOScheduler(final ImmutableClassesGiraphConfiguration conf,
OutOfCoreEngine oocEngine, int numDisks) {
this.oocEngine = oocEngine;
- this.numDisks = numDisks;
this.waitInterval = OOC_WAIT_INTERVAL.get(conf);
threadLoadCommandQueue = new ArrayList<>(numDisks);
for (int i = 0; i < numDisks; ++i) {
@@ -88,17 +85,6 @@ public class OutOfCoreIOScheduler {
}
/**
- * Get the thread id that is responsible for a particular partition
- *
- * @param partitionId id of the given partition
- * @return id of the thread responsible for the given partition
- */
- public int getOwnerThreadId(int partitionId) {
- int result = Hashing.murmur3_32().hashInt(partitionId).asInt() % numDisks;
- return (result >= 0) ? result : (result + numDisks);
- }
-
- /**
* Generate and return the next appropriate IO command for a given thread
*
* @param threadId id of the thread ready to execute the next IO command
@@ -254,8 +240,9 @@ public class OutOfCoreIOScheduler {
* @param ioCommand IO command to add to the scheduler
*/
public void addIOCommand(IOCommand ioCommand) {
- int ownerThread = getOwnerThreadId(ioCommand.getPartitionId());
if (ioCommand instanceof LoadPartitionIOCommand) {
+ int ownerThread = oocEngine.getMetaPartitionManager()
+ .getOwnerThreadId(ioCommand.getPartitionId());
threadLoadCommandQueue.get(ownerThread).offer(ioCommand);
} else {
throw new IllegalStateException("addIOCommand: IO command type is not " +
http://git-wip-us.apache.org/repos/asf/giraph/blob/3793c9ef/giraph-core/src/main/java/org/apache/giraph/ooc/OutOfCoreIOStatistics.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/ooc/OutOfCoreIOStatistics.java b/giraph-core/src/main/java/org/apache/giraph/ooc/OutOfCoreIOStatistics.java
index a225a4c..44a0d2f 100644
--- a/giraph-core/src/main/java/org/apache/giraph/ooc/OutOfCoreIOStatistics.java
+++ b/giraph-core/src/main/java/org/apache/giraph/ooc/OutOfCoreIOStatistics.java
@@ -22,7 +22,7 @@ import com.google.common.collect.Maps;
import org.apache.giraph.conf.GiraphConstants;
import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
import org.apache.giraph.conf.IntConfOption;
-import org.apache.giraph.ooc.io.IOCommand.IOCommandType;
+import org.apache.giraph.ooc.command.IOCommand.IOCommandType;
import org.apache.log4j.Logger;
import java.util.Map;
http://git-wip-us.apache.org/repos/asf/giraph/blob/3793c9ef/giraph-core/src/main/java/org/apache/giraph/ooc/OutOfCoreOracle.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/ooc/OutOfCoreOracle.java b/giraph-core/src/main/java/org/apache/giraph/ooc/OutOfCoreOracle.java
deleted file mode 100644
index fa8e6bd..0000000
--- a/giraph-core/src/main/java/org/apache/giraph/ooc/OutOfCoreOracle.java
+++ /dev/null
@@ -1,131 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.giraph.ooc;
-
-import com.sun.management.GarbageCollectionNotificationInfo;
-import org.apache.giraph.ooc.io.IOCommand;
-
-/**
- * Interface for any out-of-core oracle. An out-of-core oracle is the brain of
- * the out-of-core mechanism, determining/deciding on out-of-core actions (load
- * or store) that should happen.
- */
-public interface OutOfCoreOracle {
- /**
- * Different types of IO actions that can potentially lead to a more desired
- * state of computation for out-of-core mechanism. These actions are issued
- * based on the status of the memory (memory pressure, rate of data transfer
- * to memory, etc.)
- */
- enum IOAction {
- /**
- * Either of:
- * - storing incoming messages of any partition currently on disk, or
- * - storing incoming messages' raw data buffer of any partition
- * currently on disk, or
- * - storing partitions' raw data buffer for those partitions that are
- * currently on disk.
- */
- STORE_MESSAGES_AND_BUFFERS,
- /**
- * Storing a partition that is *processed* in the current iteration cycle.
- * This action is also known as "soft store"
- */
- STORE_PROCESSED_PARTITION,
- /**
- * Storing a partition from memory on disk, prioritizing to *processed*
- * partitions on memory. However, if there is no *processed* partition,
- * store should happen at any cost, even if an *unprocessed* partition has
- * to be stored. This action is also know as "hard store".
- */
- STORE_PARTITION,
- /**
- * Loading an *unprocessed* partition from disk to memory, only if there are
- * *processed* partitions in memory. This action basically initiates a swap
- * operation.
- */
- LOAD_TO_SWAP_PARTITION,
- /**
- * Loading an *unprocessed* partition from disk to memory. This action is
- * also known as "soft load".
- */
- LOAD_UNPROCESSED_PARTITION,
- /**
- * Loading a partition (prioritizing *unprocessed* over *processed*) from
- * disk to memory. Loading a *processed* partition to memory is a prefetch
- * of that partition to be processed in the next superstep. This action is
- * also known as "hard load".
- */
- LOAD_PARTITION,
- /**
- * Loading a partition regardless of the memory situation. An out-of-core
- * mechanism may use this action to signal IO threads that it is allowed to
- * load a partition that is specifically requested.
- */
- URGENT_LOAD_PARTITION
- }
-
- /**
- * Get the next set of viable IO actions to help bring memory to a more
- * desired state.
- *
- * @return an array of viable IO actions, sorted from highest priority to
- * lowest priority
- */
- IOAction[] getNextIOActions();
-
- /**
- * Whether a command is appropriate to bring the memory to a more desired
- * state. A command is not executed unless it is approved by the oracle. This
- * method is specially important where there are multiple IO threads
- * performing IO operations for the out-of-core mechanism. The approval
- * becomes significantly important to prevent all IO threads from performing
- * identical command type, if that is a necessity. For instance, execution of
- * a particular command type by only one thread may bring the memory to a
- * desired state, and the rest of IO threads may perform other types of
- * commands.
- *
- * @param command the IO command that is about to execute
- * @return 'true' if the command is approved for execution. 'false' if the
- * command should not be executed
- */
- boolean approve(IOCommand command);
-
- /**
- * Notification of command completion. Oracle may update its status and commit
- * the changes a command may cause.
- *
- * @param command the IO command that is completed
- */
- void commandCompleted(IOCommand command);
-
- /**
- * Notification of GC completion. Oracle may take certain decisions based on
- * GC information (such as amount of time it took, memory it reclaimed, etc.)
- *
- * @param gcInfo GC information
- */
- void gcCompleted(GarbageCollectionNotificationInfo gcInfo);
-
- /**
- * Shut down the out-of-core oracle. Necessary specifically for cases where
- * out-of-core oracle is using additional monitoring threads.
- */
- void shutdown();
-}
http://git-wip-us.apache.org/repos/asf/giraph/blob/3793c9ef/giraph-core/src/main/java/org/apache/giraph/ooc/SimpleGCMonitoringOracle.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/ooc/SimpleGCMonitoringOracle.java b/giraph-core/src/main/java/org/apache/giraph/ooc/SimpleGCMonitoringOracle.java
deleted file mode 100644
index 0dfc9de..0000000
--- a/giraph-core/src/main/java/org/apache/giraph/ooc/SimpleGCMonitoringOracle.java
+++ /dev/null
@@ -1,355 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.giraph.ooc;
-
-import com.google.common.collect.Maps;
-import com.sun.management.GarbageCollectionNotificationInfo;
-import org.apache.giraph.conf.FloatConfOption;
-import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
-import org.apache.giraph.ooc.io.IOCommand;
-import org.apache.giraph.ooc.io.LoadPartitionIOCommand;
-import org.apache.giraph.ooc.io.WaitIOCommand;
-import org.apache.log4j.Logger;
-
-import java.lang.management.MemoryUsage;
-import java.util.Map;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicLong;
-
-/**
- * Out-of-core oracle to adaptively control data kept in memory, with the goal
- * of keeping the memory state constantly at a desired state. This oracle
- * monitors GC behavior to keep track of memory pressure.
- *
- * After each GC is done, this oracle retrieve statistics about the memory
- * pressure (memory used, max memory, and how far away memory is compared to a
- * max optimal pressure). Based on the the past 2 recent memory statistics,
- * the oracle predicts the status of the memory, and sets the rate of load/store
- * of data from/to disk. If the rate of loading data from disk is 'l', and the
- * rate of storing data to disk is 's', the rate of data injection to memory
- * from disk can be denoted as 'l-s'. This oracle determines what 'l-s' should
- * be based on the prediction of memory status.
- *
- * Assume that based on the previous GC call the memory usage at time t_0 is
- * m_0, and based on the most recent GC call the memory usage at time t_1 is
- * m_1. So, the rate of memory increase is alpha = (m_1 - m_0) / (t_1 - t_0).
- * Assume that the ideal memory pressure happens when the memory usage is
- * m_ideal. So, at time 't_2 = t_1 + (t_1 - t_0)', we want m_ideal. That means
- * the ideal rate would be beta = (m_ideal - m_1) / (t_2 - t_1). If the date
- * injection rate to memory so far was i, the new injection rate should be:
- * i_new = i - (alpha - beta)
- */
-public class SimpleGCMonitoringOracle implements OutOfCoreOracle {
- /**
- * The optimal memory pressure at which GC behavior is close to ideal. This
- * fraction may be dependant on the GC strategy used for running a job, but
- * generally should not be dependent on the graph processing application.
- */
- public static final FloatConfOption OPTIMAL_MEMORY_PRESSURE =
- new FloatConfOption("giraph.optimalMemoryPressure", 0.8f,
- "The memory pressure (fraction of used memory) at which the job " +
- "shows the optimal GC behavior. This fraction may be dependent " +
- "on the GC strategy used in running the job.");
-
- /** Class logger */
- private static final Logger LOG =
- Logger.getLogger(SimpleGCMonitoringOracle.class);
- /** Cached value for OPTIMAL_MEMORY_PRESSURE */
- private final float optimalMemoryPressure;
- /** Out-of-core engine */
- private final OutOfCoreEngine oocEngine;
- /** Status of memory from the last GC call */
- private GCObservation lastGCObservation;
- /** Desired rate of data injection to memory */
- private final AtomicLong desiredDiskToMemoryDataRate =
- new AtomicLong(0);
- /** Number of on the fly (outstanding) IO commands for each command type */
- private final Map<IOCommand.IOCommandType, AtomicInteger> commandOccurrences =
- Maps.newConcurrentMap();
-
- /**
- * Constructor
- *
- * @param conf configuration
- * @param oocEngine out-of-core engine
- */
- public SimpleGCMonitoringOracle(ImmutableClassesGiraphConfiguration conf,
- OutOfCoreEngine oocEngine) {
- this.optimalMemoryPressure = OPTIMAL_MEMORY_PRESSURE.get(conf);
- this.oocEngine = oocEngine;
- this.lastGCObservation = new GCObservation(-1, 0, 0);
- for (IOCommand.IOCommandType type : IOCommand.IOCommandType.values()) {
- commandOccurrences.put(type, new AtomicInteger(0));
- }
- }
-
- @Override
- public synchronized void gcCompleted(GarbageCollectionNotificationInfo
- gcInfo) {
- long time = System.currentTimeMillis();
- Map<String, MemoryUsage> memAfter = gcInfo.getGcInfo()
- .getMemoryUsageAfterGc();
- long usedMemory = 0;
- long maxMemory = 0;
- for (MemoryUsage memDetail : memAfter.values()) {
- usedMemory += memDetail.getUsed();
- maxMemory += memDetail.getMax();
- }
- GCObservation observation = new GCObservation(time, usedMemory, maxMemory);
- if (LOG.isInfoEnabled()) {
- LOG.info("gcCompleted: GC completed with: " + observation);
- }
- // Whether this is not the first GC call in the application
- if (lastGCObservation.isValid()) {
- long deltaDataRate =
- lastGCObservation.getDesiredDeltaDataRate(observation);
- long diskBandwidthEstimate =
- oocEngine.getIOStatistics().getDiskBandwidth();
- // Update the desired data injection rate to memory. The data injection
- // rate cannot be less than -disk_bandwidth (the extreme case happens if
- // we only do 'store'), and cannot be more than disk_bandwidth (the
- // extreme case happens if we only do 'load').
- long dataInjectionRate = desiredDiskToMemoryDataRate.get();
- desiredDiskToMemoryDataRate.set(Math.max(
- Math.min(desiredDiskToMemoryDataRate.get() - deltaDataRate,
- diskBandwidthEstimate), -diskBandwidthEstimate));
- if (LOG.isInfoEnabled()) {
- LOG.info("gcCompleted: changing data injection rate from " +
- String.format("%.2f", dataInjectionRate / 1024.0 / 1024.0) +
- " to " + String.format("%.2f", desiredDiskToMemoryDataRate.get() /
- 1024.0 / 1024.0));
- }
- }
- lastGCObservation = observation;
- }
-
- /**
- * Get the current data injection rate to memory based on the commands ran
- * in the history (retrieved from statistics collector), and outstanding
- * commands issued by the IO scheduler.
- *
- * @return the current data injection rate to memory
- */
- private long getCurrentDataInjectionRate() {
- long effectiveBytesTransferred = 0;
- long effectiveDuration = 0;
- for (IOCommand.IOCommandType type : IOCommand.IOCommandType.values()) {
- OutOfCoreIOStatistics.BytesDuration stats =
- oocEngine.getIOStatistics().getCommandTypeStats(type);
- int occurrence = commandOccurrences.get(type).get();
- long typeBytesTransferred = stats.getBytes();
- long typeDuration = stats.getDuration();
- // If there is an outstanding command, we still do not know how many bytes
- // it will transfer, and how long it will take. So, we guesstimate these
- // numbers based on other similar commands happened in the history. We
- // simply take the average number of bytes transferred for the particular
- // command, and we take average duration for the particular command. We
- // should multiply these numbers by the number of outstanding commands of
- // this particular command type.
- if (stats.getOccurrence() != 0) {
- typeBytesTransferred += stats.getBytes() / stats.getOccurrence() *
- occurrence;
- typeDuration += stats.getDuration() / stats.getOccurrence() *
- occurrence;
- }
- if (type == IOCommand.IOCommandType.LOAD_PARTITION) {
- effectiveBytesTransferred += typeBytesTransferred;
- } else {
- // Store (data going out of memory), or wait (no data transferred)
- effectiveBytesTransferred -= typeBytesTransferred;
- }
- effectiveDuration += typeDuration;
- }
- if (effectiveDuration == 0) {
- return 0;
- } else {
- return effectiveBytesTransferred / effectiveDuration;
- }
- }
-
- @Override
- public IOAction[] getNextIOActions() {
- long error = (long) (oocEngine.getIOStatistics().getDiskBandwidth() * 0.05);
- long desiredRate = desiredDiskToMemoryDataRate.get();
- long currentRate = getCurrentDataInjectionRate();
- if (desiredRate > error) {
- // 'l-s' is positive, we should do more load than store.
- if (currentRate > desiredRate + error) {
- // We should decrease 'l-s'. This can be done either by increasing 's'
- // or issuing wait command. We prioritize wait over hard store.
- return new IOAction[]{
- IOAction.STORE_MESSAGES_AND_BUFFERS,
- IOAction.STORE_PROCESSED_PARTITION};
- } else if (currentRate < desiredRate - error) {
- // We should increase 'l-s'. We can simply load partitions/data.
- return new IOAction[]{IOAction.LOAD_PARTITION};
- } else {
- // We are in a proper state and we should keep up with the rate. We can
- // either soft store data or load data (hard load, since we desired rate
- // is positive).
- return new IOAction[]{
- IOAction.STORE_MESSAGES_AND_BUFFERS,
- IOAction.STORE_PROCESSED_PARTITION,
- IOAction.LOAD_PARTITION};
- }
- } else if (desiredRate < -error) {
- // 'l-s' is negative, we should do more store than load.
- if (currentRate < desiredRate - error) {
- // We should increase 'l-s', but we should be cautious. We only do soft
- // load, or wait.
- return new IOAction[]{IOAction.LOAD_UNPROCESSED_PARTITION};
- } else if (currentRate > desiredRate + error) {
- // We should reduce 'l-s', we do hard store.
- return new IOAction[]{
- IOAction.STORE_MESSAGES_AND_BUFFERS,
- IOAction.STORE_PARTITION};
- } else {
- // We should keep up with the rate. We can either soft store data, or
- // soft load data.
- return new IOAction[]{
- IOAction.STORE_MESSAGES_AND_BUFFERS,
- IOAction.STORE_PROCESSED_PARTITION,
- IOAction.LOAD_UNPROCESSED_PARTITION};
- }
- } else {
- // 'l-s' is almost zero. If current rate is over the desired rate, we do
- // soft store. If the current rate is below the desired rate, we do soft
- // load.
- if (currentRate > desiredRate + error) {
- return new IOAction[]{
- IOAction.STORE_MESSAGES_AND_BUFFERS,
- IOAction.STORE_PROCESSED_PARTITION};
- } else if (currentRate < desiredRate - error) {
- return new IOAction[]{IOAction.LOAD_UNPROCESSED_PARTITION};
- } else {
- return new IOAction[]{
- IOAction.STORE_MESSAGES_AND_BUFFERS,
- IOAction.STORE_PROCESSED_PARTITION,
- IOAction.LOAD_UNPROCESSED_PARTITION};
- }
- }
- }
-
- @Override
- public synchronized boolean approve(IOCommand command) {
- long error = (long) (oocEngine.getIOStatistics().getDiskBandwidth() * 0.05);
- long desiredRate = desiredDiskToMemoryDataRate.get();
- long currentRate = getCurrentDataInjectionRate();
- // The command is denied iff the current rate is above the desired rate and
- // we are doing load (instead of store), or the current rate is below the
- // desired rate and we are doing store (instead of loading).
- if (currentRate > desiredRate + error &&
- command instanceof LoadPartitionIOCommand) {
- return false;
- }
- if (currentRate < desiredRate - error &&
- !(command instanceof LoadPartitionIOCommand) &&
- !(command instanceof WaitIOCommand)) {
- return false;
- }
- commandOccurrences.get(command.getType()).getAndIncrement();
- return true;
- }
-
- @Override
- public void commandCompleted(IOCommand command) {
- commandOccurrences.get(command.getType()).getAndDecrement();
- }
-
- @Override
- public void shutdown() { }
-
- /** Helper class to record memory status after GC calls */
- private class GCObservation {
- /** The time at which the GC happened (in milliseconds) */
- private long time;
- /** Amount of memory used after the GC call */
- private long usedMemory;
- /** Maximum amounts of memory reported by GC listener */
- private long maxMemory;
-
- /**
- * Constructor
- *
- * @param time time of GC
- * @param usedMemory amount of used memory after GC
- * @param maxMemory amount of all available memory based on GC observation
- */
- public GCObservation(long time, long usedMemory, long maxMemory) {
- this.time = time;
- this.usedMemory = usedMemory;
- this.maxMemory = maxMemory;
- }
-
- /**
- * Is this a valid observation?
- *
- * @return true iff it is a valid observation
- */
- public boolean isValid() {
- return time > 0;
- }
-
- /**
- * Considering a new observation of memory status after the most recent GC,
- * what is the desired rate for data injection to memory.
- *
- * @param newObservation the most recent GC observation
- * @return desired rate of data injection to memory
- */
- public long getDesiredDeltaDataRate(GCObservation newObservation) {
- long newUsedMemory = newObservation.usedMemory;
- long newMaxMemory = newObservation.maxMemory;
- long lastUsedMemory = usedMemory;
- long lastMaxMemory = maxMemory;
- // Scale the memory status of two GC observation to be the same
- long scaledMaxMemory = Math.min(lastMaxMemory, newMaxMemory);
- newUsedMemory =
- (long) (((double) scaledMaxMemory / newMaxMemory) * newUsedMemory);
- lastUsedMemory =
- (long) (((double) scaledMaxMemory / lastMaxMemory) * lastUsedMemory);
- long desiredUsedMemory = (long) (optimalMemoryPressure * scaledMaxMemory);
- if (LOG.isInfoEnabled()) {
- LOG.info("getDesiredDeltaDataRate: " + String.format("previous usage " +
- "= %.2f MB, ", lastUsedMemory / 1024.0 / 1024.0) + String.format(
- "current usage = %.2f MB, ", newUsedMemory / 1024.0 / 1024.0) +
- String.format("ideal usage = %.2f MB", desiredUsedMemory / 1024.0 /
- 1024.0));
- }
- long interval = newObservation.time - time;
- if (interval == 0) {
- interval = 1;
- LOG.warn("getDesiredDeltaRate: two GC happened almost at the same " +
- "time!");
- }
- long currentDataRate = (long) ((double) (newUsedMemory -
- lastUsedMemory) / interval * 1000);
- long desiredDataRate = (long) ((double) (desiredUsedMemory -
- newUsedMemory) / interval * 1000);
- return currentDataRate - desiredDataRate;
- }
-
- @Override
- public String toString() {
- return String.format("(usedMemory: %.2f MB, maxMemory: %.2f MB at " +
- "time: %d ms)", usedMemory / 1024.0 / 1024.0,
- maxMemory / 1024.0 / 1024.0, time);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/giraph/blob/3793c9ef/giraph-core/src/main/java/org/apache/giraph/ooc/ThresholdBasedOracle.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/ooc/ThresholdBasedOracle.java b/giraph-core/src/main/java/org/apache/giraph/ooc/ThresholdBasedOracle.java
deleted file mode 100644
index 3e05dce..0000000
--- a/giraph-core/src/main/java/org/apache/giraph/ooc/ThresholdBasedOracle.java
+++ /dev/null
@@ -1,364 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.giraph.ooc;
-
-import com.sun.management.GarbageCollectionNotificationInfo;
-import org.apache.giraph.comm.flow_control.CreditBasedFlowControl;
-import org.apache.giraph.comm.flow_control.FlowControl;
-import org.apache.giraph.comm.netty.NettyClient;
-import org.apache.giraph.conf.FloatConfOption;
-import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
-import org.apache.giraph.conf.LongConfOption;
-import org.apache.giraph.ooc.io.IOCommand;
-import org.apache.giraph.utils.CallableFactory;
-import org.apache.giraph.utils.LogStacktraceCallable;
-import org.apache.giraph.utils.MemoryUtils;
-import org.apache.giraph.utils.ThreadUtils;
-import org.apache.log4j.Logger;
-
-import java.util.concurrent.Callable;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
-import java.util.concurrent.TimeUnit;
-
-import static com.google.common.base.Preconditions.checkState;
-
-/**
- * Out-of-core oracle to adaptively control data kept in memory, with the goal
- * of keeping the memory usage at a desired state. Out-of-core policy in this
- * oracle is based on several user-defined thresholds. Also, this oracle spawns
- * a thread to periodically check the memory usage. This thread would issue
- * manual GC calls if JVM fails to call major/full GC for a while and the amount
- * of used memory is about to cause high-memory pressure. This oracle, also,
- * monitors GC activities. The monitoring mechanism looks for major/full GC
- * calls, and updates out-of-core decisions based on the amount of available
- * memory after such GCs. There are three out-of-core decisions:
- * - Which IO operations should be done (load/offload of partitions and
- * messages)
- * - What the incoming messages rate should be (updating credits announced by
- * this worker in credit-based flow-control mechanism)
- * - How many processing threads should remain active (tethering rate of
- * data generation)
- *
- * The following table shows the relationship of these decisions and
- * used-defined thresholds.
- * --------------------------------------------------------------
- * Memory Pressure | Manual | IO | Credit | Active |
- * (memory usage) | GC? | Action | | Threads |
- * --------------------------------------------------------------
- * | Yes | hard | 0 | 0 |
- * | | store | | |
- * failPressure -------------------------------------------------
- * | Yes | hard | 0 | fraction |
- * | | store | | |
- * emergencyPressure --------------------------------------------
- * | Yes | hard | fraction | max |
- * | | store | | |
- * highPressure -------------------------------------------------
- * | No | soft | fraction | max |
- * | | store | | |
- * optimalPressure ----------------------------------------------
- * | No | soft | max | max |
- * | | load | | |
- * lowPressure --------------------------------------------------
- * | No | hard | max | max |
- * | | load | | |
- * --------------------------------------------------------------
- *
- */
-public class ThresholdBasedOracle implements OutOfCoreOracle {
- /** The memory pressure at/above which the job would fail */
- public static final FloatConfOption FAIL_MEMORY_PRESSURE =
- new FloatConfOption("giraph.memory.failPressure", 0.975f,
- "The memory pressure (fraction of used memory) at/above which the " +
- "job would fail.");
- /**
- * The memory pressure at which the job is cloe to fail, even though we were
- * using maximal disk bandwidth and minimal network rate. We should reduce
- * job processing rate.
- */
- public static final FloatConfOption EMERGENCY_MEMORY_PRESSURE =
- new FloatConfOption("giraph.memory.emergencyPressure", 0.925f,
- "The memory pressure (fraction of used memory) at which the job " +
- "is close to fail, hence we should reduce its processing rate " +
- "as much as possible.");
- /** The memory pressure at which the job is suffering from GC overhead. */
- public static final FloatConfOption HIGH_MEMORY_PRESSURE =
- new FloatConfOption("giraph.memory.highPressure", 0.875f,
- "The memory pressure (fraction of used memory) at which the job " +
- "is suffering from GC overhead.");
- /**
- * The memory pressure at which we expect GC to perform optimally for a
- * memory intensive job.
- */
- public static final FloatConfOption OPTIMAL_MEMORY_PRESSURE =
- new FloatConfOption("giraph.memory.optimalPressure", 0.8f,
- "The memory pressure (fraction of used memory) at which a " +
- "memory-intensive job shows the optimal GC behavior.");
- /**
- * The memory pressure at/below which the job can use more memory without
- * suffering from GC overhead.
- */
- public static final FloatConfOption LOW_MEMORY_PRESSURE =
- new FloatConfOption("giraph.memory.lowPressure", 0.7f,
- "The memory pressure (fraction of used memory) at/below which the " +
- "job can use more memory without suffering the performance.");
- /** The interval at which memory observer thread wakes up. */
- public static final LongConfOption CHECK_MEMORY_INTERVAL =
- new LongConfOption("giraph.checkMemoryInterval", 2500,
- "The interval/period where memory observer thread wakes up and " +
- "monitors memory footprint (in milliseconds)");
- /**
- * Memory observer thread would manually call GC if major/full GC has not
- * been called for a while. The period where we expect GC to be happened in
- * past is specified in this parameter
- */
- public static final LongConfOption LAST_GC_CALL_INTERVAL =
- new LongConfOption("giraph.lastGcCallInterval", 10 * 1000,
- "How long after last major/full GC should we call manual GC?");
-
- /** Class logger */
- private static final Logger LOG =
- Logger.getLogger(ThresholdBasedOracle.class);
- /** Cached value for FAIL_MEMORY_PRESSURE */
- private final float failMemoryPressure;
- /** Cached value for EMERGENCY_MEMORY_PRESSURE */
- private final float emergencyMemoryPressure;
- /** Cached value for HIGH_MEMORY_PRESSURE */
- private final float highMemoryPressure;
- /** Cached value for OPTIMAL_MEMORY_PRESSURE */
- private final float optimalMemoryPressure;
- /** Cached value for LOW_MEMORY_PRESSURE */
- private final float lowMemoryPressure;
- /** Cached value for CHECK_MEMORY_INTERVAL */
- private final long checkMemoryInterval;
- /** Cached value for LAST_GC_CALL_INTERVAL */
- private final long lastGCCallInterval;
- /**
- * Cached value for NettyClient.MAX_NUM_OF_OPEN_REQUESTS_PER_WORKER (max
- * credit used for credit-based flow-control mechanism)
- */
- private final short maxRequestsCredit;
- /**
- * Whether the job is shutting down. Used for terminating the memory
- * observer thread.
- */
- private final CountDownLatch shouldTerminate;
- /** Result of memory observer thread */
- private final Future<Void> checkMemoryThreadResult;
- /** Out-of-core engine */
- private final OutOfCoreEngine oocEngine;
- /** Last time a major/full GC has been called (in milliseconds) */
- private volatile long lastMajorGCTime;
- /** Last time a non major/full GC has been called (in milliseconds) */
- private volatile long lastMinorGCTime;
-
- /**
- * Constructor
- *
- * @param conf configuration
- * @param oocEngine out-of-core engine
- */
- public ThresholdBasedOracle(ImmutableClassesGiraphConfiguration conf,
- OutOfCoreEngine oocEngine) {
- this.failMemoryPressure = FAIL_MEMORY_PRESSURE.get(conf);
- this.emergencyMemoryPressure = EMERGENCY_MEMORY_PRESSURE.get(conf);
- this.highMemoryPressure = HIGH_MEMORY_PRESSURE.get(conf);
- this.optimalMemoryPressure = OPTIMAL_MEMORY_PRESSURE.get(conf);
- this.lowMemoryPressure = LOW_MEMORY_PRESSURE.get(conf);
- this.checkMemoryInterval = CHECK_MEMORY_INTERVAL.get(conf);
- this.lastGCCallInterval = LAST_GC_CALL_INTERVAL.get(conf);
- this.maxRequestsCredit = (short)
- CreditBasedFlowControl.MAX_NUM_OF_OPEN_REQUESTS_PER_WORKER.get(conf);
- NettyClient.LIMIT_OPEN_REQUESTS_PER_WORKER.setIfUnset(conf, true);
- boolean useCredit = NettyClient.LIMIT_OPEN_REQUESTS_PER_WORKER.get(conf);
- checkState(useCredit, "ThresholdBasedOracle: credit-based flow control " +
- "must be enabled. Use giraph.waitForPerWorkerRequests=true");
- this.shouldTerminate = new CountDownLatch(1);
- this.oocEngine = oocEngine;
- this.lastMajorGCTime = 0;
-
- CallableFactory<Void> callableFactory = new CallableFactory<Void>() {
- @Override
- public Callable<Void> newCallable(int callableId) {
- return new Callable<Void>() {
- @Override
- public Void call() throws Exception {
- while (true) {
- boolean done = shouldTerminate.await(checkMemoryInterval,
- TimeUnit.MILLISECONDS);
- if (done) {
- break;
- }
- double usedMemoryFraction = 1 - MemoryUtils.freeMemoryFraction();
- long time = System.currentTimeMillis();
- if ((usedMemoryFraction > highMemoryPressure &&
- time - lastMajorGCTime >= lastGCCallInterval) ||
- (usedMemoryFraction > optimalMemoryPressure &&
- time - lastMajorGCTime >= lastGCCallInterval &&
- time - lastMinorGCTime >= lastGCCallInterval)) {
- if (LOG.isInfoEnabled()) {
- LOG.info("call: last GC happened a while ago and the " +
- "amount of used memory is high (used memory " +
- "fraction is " +
- String.format("%.2f", usedMemoryFraction) + "). " +
- "Calling GC manually");
- }
- System.gc();
- time = System.currentTimeMillis() - time;
- usedMemoryFraction = 1 - MemoryUtils.freeMemoryFraction();
- if (LOG.isInfoEnabled()) {
- LOG.info("call: manual GC is done. It took " +
- String.format("%.2f", (double) time / 1000) +
- " seconds. Used memory fraction is " +
- String.format("%.2f", usedMemoryFraction));
- }
- }
- updateRates(usedMemoryFraction);
- }
- return null;
- }
- };
- }
- };
- ExecutorService executor = Executors.newSingleThreadExecutor(
- ThreadUtils.createThreadFactory("check-memory"));
- this.checkMemoryThreadResult = executor.submit(new LogStacktraceCallable<>(
- callableFactory.newCallable(0)));
- executor.shutdown();
- }
-
- /**
- * upon major/full GC calls.
- */
- /**
- * Update statistics and rate regarding communication credits and number of
- * active threads.
- *
- * @param usedMemoryFraction the fraction of used memory over max memory
- */
- public void updateRates(double usedMemoryFraction) {
- // Update the fraction of processing threads that should remain active
- if (usedMemoryFraction >= failMemoryPressure) {
- oocEngine.updateActiveThreadsFraction(0);
- } else if (usedMemoryFraction < emergencyMemoryPressure) {
- oocEngine.updateActiveThreadsFraction(1);
- } else {
- oocEngine.updateActiveThreadsFraction(1 -
- (usedMemoryFraction - emergencyMemoryPressure) /
- (failMemoryPressure - emergencyMemoryPressure));
- }
-
- // Update the fraction of credit that should be used in credit-based flow-
- // control
- if (usedMemoryFraction >= emergencyMemoryPressure) {
- updateRequestsCredit((short) 0);
- } else if (usedMemoryFraction < optimalMemoryPressure) {
- updateRequestsCredit(maxRequestsCredit);
- } else {
- updateRequestsCredit((short) (maxRequestsCredit *
- (1 - (usedMemoryFraction - optimalMemoryPressure) /
- (emergencyMemoryPressure - optimalMemoryPressure))));
- }
- }
-
- @Override
- public IOAction[] getNextIOActions() {
- double usedMemoryFraction = 1 - MemoryUtils.freeMemoryFraction();
- if (LOG.isInfoEnabled()) {
- LOG.info(String.format("getNextIOActions: usedMemoryFraction = %.2f",
- usedMemoryFraction));
- }
- if (usedMemoryFraction > highMemoryPressure) {
- return new IOAction[]{
- IOAction.STORE_MESSAGES_AND_BUFFERS,
- IOAction.STORE_PARTITION};
- } else if (usedMemoryFraction > optimalMemoryPressure) {
- return new IOAction[]{
- IOAction.LOAD_UNPROCESSED_PARTITION,
- IOAction.STORE_MESSAGES_AND_BUFFERS,
- IOAction.STORE_PROCESSED_PARTITION};
- } else if (usedMemoryFraction > lowMemoryPressure) {
- return new IOAction[]{
- IOAction.LOAD_UNPROCESSED_PARTITION,
- IOAction.STORE_MESSAGES_AND_BUFFERS,
- IOAction.LOAD_PARTITION};
- } else {
- return new IOAction[]{IOAction.LOAD_PARTITION};
- }
- }
-
- @Override
- public boolean approve(IOCommand command) {
- return true;
- }
-
- @Override
- public void commandCompleted(IOCommand command) {
- // Do nothing
- }
-
- @Override
- public void gcCompleted(GarbageCollectionNotificationInfo gcInfo) {
- String gcAction = gcInfo.getGcAction().toLowerCase();
- if (gcAction.contains("full") || gcAction.contains("major")) {
- if (!gcInfo.getGcCause().contains("No GC")) {
- lastMajorGCTime = System.currentTimeMillis();
- }
- } else {
- lastMinorGCTime = System.currentTimeMillis();
- }
- }
-
- @Override
- public void shutdown() {
- shouldTerminate.countDown();
- try {
- checkMemoryThreadResult.get();
- } catch (InterruptedException | ExecutionException e) {
- LOG.error("shutdown: caught exception while waiting on check-memory " +
- "thread to terminate!");
- throw new IllegalStateException(e);
- }
- if (LOG.isInfoEnabled()) {
- LOG.info("shutdown: ThresholdBasedOracle shutdown complete!");
- }
- }
-
- /**
- * Update the credit announced for this worker in Netty. The lower the credit
- * is, the lower rate incoming messages arrive at this worker. Thus, credit
- * is an indirect way of controlling amount of memory incoming messages would
- * take.
- *
- * @param newCredit the new credit to announce to other workers
- */
- private void updateRequestsCredit(short newCredit) {
- if (LOG.isInfoEnabled()) {
- LOG.info("updateRequestsCredit: updating the credit to " + newCredit);
- }
- FlowControl flowControl = oocEngine.getFlowControl();
- if (flowControl != null) {
- ((CreditBasedFlowControl) flowControl).updateCredit(newCredit);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/giraph/blob/3793c9ef/giraph-core/src/main/java/org/apache/giraph/ooc/command/IOCommand.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/ooc/command/IOCommand.java b/giraph-core/src/main/java/org/apache/giraph/ooc/command/IOCommand.java
new file mode 100644
index 0000000..b6c986d
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/ooc/command/IOCommand.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.ooc.command;
+
+import org.apache.giraph.ooc.OutOfCoreEngine;
+
+import java.io.IOException;
+
+/**
+ * Representation of an IO command (moving data to disk/memory) used in
+ * out-of-core mechanism.
+ */
+public abstract class IOCommand {
+ /** Type of IO command */
+ public enum IOCommandType {
+ /** Loading a partition */
+ LOAD_PARTITION,
+ /** Storing a partition */
+ STORE_PARTITION,
+ /** Storing incoming messages of a partition */
+ STORE_MESSAGE,
+ /**
+ * Storing message/buffer raw data buffer of a currently out-of-core
+ * partition
+ */
+ STORE_BUFFER,
+ /** Doing nothing regarding IO */
+ WAIT
+ }
+
+ /** Id of the partition involved for the IO */
+ protected final int partitionId;
+ /** Out-of-core engine */
+ protected final OutOfCoreEngine oocEngine;
+ /**
+ * Number of bytes transferred to/from memory (loaded/stored) during the
+ * execution of the command
+ */
+ protected long numBytesTransferred;
+
+ /**
+ * Constructor
+ *
+ * @param oocEngine Out-of-core engine
+ * @param partitionId Id of the partition involved in the IO
+ */
+ public IOCommand(OutOfCoreEngine oocEngine, int partitionId) {
+ this.oocEngine = oocEngine;
+ this.partitionId = partitionId;
+ this.numBytesTransferred = 0;
+ }
+
+ /**
+ * Get the id of the partition involved in the IO
+ *
+ * @return id of the partition
+ */
+ public int getPartitionId() {
+ return partitionId;
+ }
+
+ /**
+ * Execute (load/store of data) the IO command, and change the data stores
+ * appropriately based on the data loaded/stored. Return true iff the command
+ * is actually executed (resulted in loading or storing data).
+ *
+ * @return whether the command is actually executed
+ * @throws IOException
+ */
+ public abstract boolean execute() throws IOException;
+
+ /**
+ * Get the type of the command.
+ *
+ * @return type of the command
+ */
+ public abstract IOCommandType getType();
+
+ /**
+ * Get the number of bytes transferred (loaded/stored from/to disk).
+ *
+ * @return number of bytes transferred during the execution of the command
+ */
+ public long bytesTransferred() {
+ return numBytesTransferred;
+ }
+}
+
http://git-wip-us.apache.org/repos/asf/giraph/blob/3793c9ef/giraph-core/src/main/java/org/apache/giraph/ooc/command/LoadPartitionIOCommand.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/ooc/command/LoadPartitionIOCommand.java b/giraph-core/src/main/java/org/apache/giraph/ooc/command/LoadPartitionIOCommand.java
new file mode 100644
index 0000000..ee12159
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/ooc/command/LoadPartitionIOCommand.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.ooc.command;
+
+import com.google.common.base.Preconditions;
+import org.apache.giraph.bsp.BspService;
+import org.apache.giraph.comm.messages.MessageStore;
+import org.apache.giraph.ooc.OutOfCoreEngine;
+import org.apache.giraph.ooc.data.DiskBackedEdgeStore;
+import org.apache.giraph.ooc.data.DiskBackedMessageStore;
+import org.apache.giraph.ooc.data.DiskBackedPartitionStore;
+
+import java.io.IOException;
+
+/**
+ * IOCommand to load partition data, edge data (if in INPUT_SUPERSTEP), and
+ * message data (if in compute supersteps). Also, this command can be used to
+ * prefetch a partition to be processed in the next superstep.
+ */
+public class LoadPartitionIOCommand extends IOCommand {
+ /**
+ * Which superstep this partition should be loaded for? (can be current
+ * superstep or next superstep -- in case of prefetching).
+ */
+ private final long superstep;
+
+ /**
+ * Constructor
+ *
+ * @param oocEngine out-of-core engine
+ * @param partitionId id of the partition to be loaded
+ * @param superstep superstep to load the partition for
+ */
+ public LoadPartitionIOCommand(OutOfCoreEngine oocEngine, int partitionId,
+ long superstep) {
+ super(oocEngine, partitionId);
+ this.superstep = superstep;
+ }
+
+ @Override
+ public boolean execute() throws IOException {
+ boolean executed = false;
+ if (oocEngine.getMetaPartitionManager()
+ .startLoadingPartition(partitionId, superstep)) {
+ long currentSuperstep = oocEngine.getSuperstep();
+ DiskBackedPartitionStore partitionStore =
+ (DiskBackedPartitionStore)
+ oocEngine.getServerData().getPartitionStore();
+ numBytesTransferred +=
+ partitionStore.loadPartitionData(partitionId);
+ if (currentSuperstep == BspService.INPUT_SUPERSTEP &&
+ superstep == currentSuperstep) {
+ DiskBackedEdgeStore edgeStore =
+ (DiskBackedEdgeStore) oocEngine.getServerData().getEdgeStore();
+ numBytesTransferred +=
+ edgeStore.loadPartitionData(partitionId);
+ }
+ MessageStore messageStore;
+ if (currentSuperstep == superstep) {
+ messageStore = oocEngine.getServerData().getCurrentMessageStore();
+ } else {
+ Preconditions.checkState(superstep == currentSuperstep + 1);
+ messageStore = oocEngine.getServerData().getIncomingMessageStore();
+ }
+ if (messageStore != null) {
+ numBytesTransferred += ((DiskBackedMessageStore) messageStore)
+ .loadPartitionData(partitionId);
+ }
+ oocEngine.getMetaPartitionManager()
+ .doneLoadingPartition(partitionId, superstep);
+ executed = true;
+ }
+ return executed;
+ }
+
+ @Override
+ public IOCommandType getType() {
+ return IOCommandType.LOAD_PARTITION;
+ }
+
+ @Override
+ public String toString() {
+ return "LoadPartitionIOCommand: (partitionId = " + partitionId + ", " +
+ "superstep = " + superstep + ")";
+ }
+}
http://git-wip-us.apache.org/repos/asf/giraph/blob/3793c9ef/giraph-core/src/main/java/org/apache/giraph/ooc/command/StoreDataBufferIOCommand.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/ooc/command/StoreDataBufferIOCommand.java b/giraph-core/src/main/java/org/apache/giraph/ooc/command/StoreDataBufferIOCommand.java
new file mode 100644
index 0000000..beda796
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/ooc/command/StoreDataBufferIOCommand.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.ooc.command;
+
+import org.apache.giraph.ooc.OutOfCoreEngine;
+import org.apache.giraph.ooc.data.DiskBackedEdgeStore;
+import org.apache.giraph.ooc.data.DiskBackedMessageStore;
+import org.apache.giraph.ooc.data.DiskBackedPartitionStore;
+
+import java.io.IOException;
+
+/**
+ * IOCommand to store raw data buffers on disk.
+ */
+public class StoreDataBufferIOCommand extends IOCommand {
+ /**
+ * Types of raw data buffer to offload to disk (either vertices/edges buffer
+ * in INPUT_SUPERSTEP or incoming message buffer).
+ */
+ public enum DataBufferType { PARTITION, MESSAGE };
+ /**
+ * Type of the buffer to store on disk.
+ */
+ private final DataBufferType type;
+
+ /**
+ * Constructor
+ *
+ * @param oocEngine out-of-core engine
+ * @param partitionId id of the partition to offload its buffers
+ * @param type type of the buffer to store on disk
+ */
+ public StoreDataBufferIOCommand(OutOfCoreEngine oocEngine,
+ int partitionId,
+ DataBufferType type) {
+ super(oocEngine, partitionId);
+ this.type = type;
+ }
+
+ @Override
+ public boolean execute() throws IOException {
+ boolean executed = false;
+ if (oocEngine.getMetaPartitionManager()
+ .startOffloadingBuffer(partitionId)) {
+ switch (type) {
+ case PARTITION:
+ DiskBackedPartitionStore partitionStore =
+ (DiskBackedPartitionStore)
+ oocEngine.getServerData().getPartitionStore();
+ numBytesTransferred +=
+ partitionStore.offloadBuffers(partitionId);
+ DiskBackedEdgeStore edgeStore =
+ (DiskBackedEdgeStore) oocEngine.getServerData().getEdgeStore();
+ numBytesTransferred += edgeStore.offloadBuffers(partitionId);
+ break;
+ case MESSAGE:
+ DiskBackedMessageStore messageStore =
+ (DiskBackedMessageStore)
+ oocEngine.getServerData().getIncomingMessageStore();
+ numBytesTransferred +=
+ messageStore.offloadBuffers(partitionId);
+ break;
+ default:
+ throw new IllegalStateException("execute: requested data buffer type " +
+ "does not exist!");
+ }
+ oocEngine.getMetaPartitionManager().doneOffloadingBuffer(partitionId);
+ executed = true;
+ }
+ return executed;
+ }
+
+ @Override
+ public IOCommandType getType() {
+ return IOCommandType.STORE_BUFFER;
+ }
+
+ @Override
+ public String toString() {
+ return "StoreDataBufferIOCommand: (partitionId = " + partitionId + ", " +
+ "type = " + type.name() + ")";
+ }
+}
[3/4] git commit: updated refs/heads/trunk to 3793c9e
Posted by ma...@apache.org.
http://git-wip-us.apache.org/repos/asf/giraph/blob/3793c9ef/giraph-core/src/main/java/org/apache/giraph/ooc/command/StoreIncomingMessageIOCommand.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/ooc/command/StoreIncomingMessageIOCommand.java b/giraph-core/src/main/java/org/apache/giraph/ooc/command/StoreIncomingMessageIOCommand.java
new file mode 100644
index 0000000..b38f957
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/ooc/command/StoreIncomingMessageIOCommand.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.ooc.command;
+
+import org.apache.giraph.ooc.OutOfCoreEngine;
+import org.apache.giraph.ooc.data.DiskBackedMessageStore;
+
+import java.io.IOException;
+
+import static com.google.common.base.Preconditions.checkState;
+
+/**
+ * IOCommand to store incoming message of a particular partition.
+ */
+public class StoreIncomingMessageIOCommand extends IOCommand {
+ /**
+ * Constructor
+ *
+ * @param oocEngine out-of-core engine
+ * @param partitionId id of the partition to store its incoming messages
+ */
+ public StoreIncomingMessageIOCommand(OutOfCoreEngine oocEngine,
+ int partitionId) {
+ super(oocEngine, partitionId);
+ }
+
+ @Override
+ public boolean execute() throws IOException {
+ boolean executed = false;
+ if (oocEngine.getMetaPartitionManager()
+ .startOffloadingMessages(partitionId)) {
+ DiskBackedMessageStore messageStore =
+ (DiskBackedMessageStore)
+ oocEngine.getServerData().getIncomingMessageStore();
+ checkState(messageStore != null);
+ numBytesTransferred +=
+ messageStore.offloadPartitionData(partitionId);
+ oocEngine.getMetaPartitionManager().doneOffloadingMessages(partitionId);
+ executed = true;
+ }
+ return executed;
+ }
+
+ @Override
+ public IOCommandType getType() {
+ return IOCommandType.STORE_MESSAGE;
+ }
+
+ @Override
+ public String toString() {
+ return "StoreIncomingMessageIOCommand: (partitionId = " + partitionId + ")";
+ }
+}
http://git-wip-us.apache.org/repos/asf/giraph/blob/3793c9ef/giraph-core/src/main/java/org/apache/giraph/ooc/command/StorePartitionIOCommand.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/ooc/command/StorePartitionIOCommand.java b/giraph-core/src/main/java/org/apache/giraph/ooc/command/StorePartitionIOCommand.java
new file mode 100644
index 0000000..31fa345
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/ooc/command/StorePartitionIOCommand.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.ooc.command;
+
+import org.apache.giraph.bsp.BspService;
+import org.apache.giraph.comm.messages.MessageStore;
+import org.apache.giraph.ooc.data.DiskBackedEdgeStore;
+import org.apache.giraph.ooc.data.DiskBackedMessageStore;
+import org.apache.giraph.ooc.data.DiskBackedPartitionStore;
+import org.apache.giraph.ooc.OutOfCoreEngine;
+
+import java.io.IOException;
+
+/**
+ * IOCommand to store partition data, edge data (if in INPUT_SUPERSTEP), and
+ * message data (if in compute supersteps).
+ */
+public class StorePartitionIOCommand extends IOCommand {
+ /**
+ * Constructor
+ *
+ * @param oocEngine out-of-core engine
+ * @param partitionId id of the partition to store its data
+ */
+ public StorePartitionIOCommand(OutOfCoreEngine oocEngine,
+ int partitionId) {
+ super(oocEngine, partitionId);
+ }
+
+ @Override
+ public boolean execute() throws IOException {
+ boolean executed = false;
+ if (oocEngine.getMetaPartitionManager()
+ .startOffloadingPartition(partitionId)) {
+ DiskBackedPartitionStore partitionStore =
+ (DiskBackedPartitionStore)
+ oocEngine.getServerData().getPartitionStore();
+ numBytesTransferred +=
+ partitionStore.offloadPartitionData(partitionId);
+ if (oocEngine.getSuperstep() != BspService.INPUT_SUPERSTEP) {
+ MessageStore messageStore =
+ oocEngine.getServerData().getCurrentMessageStore();
+ if (messageStore != null) {
+ numBytesTransferred += ((DiskBackedMessageStore) messageStore)
+ .offloadPartitionData(partitionId);
+ }
+ } else {
+ DiskBackedEdgeStore edgeStore =
+ (DiskBackedEdgeStore)
+ oocEngine.getServerData().getEdgeStore();
+ numBytesTransferred +=
+ edgeStore.offloadPartitionData(partitionId);
+ }
+ oocEngine.getMetaPartitionManager().doneOffloadingPartition(partitionId);
+ executed = true;
+ }
+ return executed;
+ }
+
+ @Override
+ public IOCommandType getType() {
+ return IOCommandType.STORE_PARTITION;
+ }
+
+ @Override
+ public String toString() {
+ return "StorePartitionIOCommand: (partitionId = " + partitionId + ")";
+ }
+}
http://git-wip-us.apache.org/repos/asf/giraph/blob/3793c9ef/giraph-core/src/main/java/org/apache/giraph/ooc/command/WaitIOCommand.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/ooc/command/WaitIOCommand.java b/giraph-core/src/main/java/org/apache/giraph/ooc/command/WaitIOCommand.java
new file mode 100644
index 0000000..83540c1
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/ooc/command/WaitIOCommand.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.ooc.command;
+
+import org.apache.giraph.ooc.OutOfCoreEngine;
+
+import java.io.IOException;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * IOCommand that does not move any data to/from disk: the IO thread executing
+ * it just sleeps for the requested duration, leaving the disk idle.
+ */
+public class WaitIOCommand extends IOCommand {
+  /** How long should the disk be idle? (in milliseconds) */
+  private final long waitDuration;
+
+  /**
+   * Constructor
+   *
+   * @param oocEngine out-of-core engine
+   * @param waitDuration duration of wait (in milliseconds)
+   */
+  public WaitIOCommand(OutOfCoreEngine oocEngine, long waitDuration) {
+    // -1 is passed as the partition id: a wait is not tied to any partition.
+    super(oocEngine, -1);
+    this.waitDuration = waitDuration;
+  }
+
+  /**
+   * Sleeps for {@code waitDuration} milliseconds.
+   *
+   * @return always true
+   * @throws IllegalStateException if the IO thread is interrupted while
+   *         waiting; the InterruptedException is attached as the cause
+   */
+  @Override
+  public boolean execute() throws IOException {
+    try {
+      TimeUnit.MILLISECONDS.sleep(waitDuration);
+    } catch (InterruptedException e) {
+      // Restore the interrupt status so code further up the IO thread's stack
+      // can still observe it, and keep the original exception as the cause.
+      Thread.currentThread().interrupt();
+      throw new IllegalStateException("execute: caught InterruptedException " +
+          "while IO thread is waiting!", e);
+    }
+    return true;
+  }
+
+  @Override
+  public IOCommandType getType() {
+    return IOCommandType.WAIT;
+  }
+
+  @Override
+  public String toString() {
+    return "WaitIOCommand: (duration = " + waitDuration + "ms)";
+  }
+}
http://git-wip-us.apache.org/repos/asf/giraph/blob/3793c9ef/giraph-core/src/main/java/org/apache/giraph/ooc/command/package-info.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/ooc/command/package-info.java b/giraph-core/src/main/java/org/apache/giraph/ooc/command/package-info.java
new file mode 100644
index 0000000..930b139
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/ooc/command/package-info.java
@@ -0,0 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/**
+ * Package of classes related to IO commands in out-of-core mechanism
+ */
+package org.apache.giraph.ooc.command;
http://git-wip-us.apache.org/repos/asf/giraph/blob/3793c9ef/giraph-core/src/main/java/org/apache/giraph/ooc/data/DiskBackedDataStore.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/ooc/data/DiskBackedDataStore.java b/giraph-core/src/main/java/org/apache/giraph/ooc/data/DiskBackedDataStore.java
new file mode 100644
index 0000000..7265410
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/ooc/data/DiskBackedDataStore.java
@@ -0,0 +1,432 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.ooc.data;
+
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+import org.apache.commons.lang3.tuple.MutablePair;
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
+import org.apache.giraph.conf.IntConfOption;
+import org.apache.giraph.ooc.OutOfCoreEngine;
+import org.apache.giraph.ooc.persistence.DataIndex;
+import org.apache.giraph.ooc.persistence.DataIndex.NumericIndexEntry;
+import org.apache.giraph.ooc.persistence.OutOfCoreDataAccessor;
+import org.apache.log4j.Logger;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+import static com.google.common.base.Preconditions.checkState;
+import static org.apache.giraph.conf.GiraphConstants.ONE_MB;
+
+/**
+ * This class provides basic operations for data structures that have to
+ * participate in out-of-core mechanism. Essential subclasses of this class are:
+ * - DiskBackedPartitionStore (for partition data)
+ * - DiskBackedMessageStore (for messages)
+ * - DiskBackedEdgeStore (for edges read in INPUT_SUPERSTEP)
+ * Basically, any data structure that may cause OOM to happen can be implemented
+ * as a subclass of this class.
+ *
+ * There are two different terms used in the rest of this class:
+ * - "data store" refers to in-memory representation of data. Usually this is
+ * stored per-partition in in-memory implementations of data structures. For
+ * instance, "data store" of a DiskBackedPartitionStore would collection of
+ * all partitions kept in the in-memory partition store within the
+ * DiskBackedPartitionStore.
+ * - "raw data buffer" refers to raw data which were supposed to be
+ * de-serialized and added to the data store, but they remain 'as is' in the
+ * memory because their corresponding partition is offloaded to disk and is
+ * not available in the data store.
+ *
+ * @param <T> raw data format of the data store subclassing this class
+ */
+public abstract class DiskBackedDataStore<T> {
+ /**
+ * Minimum size of a buffer (in bytes) to flush to disk. This is used to
+ * decide whether vertex/edge buffers are large enough to flush to disk.
+ */
+ public static final IntConfOption MINIMUM_BUFFER_SIZE_TO_FLUSH =
+ new IntConfOption("giraph.flushBufferSize", 8 * ONE_MB,
+ "Minimum size of a buffer (in bytes) to flush to disk.");
+
+ /** Class logger. */
+ private static final Logger LOG = Logger.getLogger(
+ DiskBackedDataStore.class);
+ /** Out-of-core engine */
+ protected final OutOfCoreEngine oocEngine;
+ /**
+ * Set containing ids of all partitions where the partition data is in some
+ * file on disk.
+ * Note that the out-of-core mechanism may decide to put the data for a
+ * partition on disk, while the partition data is empty. For instance, at the
+ * beginning of a superstep, out-of-core mechanism may decide to put incoming
+ * messages of a partition on disk, while the partition has not received any
+ * messages. In such scenarios, the "out-of-core mechanism" thinks that the
+ * partition data is on disk, while disk-backed data stores may want to
+ * optimize for IO/metadata accesses and decide not to create/write anything
+ * on files on disk.
+ * In summary, there is a subtle difference between this field and
+ * `hasPartitionOnDisk` field. Basically, this field is used for optimizing
+ * IO (mainly metadata) accesses by disk-backed stores, while
+ * `hasPartitionDataOnDisk` is the view that out-of-core mechanism has
+ * regarding partition storage statuses. Since out-of-core mechanism does not
+ * know about the actual data for a partition, these two fields have to be
+ * separate.
+ */
+ protected final Set<Integer> hasPartitionDataOnFile =
+ Sets.newConcurrentHashSet();
+ /** Cached value for MINIMUM_BUFFER_SIZE_TO_FLUSH */
+ private final int minBufferSizeToOffload;
+ /** Set containing ids of all out-of-core partitions */
+ private final Set<Integer> hasPartitionDataOnDisk =
+ Sets.newConcurrentHashSet();
+ /**
+ * Map of partition ids to list of raw data buffers. The map will have entries
+ * only for partitions that their in-memory data structures are currently
+ * offloaded to disk. We keep the aggregate size of buffers for each partition
+ * as part of the values in the map to estimate how much memory we can free up
+ * if we offload data buffers of a particular partition to disk.
+ */
+ private final ConcurrentMap<Integer, Pair<Integer, List<T>>> dataBuffers =
+ Maps.newConcurrentMap();
+ /**
+ * Map of partition ids to number of raw data buffers offloaded to disk for
+ * each partition. The map will have entries only for partitions that their
+ * in-memory data structures are currently out of core. It is necessary to
+ * know the number of data buffers on disk for a particular partition when we
+ * are loading all these buffers back in memory.
+ */
+ private final ConcurrentMap<Integer, Integer> numDataBuffersOnDisk =
+ Maps.newConcurrentMap();
+ /**
+ * Lock to avoid overlapping of read and write on data associated with each
+ * partition.
+ * */
+ private final ConcurrentMap<Integer, ReadWriteLock> locks =
+ Maps.newConcurrentMap();
+
+ /**
+ * Constructor.
+ *
+ * @param conf Configuration
+ * @param oocEngine Out-of-core engine
+ */
+ DiskBackedDataStore(ImmutableClassesGiraphConfiguration conf,
+ OutOfCoreEngine oocEngine) {
+ this.minBufferSizeToOffload = MINIMUM_BUFFER_SIZE_TO_FLUSH.get(conf);
+ this.oocEngine = oocEngine;
+ }
+
+ /**
+ * Retrieves a lock for a given partition. If the lock for the given partition
+ * does not exist, creates a new lock.
+ *
+ * @param partitionId id of the partition the lock is needed for
+ * @return lock for a given partition
+ */
+ private ReadWriteLock getPartitionLock(int partitionId) {
+ ReadWriteLock readWriteLock = locks.get(partitionId);
+ if (readWriteLock == null) {
+ readWriteLock = new ReentrantReadWriteLock();
+ ReadWriteLock temp = locks.putIfAbsent(partitionId, readWriteLock);
+ if (temp != null) {
+ readWriteLock = temp;
+ }
+ }
+ return readWriteLock;
+ }
+
+ /**
+ * Adds a data entry for a given partition to the current data store. If data
+ * of a given partition in data store is already offloaded to disk, adds the
+ * data entry to appropriate raw data buffer list.
+ *
+ * @param partitionId id of the partition to add the data entry to
+ * @param entry data entry to add
+ */
+ protected void addEntry(int partitionId, T entry) {
+ // Addition of data entries to a data store is much more common than
+ // out-of-core operations. Besides, in-memory data store implementations
+ // existing in the code base already account for parallel addition to data
+ // stores. Therefore, using read lock would optimize for parallel addition
+ // to data stores, specially for cases where the addition should happen for
+ // partitions that are entirely in memory.
+ ReadWriteLock rwLock = getPartitionLock(partitionId);
+ rwLock.readLock().lock();
+ if (hasPartitionDataOnDisk.contains(partitionId)) {
+ List<T> entryList = new ArrayList<>();
+ entryList.add(entry);
+ int entrySize = entrySerializedSize(entry);
+ MutablePair<Integer, List<T>> newPair =
+ new MutablePair<>(entrySize, entryList);
+ Pair<Integer, List<T>> oldPair =
+ dataBuffers.putIfAbsent(partitionId, newPair);
+ if (oldPair != null) {
+ synchronized (oldPair) {
+ newPair = (MutablePair<Integer, List<T>>) oldPair;
+ newPair.setLeft(oldPair.getLeft() + entrySize);
+ newPair.getRight().add(entry);
+ }
+ }
+ } else {
+ addEntryToInMemoryPartitionData(partitionId, entry);
+ }
+ rwLock.readLock().unlock();
+ }
+
+ /**
+ * Loads and assembles all data for a given partition, and put it into the
+ * data store. Returns the number of bytes transferred from disk to memory in
+ * the loading process.
+ *
+ * @param partitionId id of the partition to load and assemble all data for
+ * @return number of bytes loaded from disk to memory
+ * @throws IOException
+ */
+ public abstract long loadPartitionData(int partitionId) throws IOException;
+
+ /**
+ * The proxy method that does the actual operation for `loadPartitionData`,
+ * but uses the data index given by the caller.
+ *
+ * @param partitionId id of the partition to load and assemble all data for
+ * @param index data index chain for the data to load
+ * @return number of bytes loaded from disk to memory
+ * @throws IOException
+ */
+ protected long loadPartitionDataProxy(int partitionId, DataIndex index)
+ throws IOException {
+ long numBytes = 0;
+ ReadWriteLock rwLock = getPartitionLock(partitionId);
+ rwLock.writeLock().lock();
+ if (hasPartitionDataOnDisk.contains(partitionId)) {
+ int ioThreadId =
+ oocEngine.getMetaPartitionManager().getOwnerThreadId(partitionId);
+ numBytes += loadInMemoryPartitionData(partitionId, ioThreadId,
+ index.addIndex(NumericIndexEntry.createPartitionEntry(partitionId)));
+ hasPartitionDataOnDisk.remove(partitionId);
+ // Loading raw data buffers from disk if there is any and applying those
+ // to already loaded in-memory data.
+ Integer numBuffers = numDataBuffersOnDisk.remove(partitionId);
+ if (numBuffers != null) {
+ checkState(numBuffers > 0);
+ index.addIndex(DataIndex.TypeIndexEntry.BUFFER);
+ OutOfCoreDataAccessor.DataInputWrapper inputWrapper =
+ oocEngine.getDataAccessor().prepareInput(ioThreadId, index.copy());
+ for (int i = 0; i < numBuffers; ++i) {
+ T entry = readNextEntry(inputWrapper.getDataInput());
+ addEntryToInMemoryPartitionData(partitionId, entry);
+ }
+ numBytes += inputWrapper.finalizeInput(true);
+ index.removeLastIndex();
+ }
+ index.removeLastIndex();
+ // Applying in-memory raw data buffers to in-memory partition data.
+ Pair<Integer, List<T>> pair = dataBuffers.remove(partitionId);
+ if (pair != null) {
+ for (T entry : pair.getValue()) {
+ addEntryToInMemoryPartitionData(partitionId, entry);
+ }
+ }
+ }
+ rwLock.writeLock().unlock();
+ return numBytes;
+ }
+
+ /**
+ * Offloads partition data of a given partition in the data store to disk, and
+ * returns the number of bytes offloaded from memory to disk.
+ *
+ * @param partitionId id of the partition to offload its data
+ * @return number of bytes offloaded from memory to disk
+ * @throws IOException
+ */
+ public abstract long offloadPartitionData(int partitionId) throws IOException;
+
+ /**
+ * The proxy method that does the actual operation for `offloadPartitionData`,
+ * but uses the data index given by the caller.
+ *
+ * @param partitionId id of the partition to offload its data
+ * @param index data index chain for the data to offload
+ * @return number of bytes offloaded from memory to disk
+ * @throws IOException
+ */
+ @edu.umd.cs.findbugs.annotations.SuppressWarnings(
+ "UL_UNRELEASED_LOCK_EXCEPTION_PATH")
+ protected long offloadPartitionDataProxy(
+ int partitionId, DataIndex index) throws IOException {
+ ReadWriteLock rwLock = getPartitionLock(partitionId);
+ rwLock.writeLock().lock();
+ hasPartitionDataOnDisk.add(partitionId);
+ rwLock.writeLock().unlock();
+ int ioThreadId =
+ oocEngine.getMetaPartitionManager().getOwnerThreadId(partitionId);
+ long numBytes = offloadInMemoryPartitionData(partitionId, ioThreadId,
+ index.addIndex(NumericIndexEntry.createPartitionEntry(partitionId)));
+ index.removeLastIndex();
+ return numBytes;
+ }
+
+ /**
+ * Offloads raw data buffers of a given partition to disk, and returns the
+ * number of bytes offloaded from memory to disk.
+ *
+ * @param partitionId id of the partition to offload its raw data buffers
+ * @return number of bytes offloaded from memory to disk
+ * @throws IOException
+ */
+ public abstract long offloadBuffers(int partitionId) throws IOException;
+
+ /**
+ * The proxy method that does the actual operation for `offloadBuffers`,
+ * but uses the data index given by the caller.
+ *
+ * @param partitionId id of the partition to offload its raw data buffers
+ * @param index data index chain for the data to offload its buffers
+ * @return number of bytes offloaded from memory to disk
+ * @throws IOException
+ */
+ protected long offloadBuffersProxy(int partitionId, DataIndex index)
+ throws IOException {
+ Pair<Integer, List<T>> pair = dataBuffers.get(partitionId);
+ if (pair == null || pair.getLeft() < minBufferSizeToOffload) {
+ return 0;
+ }
+ ReadWriteLock rwLock = getPartitionLock(partitionId);
+ rwLock.writeLock().lock();
+ pair = dataBuffers.remove(partitionId);
+ rwLock.writeLock().unlock();
+ checkNotNull(pair);
+ checkState(!pair.getRight().isEmpty());
+ int ioThreadId =
+ oocEngine.getMetaPartitionManager().getOwnerThreadId(partitionId);
+ index.addIndex(NumericIndexEntry.createPartitionEntry(partitionId))
+ .addIndex(DataIndex.TypeIndexEntry.BUFFER);
+ OutOfCoreDataAccessor.DataOutputWrapper outputWrapper =
+ oocEngine.getDataAccessor().prepareOutput(ioThreadId, index.copy(),
+ true);
+ for (T entry : pair.getRight()) {
+ writeEntry(entry, outputWrapper.getDataOutput());
+ }
+ long numBytes = outputWrapper.finalizeOutput();
+ index.removeLastIndex().removeLastIndex();
+ int numBuffers = pair.getRight().size();
+ Integer oldNumBuffersOnDisk =
+ numDataBuffersOnDisk.putIfAbsent(partitionId, numBuffers);
+ if (oldNumBuffersOnDisk != null) {
+ numDataBuffersOnDisk.replace(partitionId,
+ oldNumBuffersOnDisk + numBuffers);
+ }
+ return numBytes;
+ }
+
+ /**
+ * Looks through all partitions that their data is not in the data store (is
+ * offloaded to disk), and sees if any of them has enough raw data buffer in
+ * memory. If so, puts that partition in a list to return.
+ *
+ * @return Set of partition ids of all partition raw buffers where the
+ * aggregate size of buffers are large enough and it is worth flushing
+ * those buffers to disk
+ */
+ public Set<Integer> getCandidateBuffersToOffload() {
+ Set<Integer> result = new HashSet<>();
+ for (Map.Entry<Integer, Pair<Integer, List<T>>> entry :
+ dataBuffers.entrySet()) {
+ if (entry.getValue().getLeft() > minBufferSizeToOffload) {
+ result.add(entry.getKey());
+ }
+ }
+ return result;
+ }
+
+ /**
+ * Writes a single raw entry to a given output stream.
+ *
+ * @param entry entry to write to output
+ * @param out output stream to write the entry to
+ * @throws IOException
+ */
+ protected abstract void writeEntry(T entry, DataOutput out)
+ throws IOException;
+
+ /**
+ * Reads the next available raw entry from a given input stream.
+ *
+ * @param in input stream to read the entry from
+ * @return entry read from an input stream
+ * @throws IOException
+ */
+ protected abstract T readNextEntry(DataInput in) throws IOException;
+
+ /**
+ * Loads data of a partition into data store. Returns number of bytes loaded.
+ *
+ * @param partitionId id of the partition to load its data
+ * @param ioThreadId id of the IO thread performing the load
+ * @param index data index chain for the data to load
+ * @return number of bytes loaded from disk to memory
+ * @throws IOException
+ */
+ protected abstract long loadInMemoryPartitionData(
+ int partitionId, int ioThreadId, DataIndex index) throws IOException;
+
+ /**
+ * Offloads data of a partition in data store to disk. Returns the number of
+ * bytes offloaded to disk
+ *
+ * @param partitionId id of the partition to offload to disk
+ * @param ioThreadId id of the IO thread performing the offload
+ * @param index data index chain for the data to offload
+ * @return number of bytes offloaded from memory to disk
+ * @throws IOException
+ */
+ protected abstract long offloadInMemoryPartitionData(
+ int partitionId, int ioThreadId, DataIndex index) throws IOException;
+
+ /**
+ * Gets the size of a given entry in bytes.
+ *
+ * @param entry input entry to find its size
+ * @return size of given input entry in bytes
+ */
+ protected abstract int entrySerializedSize(T entry);
+
+ /**
+ * Adds a single entry for a given partition to the in-memory data store.
+ *
+ * @param partitionId id of the partition to add the data to
+ * @param entry input entry to add to the data store
+ */
+ protected abstract void addEntryToInMemoryPartitionData(int partitionId,
+ T entry);
+}
http://git-wip-us.apache.org/repos/asf/giraph/blob/3793c9ef/giraph-core/src/main/java/org/apache/giraph/ooc/data/DiskBackedEdgeStore.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/ooc/data/DiskBackedEdgeStore.java b/giraph-core/src/main/java/org/apache/giraph/ooc/data/DiskBackedEdgeStore.java
index 53de52f..e727fbd 100644
--- a/giraph-core/src/main/java/org/apache/giraph/ooc/data/DiskBackedEdgeStore.java
+++ b/giraph-core/src/main/java/org/apache/giraph/ooc/data/DiskBackedEdgeStore.java
@@ -21,25 +21,18 @@ package org.apache.giraph.ooc.data;
import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
import org.apache.giraph.edge.EdgeStore;
import org.apache.giraph.ooc.OutOfCoreEngine;
+import org.apache.giraph.ooc.persistence.DataIndex;
+import org.apache.giraph.ooc.persistence.OutOfCoreDataAccessor;
import org.apache.giraph.utils.ByteArrayVertexIdEdges;
import org.apache.giraph.utils.VertexIdEdges;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.log4j.Logger;
-import java.io.BufferedInputStream;
-import java.io.BufferedOutputStream;
import java.io.DataInput;
-import java.io.DataInputStream;
import java.io.DataOutput;
-import java.io.DataOutputStream;
-import java.io.File;
-import java.io.FileInputStream;
-import java.io.FileOutputStream;
import java.io.IOException;
-import static com.google.common.base.Preconditions.checkState;
-
/**
* Implementation of an edge-store used for out-of-core mechanism.
*
@@ -49,7 +42,7 @@ import static com.google.common.base.Preconditions.checkState;
*/
public class DiskBackedEdgeStore<I extends WritableComparable,
V extends Writable, E extends Writable>
- extends OutOfCoreDataManager<VertexIdEdges<I, E>>
+ extends DiskBackedDataStore<VertexIdEdges<I, E>>
implements EdgeStore<I, V, E> {
/** Class logger. */
private static final Logger LOG = Logger.getLogger(DiskBackedEdgeStore.class);
@@ -57,8 +50,6 @@ public class DiskBackedEdgeStore<I extends WritableComparable,
private final EdgeStore<I, V, E> edgeStore;
/** Configuration */
private final ImmutableClassesGiraphConfiguration<I, V, E> conf;
- /** Out-of-core engine */
- private final OutOfCoreEngine oocEngine;
/**
* Constructor
@@ -72,10 +63,9 @@ public class DiskBackedEdgeStore<I extends WritableComparable,
EdgeStore<I, V, E> edgeStore,
ImmutableClassesGiraphConfiguration<I, V, E> conf,
OutOfCoreEngine oocEngine) {
- super(conf);
+ super(conf, oocEngine);
this.edgeStore = edgeStore;
this.conf = conf;
- this.oocEngine = oocEngine;
}
@Override
@@ -114,32 +104,25 @@ public class DiskBackedEdgeStore<I extends WritableComparable,
"should not be called for DiskBackedEdgeStore!");
}
- /**
- * Gets the path that should be used specifically for edge data.
- *
- * @param basePath path prefix to build the actual path from
- * @return path to files specific for edge data
- */
- private static String getPath(String basePath) {
- return basePath + "_edge_store";
- }
-
@Override
- public long loadPartitionData(int partitionId, String basePath)
+ public long loadPartitionData(int partitionId)
throws IOException {
- return super.loadPartitionData(partitionId, getPath(basePath));
+ return loadPartitionDataProxy(partitionId,
+ new DataIndex().addIndex(DataIndex.TypeIndexEntry.EDGE_STORE));
}
@Override
- public long offloadPartitionData(int partitionId, String basePath)
+ public long offloadPartitionData(int partitionId)
throws IOException {
- return super.offloadPartitionData(partitionId, getPath(basePath));
+ return offloadPartitionDataProxy(partitionId,
+ new DataIndex().addIndex(DataIndex.TypeIndexEntry.EDGE_STORE));
}
@Override
- public long offloadBuffers(int partitionId, String basePath)
+ public long offloadBuffers(int partitionId)
throws IOException {
- return super.offloadBuffers(partitionId, getPath(basePath));
+ return offloadBuffersProxy(partitionId,
+ new DataIndex().addIndex(DataIndex.TypeIndexEntry.EDGE_STORE));
}
@Override
@@ -157,44 +140,31 @@ public class DiskBackedEdgeStore<I extends WritableComparable,
}
@Override
- protected long loadInMemoryPartitionData(int partitionId, String path)
- throws IOException {
+ protected long loadInMemoryPartitionData(
+ int partitionId, int ioThreadId, DataIndex index) throws IOException {
long numBytes = 0;
- File file = new File(path);
- if (file.exists()) {
- if (LOG.isDebugEnabled()) {
- LOG.debug("loadInMemoryPartitionData: loading edge data for " +
- "partition " + partitionId + " from " + file.getAbsolutePath());
- }
- FileInputStream fis = new FileInputStream(file);
- BufferedInputStream bis = new BufferedInputStream(fis);
- DataInputStream dis = new DataInputStream(bis);
- edgeStore.readPartitionEdgeStore(partitionId, dis);
- dis.close();
- numBytes = file.length();
- checkState(file.delete(), "loadInMemoryPartitionData: failed to delete " +
- "%s.", file.getAbsoluteFile());
+ if (hasPartitionDataOnFile.remove(partitionId)) {
+ OutOfCoreDataAccessor.DataInputWrapper inputWrapper =
+ oocEngine.getDataAccessor().prepareInput(ioThreadId, index.copy());
+ edgeStore.readPartitionEdgeStore(partitionId,
+ inputWrapper.getDataInput());
+ numBytes = inputWrapper.finalizeInput(true);
}
return numBytes;
}
@Override
- protected long offloadInMemoryPartitionData(int partitionId, String path)
- throws IOException {
+ protected long offloadInMemoryPartitionData(
+ int partitionId, int ioThreadId, DataIndex index) throws IOException {
long numBytes = 0;
if (edgeStore.hasEdgesForPartition(partitionId)) {
- File file = new File(path);
- checkState(!file.exists(), "offloadInMemoryPartitionData: edge store " +
- "file %s already exist", file.getAbsoluteFile());
- checkState(file.createNewFile(),
- "offloadInMemoryPartitionData: cannot create edge store file %s",
- file.getAbsoluteFile());
- FileOutputStream fos = new FileOutputStream(file);
- BufferedOutputStream bos = new BufferedOutputStream(fos);
- DataOutputStream dos = new DataOutputStream(bos);
- edgeStore.writePartitionEdgeStore(partitionId, dos);
- dos.close();
- numBytes = dos.size();
+ OutOfCoreDataAccessor.DataOutputWrapper outputWrapper =
+ oocEngine.getDataAccessor().prepareOutput(ioThreadId, index.copy(),
+ false);
+ edgeStore.writePartitionEdgeStore(partitionId,
+ outputWrapper.getDataOutput());
+ numBytes = outputWrapper.finalizeOutput();
+ hasPartitionDataOnFile.add(partitionId);
}
return numBytes;
}
@@ -205,7 +175,7 @@ public class DiskBackedEdgeStore<I extends WritableComparable,
}
@Override
- protected void addEntryToImMemoryPartitionData(int partitionId,
+ protected void addEntryToInMemoryPartitionData(int partitionId,
VertexIdEdges<I, E> edges) {
oocEngine.getMetaPartitionManager().addPartition(partitionId);
edgeStore.addPartitionEdges(partitionId, edges);
http://git-wip-us.apache.org/repos/asf/giraph/blob/3793c9ef/giraph-core/src/main/java/org/apache/giraph/ooc/data/DiskBackedMessageStore.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/ooc/data/DiskBackedMessageStore.java b/giraph-core/src/main/java/org/apache/giraph/ooc/data/DiskBackedMessageStore.java
index 94ba83a..c8d0f79 100644
--- a/giraph-core/src/main/java/org/apache/giraph/ooc/data/DiskBackedMessageStore.java
+++ b/giraph-core/src/main/java/org/apache/giraph/ooc/data/DiskBackedMessageStore.java
@@ -21,6 +21,10 @@ package org.apache.giraph.ooc.data;
import org.apache.giraph.comm.messages.MessageStore;
import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
import org.apache.giraph.factories.MessageValueFactory;
+import org.apache.giraph.ooc.OutOfCoreEngine;
+import org.apache.giraph.ooc.persistence.DataIndex;
+import org.apache.giraph.ooc.persistence.DataIndex.NumericIndexEntry;
+import org.apache.giraph.ooc.persistence.OutOfCoreDataAccessor;
import org.apache.giraph.utils.ByteArrayOneMessageToManyIds;
import org.apache.giraph.utils.ByteArrayVertexIdMessages;
import org.apache.giraph.utils.VertexIdMessages;
@@ -28,19 +32,10 @@ import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.log4j.Logger;
-import java.io.BufferedInputStream;
-import java.io.BufferedOutputStream;
import java.io.DataInput;
-import java.io.DataInputStream;
import java.io.DataOutput;
-import java.io.DataOutputStream;
-import java.io.File;
-import java.io.FileInputStream;
-import java.io.FileOutputStream;
import java.io.IOException;
-import static com.google.common.base.Preconditions.checkState;
-
/**
* Implementation of a message store used for out-of-core mechanism.
*
@@ -48,7 +43,7 @@ import static com.google.common.base.Preconditions.checkState;
* @param <M> Message data
*/
public class DiskBackedMessageStore<I extends WritableComparable,
- M extends Writable> extends OutOfCoreDataManager<VertexIdMessages<I, M>>
+ M extends Writable> extends DiskBackedDataStore<VertexIdMessages<I, M>>
implements MessageStore<I, M> {
/** Class logger. */
private static final Logger LOG =
@@ -82,6 +77,7 @@ public class DiskBackedMessageStore<I extends WritableComparable,
* Constructor
*
* @param config Configuration
+ * @param oocEngine Out-of-core engine
* @param messageStore In-memory message store for which out-of-core message
* store would be wrapper
* @param useMessageCombiner Whether message combiner is used for this message
@@ -90,9 +86,10 @@ public class DiskBackedMessageStore<I extends WritableComparable,
*/
public DiskBackedMessageStore(ImmutableClassesGiraphConfiguration<I, ?, ?>
config,
+ OutOfCoreEngine oocEngine,
MessageStore<I, M> messageStore,
boolean useMessageCombiner, long superstep) {
- super(config);
+ super(config, oocEngine);
this.config = config;
this.messageStore = messageStore;
this.useMessageCombiner = useMessageCombiner;
@@ -140,43 +137,38 @@ public class DiskBackedMessageStore<I extends WritableComparable,
}
}
- /**
- * Gets the path that should be used specifically for message data.
- *
- * @param basePath path prefix to build the actual path from
- * @param superstep superstep for which message data should be stored
- * @return path to files specific for message data
- */
- private static String getPath(String basePath, long superstep) {
- return basePath + "_messages-S" + superstep;
- }
@Override
- public long loadPartitionData(int partitionId, String basePath)
+ public long loadPartitionData(int partitionId)
throws IOException {
if (!useMessageCombiner) {
- return super.loadPartitionData(partitionId, getPath(basePath, superstep));
+ return loadPartitionDataProxy(partitionId,
+ new DataIndex().addIndex(DataIndex.TypeIndexEntry.MESSAGE)
+ .addIndex(NumericIndexEntry.createSuperstepEntry(superstep)));
} else {
return 0;
}
}
@Override
- public long offloadPartitionData(int partitionId, String basePath)
+ public long offloadPartitionData(int partitionId)
throws IOException {
if (!useMessageCombiner) {
- return
- super.offloadPartitionData(partitionId, getPath(basePath, superstep));
+ return offloadPartitionDataProxy(partitionId,
+ new DataIndex().addIndex(DataIndex.TypeIndexEntry.MESSAGE)
+ .addIndex(NumericIndexEntry.createSuperstepEntry(superstep)));
} else {
return 0;
}
}
@Override
- public long offloadBuffers(int partitionId, String basePath)
+ public long offloadBuffers(int partitionId)
throws IOException {
if (!useMessageCombiner) {
- return super.offloadBuffers(partitionId, getPath(basePath, superstep));
+ return offloadBuffersProxy(partitionId,
+ new DataIndex().addIndex(DataIndex.TypeIndexEntry.MESSAGE)
+ .addIndex(NumericIndexEntry.createSuperstepEntry(superstep)));
} else {
return 0;
}
@@ -250,45 +242,31 @@ public class DiskBackedMessageStore<I extends WritableComparable,
}
@Override
- protected long loadInMemoryPartitionData(int partitionId, String basePath)
- throws IOException {
+ protected long loadInMemoryPartitionData(int partitionId, int ioThreadId,
+ DataIndex index) throws IOException {
long numBytes = 0;
- File file = new File(basePath);
- if (file.exists()) {
- if (LOG.isDebugEnabled()) {
- LOG.debug("loadInMemoryPartitionData: loading message data for " +
- "partition " + partitionId + " from " + file.getAbsolutePath());
- }
- FileInputStream fis = new FileInputStream(file);
- BufferedInputStream bis = new BufferedInputStream(fis);
- DataInputStream dis = new DataInputStream(bis);
- messageStore.readFieldsForPartition(dis, partitionId);
- dis.close();
- numBytes = file.length();
- checkState(file.delete(), "loadInMemoryPartitionData: failed to delete " +
- "%s.", file.getAbsoluteFile());
+ if (hasPartitionDataOnFile.remove(partitionId)) {
+ OutOfCoreDataAccessor.DataInputWrapper inputWrapper =
+ oocEngine.getDataAccessor().prepareInput(ioThreadId, index.copy());
+ messageStore.readFieldsForPartition(inputWrapper.getDataInput(),
+ partitionId);
+ numBytes = inputWrapper.finalizeInput(true);
}
return numBytes;
}
@Override
- protected long offloadInMemoryPartitionData(int partitionId, String basePath)
- throws IOException {
+ protected long offloadInMemoryPartitionData(
+ int partitionId, int ioThreadId, DataIndex index) throws IOException {
long numBytes = 0;
if (messageStore.hasMessagesForPartition(partitionId)) {
- File file = new File(basePath);
- checkState(!file.exists(), "offloadInMemoryPartitionData: message store" +
- " file %s already exist", file.getAbsoluteFile());
- checkState(file.createNewFile(),
- "offloadInMemoryPartitionData: cannot create message store file %s",
- file.getAbsoluteFile());
- FileOutputStream fileout = new FileOutputStream(file);
- BufferedOutputStream bufferout = new BufferedOutputStream(fileout);
- DataOutputStream outputStream = new DataOutputStream(bufferout);
- messageStore.writePartition(outputStream, partitionId);
+ OutOfCoreDataAccessor.DataOutputWrapper outputWrapper =
+ oocEngine.getDataAccessor().prepareOutput(ioThreadId, index.copy(),
+ false);
+ messageStore.writePartition(outputWrapper.getDataOutput(), partitionId);
messageStore.clearPartition(partitionId);
- outputStream.close();
- numBytes += outputStream.size();
+ numBytes = outputWrapper.finalizeOutput();
+ hasPartitionDataOnFile.add(partitionId);
}
return numBytes;
}
@@ -299,7 +277,7 @@ public class DiskBackedMessageStore<I extends WritableComparable,
}
@Override
- protected void addEntryToImMemoryPartitionData(int partitionId,
+ protected void addEntryToInMemoryPartitionData(int partitionId,
VertexIdMessages<I, M>
messages) {
messageStore.addPartitionMessages(partitionId, messages);
http://git-wip-us.apache.org/repos/asf/giraph/blob/3793c9ef/giraph-core/src/main/java/org/apache/giraph/ooc/data/DiskBackedPartitionStore.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/ooc/data/DiskBackedPartitionStore.java b/giraph-core/src/main/java/org/apache/giraph/ooc/data/DiskBackedPartitionStore.java
index 2a5e47a..6b7822f 100644
--- a/giraph-core/src/main/java/org/apache/giraph/ooc/data/DiskBackedPartitionStore.java
+++ b/giraph-core/src/main/java/org/apache/giraph/ooc/data/DiskBackedPartitionStore.java
@@ -20,11 +20,12 @@ package org.apache.giraph.ooc.data;
import com.google.common.collect.Maps;
import org.apache.giraph.bsp.BspService;
-import org.apache.giraph.bsp.CentralizedServiceWorker;
import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
import org.apache.giraph.edge.OutEdges;
import org.apache.giraph.graph.Vertex;
import org.apache.giraph.ooc.OutOfCoreEngine;
+import org.apache.giraph.ooc.persistence.DataIndex;
+import org.apache.giraph.ooc.persistence.OutOfCoreDataAccessor;
import org.apache.giraph.partition.Partition;
import org.apache.giraph.partition.PartitionStore;
import org.apache.giraph.utils.ExtendedDataOutput;
@@ -35,25 +36,17 @@ import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.log4j.Logger;
-import java.io.BufferedInputStream;
-import java.io.BufferedOutputStream;
import java.io.DataInput;
-import java.io.DataInputStream;
import java.io.DataOutput;
-import java.io.DataOutputStream;
-import java.io.File;
-import java.io.FileInputStream;
-import java.io.FileOutputStream;
import java.io.IOException;
import java.util.Map;
import static com.google.common.base.Preconditions.checkNotNull;
-import static com.google.common.base.Preconditions.checkState;
/**
* Implementation of a partition-store used for out-of-core mechanism.
* Partition store is responsible for partition data, as well as data buffers in
- * INPUT_SUPERSTEP ("raw data buffer" -- defined in OutOfCoreDataManager --
+ * INPUT_SUPERSTEP ("raw data buffer" -- defined in DiskBackedDataStore --
* refers to vertex buffers in INPUT_SUPERSTEP).
*
* @param <I> Vertex id
@@ -62,7 +55,7 @@ import static com.google.common.base.Preconditions.checkState;
*/
public class DiskBackedPartitionStore<I extends WritableComparable,
V extends Writable, E extends Writable>
- extends OutOfCoreDataManager<ExtendedDataOutput>
+ extends DiskBackedDataStore<ExtendedDataOutput>
implements PartitionStore<I, V, E> {
/** Class logger. */
private static final Logger LOG =
@@ -71,10 +64,6 @@ public class DiskBackedPartitionStore<I extends WritableComparable,
private final ImmutableClassesGiraphConfiguration<I, V, E> conf;
/** Job context (for progress) */
private final Mapper<?, ?, ?, ?>.Context context;
- /** Service worker */
- private final CentralizedServiceWorker<I, V, E> serviceWorker;
- /** Out-of-core engine */
- private final OutOfCoreEngine oocEngine;
/** In-memory partition store */
private final PartitionStore<I, V, E> partitionStore;
/**
@@ -99,21 +88,17 @@ public class DiskBackedPartitionStore<I extends WritableComparable,
* partition store would be a wrapper
* @param conf Configuration
* @param context Job context
- * @param serviceWorker Service worker
* @param oocEngine Out-of-core engine
*/
public DiskBackedPartitionStore(
PartitionStore<I, V, E> partitionStore,
ImmutableClassesGiraphConfiguration<I, V, E> conf,
Mapper<?, ?, ?, ?>.Context context,
- CentralizedServiceWorker<I, V, E> serviceWorker,
OutOfCoreEngine oocEngine) {
- super(conf);
+ super(conf, oocEngine);
this.partitionStore = partitionStore;
this.conf = conf;
this.context = context;
- this.serviceWorker = serviceWorker;
- this.oocEngine = oocEngine;
}
@Override
@@ -222,36 +207,6 @@ public class DiskBackedPartitionStore<I extends WritableComparable,
}
/**
- * Gets the path that should be used specifically for partition data.
- *
- * @param basePath path prefix to build the actual path from
- * @return path to files specific for partition data
- */
- private static String getPath(String basePath) {
- return basePath + "_partition";
- }
-
- /**
- * Get the path to the file where vertices are stored.
- *
- * @param basePath path prefix to build the actual path from
- * @return The path to the vertices file
- */
- private static String getVerticesPath(String basePath) {
- return basePath + "_vertices";
- }
-
- /**
- * Get the path to the file where edges are stored.
- *
- * @param basePath path prefix to build the actual path from
- * @return The path to the edges file
- */
- private static String getEdgesPath(String basePath) {
- return basePath + "_edges";
- }
-
- /**
* Read vertex data from an input and initialize the vertex.
*
* @param in The input stream
@@ -295,54 +250,42 @@ public class DiskBackedPartitionStore<I extends WritableComparable,
}
@Override
- protected long loadInMemoryPartitionData(int partitionId, String path)
- throws IOException {
+ protected long loadInMemoryPartitionData(int partitionId, int ioThreadId,
+ DataIndex index) throws IOException {
long numBytes = 0;
// Load vertices
- File file = new File(getVerticesPath(path));
- if (file.exists()) {
+ if (hasPartitionDataOnFile.remove(partitionId)) {
Partition<I, V, E> partition = conf.createPartition(partitionId, context);
- if (LOG.isDebugEnabled()) {
- LOG.debug("loadInMemoryPartitionData: loading partition vertices " +
- partitionId + " from " + file.getAbsolutePath());
- }
-
- FileInputStream fis = new FileInputStream(file);
- BufferedInputStream bis = new BufferedInputStream(fis);
- DataInputStream inputStream = new DataInputStream(bis);
- long numVertices = inputStream.readLong();
+ OutOfCoreDataAccessor dataAccessor = oocEngine.getDataAccessor();
+ index.addIndex(DataIndex.TypeIndexEntry.PARTITION_VERTICES);
+ OutOfCoreDataAccessor.DataInputWrapper inputWrapper =
+ dataAccessor.prepareInput(ioThreadId, index.copy());
+ DataInput dataInput = inputWrapper.getDataInput();
+ long numVertices = dataInput.readLong();
for (long i = 0; i < numVertices; ++i) {
Vertex<I, V, E> vertex = conf.createVertex();
- readVertexData(inputStream, vertex);
+ readVertexData(dataInput, vertex);
partition.putVertex(vertex);
}
- inputStream.close();
- numBytes += file.length();
- checkState(file.delete(), "loadInMemoryPartitionData: failed to delete " +
- "%s", file.getAbsolutePath());
+ numBytes += inputWrapper.finalizeInput(true);
// Load edges
- file = new File(getEdgesPath(path));
- if (LOG.isDebugEnabled()) {
- LOG.debug("loadInMemoryPartitionData: loading partition edges " +
- partitionId + " from " + file.getAbsolutePath());
- }
-
- fis = new FileInputStream(file);
- bis = new BufferedInputStream(fis);
- inputStream = new DataInputStream(bis);
+ index.removeLastIndex()
+ .addIndex(DataIndex.TypeIndexEntry.PARTITION_EDGES);
+ inputWrapper = dataAccessor.prepareInput(ioThreadId, index.copy());
+ dataInput = inputWrapper.getDataInput();
for (int i = 0; i < numVertices; ++i) {
- readOutEdges(inputStream, partition);
+ readOutEdges(dataInput, partition);
}
- inputStream.close();
- numBytes += file.length();
// If the graph is static and it is not INPUT_SUPERSTEP, keep the file
// around.
+ boolean shouldDeleteEdges = false;
if (!conf.isStaticGraph() ||
oocEngine.getSuperstep() == BspService.INPUT_SUPERSTEP) {
- checkState(file.delete(), "loadPartition: failed to delete %s",
- file.getAbsolutePath());
+ shouldDeleteEdges = true;
}
+ numBytes += inputWrapper.finalizeInput(shouldDeleteEdges);
+ index.removeLastIndex();
partitionStore.addPartition(partition);
}
return numBytes;
@@ -354,7 +297,7 @@ public class DiskBackedPartitionStore<I extends WritableComparable,
}
@Override
- protected void addEntryToImMemoryPartitionData(int partitionId,
+ protected void addEntryToInMemoryPartitionData(int partitionId,
ExtendedDataOutput vertices) {
if (!partitionStore.hasPartition(partitionId)) {
oocEngine.getMetaPartitionManager().addPartition(partitionId);
@@ -363,15 +306,17 @@ public class DiskBackedPartitionStore<I extends WritableComparable,
}
@Override
- public long loadPartitionData(int partitionId, String basePath)
+ public long loadPartitionData(int partitionId)
throws IOException {
- return super.loadPartitionData(partitionId, getPath(basePath));
+ return loadPartitionDataProxy(partitionId,
+ new DataIndex().addIndex(DataIndex.TypeIndexEntry.PARTITION));
}
@Override
- public long offloadPartitionData(int partitionId, String basePath)
+ public long offloadPartitionData(int partitionId)
throws IOException {
- return super.offloadPartitionData(partitionId, getPath(basePath));
+ return offloadPartitionDataProxy(partitionId,
+ new DataIndex().addIndex(DataIndex.TypeIndexEntry.PARTITION));
}
/**
@@ -409,61 +354,44 @@ public class DiskBackedPartitionStore<I extends WritableComparable,
}
@Override
- protected long offloadInMemoryPartitionData(int partitionId, String path)
- throws IOException {
+ protected long offloadInMemoryPartitionData(
+ int partitionId, int ioThreadId, DataIndex index) throws IOException {
long numBytes = 0;
if (partitionStore.hasPartition(partitionId)) {
+ OutOfCoreDataAccessor dataAccessor = oocEngine.getDataAccessor();
partitionVertexCount.put(partitionId,
partitionStore.getPartitionVertexCount(partitionId));
partitionEdgeCount.put(partitionId,
partitionStore.getPartitionEdgeCount(partitionId));
Partition<I, V, E> partition =
partitionStore.removePartition(partitionId);
- File file = new File(getVerticesPath(path));
- if (LOG.isDebugEnabled()) {
- LOG.debug("offloadInMemoryPartitionData: writing partition vertices " +
- partitionId + " to " + file.getAbsolutePath());
- }
- checkState(!file.exists(), "offloadInMemoryPartitionData: partition " +
- "store file %s already exist", file.getAbsoluteFile());
- checkState(file.createNewFile(),
- "offloadInMemoryPartitionData: file %s already exists.",
- file.getAbsolutePath());
-
- FileOutputStream fileout = new FileOutputStream(file);
- BufferedOutputStream bufferout = new BufferedOutputStream(fileout);
- DataOutputStream outputStream = new DataOutputStream(bufferout);
- outputStream.writeLong(partition.getVertexCount());
+ index.addIndex(DataIndex.TypeIndexEntry.PARTITION_VERTICES);
+ OutOfCoreDataAccessor.DataOutputWrapper outputWrapper =
+ dataAccessor.prepareOutput(ioThreadId, index.copy(), false);
+ DataOutput dataOutput = outputWrapper.getDataOutput();
+ dataOutput.writeLong(partition.getVertexCount());
for (Vertex<I, V, E> vertex : partition) {
- writeVertexData(outputStream, vertex);
+ writeVertexData(dataOutput, vertex);
}
- outputStream.close();
- numBytes += outputStream.size();
-
+ numBytes += outputWrapper.finalizeOutput();
+ index.removeLastIndex();
// Avoid writing back edges if we have already written them once and
// the graph is not changing.
// If we are in the input superstep, we need to write the files
// at least the first time, even though the graph is static.
- file = new File(getEdgesPath(path));
+ index.addIndex(DataIndex.TypeIndexEntry.PARTITION_EDGES);
if (oocEngine.getSuperstep() == BspServiceWorker.INPUT_SUPERSTEP ||
- partitionVertexCount.get(partitionId) == null ||
- partitionVertexCount.get(partitionId) != partition.getVertexCount() ||
- !conf.isStaticGraph() || !file.exists()) {
- checkState(file.createNewFile(), "offloadInMemoryPartitionData: file " +
- "%s already exists.", file.getAbsolutePath());
- if (LOG.isDebugEnabled()) {
- LOG.debug("offloadInMemoryPartitionData: writing partition edges " +
- partitionId + " to " + file.getAbsolutePath());
- }
- fileout = new FileOutputStream(file);
- bufferout = new BufferedOutputStream(fileout);
- outputStream = new DataOutputStream(bufferout);
+ !conf.isStaticGraph() ||
+ !dataAccessor.dataExist(ioThreadId, index)) {
+ outputWrapper = dataAccessor.prepareOutput(ioThreadId, index.copy(),
+ false);
for (Vertex<I, V, E> vertex : partition) {
- writeOutEdges(outputStream, vertex);
+ writeOutEdges(outputWrapper.getDataOutput(), vertex);
}
- outputStream.close();
- numBytes += outputStream.size();
+ numBytes += outputWrapper.finalizeOutput();
}
+ index.removeLastIndex();
+ hasPartitionDataOnFile.add(partitionId);
}
return numBytes;
}
@@ -475,9 +403,10 @@ public class DiskBackedPartitionStore<I extends WritableComparable,
}
@Override
- public long offloadBuffers(int partitionId, String basePath)
+ public long offloadBuffers(int partitionId)
throws IOException {
- return super.offloadBuffers(partitionId, getPath(basePath));
+ return offloadBuffersProxy(partitionId,
+ new DataIndex().addIndex(DataIndex.TypeIndexEntry.PARTITION));
}
@Override
http://git-wip-us.apache.org/repos/asf/giraph/blob/3793c9ef/giraph-core/src/main/java/org/apache/giraph/ooc/data/MetaPartitionManager.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/ooc/data/MetaPartitionManager.java b/giraph-core/src/main/java/org/apache/giraph/ooc/data/MetaPartitionManager.java
index 1332a3a..64e3aed 100644
--- a/giraph-core/src/main/java/org/apache/giraph/ooc/data/MetaPartitionManager.java
+++ b/giraph-core/src/main/java/org/apache/giraph/ooc/data/MetaPartitionManager.java
@@ -99,6 +99,21 @@ public class MetaPartitionManager {
*/
private final AtomicDouble lowestGraphFractionInMemory =
new AtomicDouble(1);
+ /**
+ * Map of partition ids to their indices. The index of a partition is the
+ * order in which the partition was inserted. Partitions are indexed as 0, 1,
+ * 2, etc. This indexing is later used to find the id of the IO thread that is
+ * responsible for handling a partition. Partitions are assigned to IO threads
+ * in a round-robin fashion based on their indices.
+ */
+ private final ConcurrentMap<Integer, Integer> partitionIndex =
+ Maps.newConcurrentMap();
+ /**
+ * Sequential counter used to assign indices to partitions as they are added
+ */
+ private final AtomicInteger indexCounter = new AtomicInteger(0);
+ /** Number of disks (i.e. IO threads) */
+ private final int numIOThreads;
/**
* Constructor
@@ -117,6 +132,7 @@ public class MetaPartitionManager {
}
this.oocEngine = oocEngine;
this.randomGenerator = new Random();
+ this.numIOThreads = numIOThreads;
}
/**
@@ -131,7 +147,7 @@ public class MetaPartitionManager {
/**
* Get total number of partitions
*
- * @return total number of partition
+ * @return total number of partitions
*/
public int getNumPartitions() {
return partitions.size();
@@ -175,6 +191,18 @@ public class MetaPartitionManager {
}
/**
+ * Get the thread id that is responsible for a particular partition
+ *
+ * @param partitionId id of the given partition
+ * @return id of the thread responsible for the given partition
+ */
+ public int getOwnerThreadId(int partitionId) {
+ Integer index = partitionIndex.get(partitionId);
+ checkState(index != null);
+ return index % numIOThreads;
+ }
+
+ /**
* Add a partition
*
* @param partitionId id of a partition to add
@@ -184,8 +212,9 @@ public class MetaPartitionManager {
MetaPartition temp = partitions.putIfAbsent(partitionId, meta);
// Check if the given partition is new
if (temp == null) {
- int ownerThread = oocEngine.getIOScheduler()
- .getOwnerThreadId(partitionId);
+ int index = indexCounter.getAndIncrement();
+ checkState(partitionIndex.putIfAbsent(partitionId, index) == null);
+ int ownerThread = getOwnerThreadId(partitionId);
perThreadPartitionDictionary.get(ownerThread).addPartition(meta);
numInMemoryPartitions.getAndIncrement();
}
@@ -199,7 +228,7 @@ public class MetaPartitionManager {
*/
public void removePartition(Integer partitionId) {
MetaPartition meta = partitions.remove(partitionId);
- int ownerThread = oocEngine.getIOScheduler().getOwnerThreadId(partitionId);
+ int ownerThread = getOwnerThreadId(partitionId);
perThreadPartitionDictionary.get(ownerThread).removePartition(meta);
checkState(!meta.isOnDisk());
numInMemoryPartitions.getAndDecrement();
@@ -424,7 +453,7 @@ public class MetaPartitionManager {
*/
public void markPartitionAsInProcess(int partitionId) {
MetaPartition meta = partitions.get(partitionId);
- int ownerThread = oocEngine.getIOScheduler().getOwnerThreadId(partitionId);
+ int ownerThread = getOwnerThreadId(partitionId);
synchronized (meta) {
perThreadPartitionDictionary.get(ownerThread).removePartition(meta);
meta.setProcessingState(ProcessingState.IN_PROCESS);
@@ -468,7 +497,7 @@ public class MetaPartitionManager {
*/
public void setPartitionIsProcessed(int partitionId) {
MetaPartition meta = partitions.get(partitionId);
- int ownerThread = oocEngine.getIOScheduler().getOwnerThreadId(partitionId);
+ int ownerThread = getOwnerThreadId(partitionId);
synchronized (meta) {
perThreadPartitionDictionary.get(ownerThread).removePartition(meta);
meta.setProcessingState(ProcessingState.PROCESSED);
@@ -508,7 +537,7 @@ public class MetaPartitionManager {
public void doneLoadingPartition(int partitionId, long superstep) {
MetaPartition meta = partitions.get(partitionId);
numInMemoryPartitions.getAndIncrement();
- int owner = oocEngine.getIOScheduler().getOwnerThreadId(partitionId);
+ int owner = getOwnerThreadId(partitionId);
synchronized (meta) {
perThreadPartitionDictionary.get(owner).removePartition(meta);
meta.setPartitionState(StorageState.IN_MEM);
@@ -535,8 +564,7 @@ public class MetaPartitionManager {
*/
public boolean startOffloadingMessages(int partitionId) {
MetaPartition meta = partitions.get(partitionId);
- int ownerThread =
- oocEngine.getIOScheduler().getOwnerThreadId(partitionId);
+ int ownerThread = getOwnerThreadId(partitionId);
synchronized (meta) {
if (meta.getIncomingMessagesState() == StorageState.IN_MEM) {
perThreadPartitionDictionary.get(ownerThread).removePartition(meta);
@@ -558,8 +586,7 @@ public class MetaPartitionManager {
*/
public void doneOffloadingMessages(int partitionId) {
MetaPartition meta = partitions.get(partitionId);
- int ownerThread =
- oocEngine.getIOScheduler().getOwnerThreadId(partitionId);
+ int ownerThread = getOwnerThreadId(partitionId);
synchronized (meta) {
perThreadPartitionDictionary.get(ownerThread).removePartition(meta);
meta.setIncomingMessagesState(StorageState.ON_DISK);
@@ -598,7 +625,7 @@ public class MetaPartitionManager {
*/
public boolean startOffloadingPartition(int partitionId) {
MetaPartition meta = partitions.get(partitionId);
- int owner = oocEngine.getIOScheduler().getOwnerThreadId(partitionId);
+ int owner = getOwnerThreadId(partitionId);
synchronized (meta) {
if (meta.getProcessingState() != ProcessingState.IN_PROCESS &&
(meta.getPartitionState() == StorageState.IN_MEM ||
@@ -624,7 +651,7 @@ public class MetaPartitionManager {
numInMemoryPartitions.getAndDecrement();
updateGraphFractionInMemory();
MetaPartition meta = partitions.get(partitionId);
- int owner = oocEngine.getIOScheduler().getOwnerThreadId(partitionId);
+ int owner = getOwnerThreadId(partitionId);
synchronized (meta) {
perThreadPartitionDictionary.get(owner).removePartition(meta);
meta.setPartitionState(StorageState.ON_DISK);
@@ -639,8 +666,7 @@ public class MetaPartitionManager {
*/
public void resetPartitions() {
for (MetaPartition meta : partitions.values()) {
- int owner =
- oocEngine.getIOScheduler().getOwnerThreadId(meta.getPartitionId());
+ int owner = getOwnerThreadId(meta.getPartitionId());
perThreadPartitionDictionary.get(owner).removePartition(meta);
meta.resetPartition();
perThreadPartitionDictionary.get(owner).addPartition(meta);
@@ -659,8 +685,7 @@ public class MetaPartitionManager {
*/
public void resetMessages() {
for (MetaPartition meta : partitions.values()) {
- int owner =
- oocEngine.getIOScheduler().getOwnerThreadId(meta.getPartitionId());
+ int owner = getOwnerThreadId(meta.getPartitionId());
perThreadPartitionDictionary.get(owner).removePartition(meta);
meta.resetMessages();
if (meta.getPartitionState() == StorageState.IN_MEM &&
http://git-wip-us.apache.org/repos/asf/giraph/blob/3793c9ef/giraph-core/src/main/java/org/apache/giraph/ooc/data/OutOfCoreDataManager.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/ooc/data/OutOfCoreDataManager.java b/giraph-core/src/main/java/org/apache/giraph/ooc/data/OutOfCoreDataManager.java
deleted file mode 100644
index 325850c..0000000
--- a/giraph-core/src/main/java/org/apache/giraph/ooc/data/OutOfCoreDataManager.java
+++ /dev/null
@@ -1,401 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.giraph.ooc.data;
-
-import com.google.common.collect.Maps;
-import com.google.common.collect.Sets;
-import org.apache.commons.lang3.tuple.MutablePair;
-import org.apache.commons.lang3.tuple.Pair;
-import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
-import org.apache.giraph.conf.IntConfOption;
-import org.apache.log4j.Logger;
-
-import java.io.BufferedInputStream;
-import java.io.BufferedOutputStream;
-import java.io.DataInput;
-import java.io.DataInputStream;
-import java.io.DataOutput;
-import java.io.DataOutputStream;
-import java.io.File;
-import java.io.FileInputStream;
-import java.io.FileOutputStream;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.locks.ReadWriteLock;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
-
-import static com.google.common.base.Preconditions.checkNotNull;
-import static com.google.common.base.Preconditions.checkState;
-import static org.apache.giraph.conf.GiraphConstants.ONE_MB;
-
-/**
- * This class provides basic operations for data structures that have to
- * participate in out-of-core mechanism. Essential subclasses of this class are:
- * - DiskBackedPartitionStore (for partition data)
- * - DiskBackedMessageStore (for messages)
- * - DiskBackedEdgeStore (for edges read in INPUT_SUPERSTEP)
- * Basically, any data structure that may cause OOM to happen can be implemented
- * as a subclass of this class.
- *
- * There are two different terms used in the rest of this class:
- * - "data store" refers to in-memory representation of data. Usually this is
- * stored per-partition in in-memory implementations of data structures. For
- * instance, "data store" of a DiskBackedPartitionStore would collection of
- * all partitions kept in the in-memory partition store within the
- * DiskBackedPartitionStore.
- * - "raw data buffer" refers to raw data which were supposed to be
- * de-serialized and added to the data store, but they remain 'as is' in the
- * memory because their corresponding partition is offloaded to disk and is
- * not available in the data store.
- *
- * @param <T> raw data format of the data store subclassing this class
- */
-public abstract class OutOfCoreDataManager<T> {
- /**
- * Minimum size of a buffer (in bytes) to flush to disk. This is used to
- * decide whether vertex/edge buffers are large enough to flush to disk.
- */
- public static final IntConfOption MINIMUM_BUFFER_SIZE_TO_FLUSH =
- new IntConfOption("giraph.flushBufferSize", 8 * ONE_MB,
- "Minimum size of a buffer (in bytes) to flush to disk.");
-
- /** Class logger. */
- private static final Logger LOG = Logger.getLogger(
- OutOfCoreDataManager.class);
- /** Cached value for MINIMUM_BUFFER_SIZE_TO_FLUSH */
- private final int minBufferSizeToOffload;
- /** Set containing ids of all out-of-core partitions */
- private final Set<Integer> hasPartitionDataOnDisk =
- Sets.newConcurrentHashSet();
- /**
- * Map of partition ids to list of raw data buffers. The map will have entries
- * only for partitions that their in-memory data structures are currently
- * offloaded to disk. We keep the aggregate size of buffers for each partition
- * as part of the values in the map to estimate how much memory we can free up
- * if we offload data buffers of a particular partition to disk.
- */
- private final ConcurrentMap<Integer, Pair<Integer, List<T>>> dataBuffers =
- Maps.newConcurrentMap();
- /**
- * Map of partition ids to number of raw data buffers offloaded to disk for
- * each partition. The map will have entries only for partitions that their
- * in-memory data structures are currently out of core. It is necessary to
- * know the number of data buffers on disk for a particular partition when we
- * are loading all these buffers back in memory.
- */
- private final ConcurrentMap<Integer, Integer> numDataBuffersOnDisk =
- Maps.newConcurrentMap();
- /**
- * Lock to avoid overlapping of read and write on data associated with each
- * partition.
- * */
- private final ConcurrentMap<Integer, ReadWriteLock> locks =
- Maps.newConcurrentMap();
-
- /**
- * Constructor.
- *
- * @param conf Configuration
- */
- OutOfCoreDataManager(ImmutableClassesGiraphConfiguration conf) {
- this.minBufferSizeToOffload = MINIMUM_BUFFER_SIZE_TO_FLUSH.get(conf);
- }
-
- /**
- * Retrieves a lock for a given partition. If the lock for the given partition
- * does not exist, creates a new lock.
- *
- * @param partitionId id of the partition the lock is needed for
- * @return lock for a given partition
- */
- private ReadWriteLock getPartitionLock(int partitionId) {
- ReadWriteLock readWriteLock = locks.get(partitionId);
- if (readWriteLock == null) {
- readWriteLock = new ReentrantReadWriteLock();
- ReadWriteLock temp = locks.putIfAbsent(partitionId, readWriteLock);
- if (temp != null) {
- readWriteLock = temp;
- }
- }
- return readWriteLock;
- }
-
- /**
- * Adds a data entry for a given partition to the current data store. If data
- * of a given partition in data store is already offloaded to disk, adds the
- * data entry to appropriate raw data buffer list.
- *
- * @param partitionId id of the partition to add the data entry to
- * @param entry data entry to add
- */
- protected void addEntry(int partitionId, T entry) {
- // Addition of data entries to a data store is much more common than
- // out-of-core operations. Besides, in-memory data store implementations
- // existing in the code base already account for parallel addition to data
- // stores. Therefore, using read lock would optimize for parallel addition
- // to data stores, specially for cases where the addition should happen for
- // partitions that are entirely in memory.
- ReadWriteLock rwLock = getPartitionLock(partitionId);
- rwLock.readLock().lock();
- if (hasPartitionDataOnDisk.contains(partitionId)) {
- List<T> entryList = new ArrayList<>();
- entryList.add(entry);
- int entrySize = entrySerializedSize(entry);
- MutablePair<Integer, List<T>> newPair =
- new MutablePair<>(entrySize, entryList);
- Pair<Integer, List<T>> oldPair =
- dataBuffers.putIfAbsent(partitionId, newPair);
- if (oldPair != null) {
- synchronized (oldPair) {
- newPair = (MutablePair<Integer, List<T>>) oldPair;
- newPair.setLeft(oldPair.getLeft() + entrySize);
- newPair.getRight().add(entry);
- }
- }
- } else {
- addEntryToImMemoryPartitionData(partitionId, entry);
- }
- rwLock.readLock().unlock();
- }
-
- /**
- * Loads and assembles all data for a given partition, and puts it into the
- * data store. Returns the number of bytes transferred from disk to memory in
- * the loading process.
- *
- * The partition's write lock is held for the whole operation. It is released,
- * and the raw-buffer file stream is closed, even if reading from disk fails.
- *
- * @param partitionId id of the partition to load and assemble all data for
- * @param basePath path to load the data from
- * @return number of bytes loaded from disk to memory
- * @throws IOException
- */
- public long loadPartitionData(int partitionId, String basePath)
- throws IOException {
- long numBytes = 0;
- ReadWriteLock rwLock = getPartitionLock(partitionId);
- rwLock.writeLock().lock();
- try {
- if (hasPartitionDataOnDisk.contains(partitionId)) {
- numBytes += loadInMemoryPartitionData(partitionId,
- getPath(basePath, partitionId));
- hasPartitionDataOnDisk.remove(partitionId);
- // Loading raw data buffers from disk if there is any and applying those
- // to already loaded in-memory data.
- Integer numBuffers = numDataBuffersOnDisk.remove(partitionId);
- if (numBuffers != null) {
- checkState(numBuffers > 0);
- File file = new File(getBuffersPath(basePath, partitionId));
- checkState(file.exists());
- if (LOG.isDebugEnabled()) {
- LOG.debug("loadPartitionData: loading " + numBuffers + " buffers of" +
- " partition " + partitionId + " from " + file.getAbsolutePath());
- }
- try (DataInputStream dis = new DataInputStream(
- new BufferedInputStream(new FileInputStream(file)))) {
- for (int i = 0; i < numBuffers; ++i) {
- T entry = readNextEntry(dis);
- addEntryToImMemoryPartitionData(partitionId, entry);
- }
- }
- numBytes += file.length();
- checkState(file.delete(), "loadPartitionData: failed to delete %s.",
- file.getAbsoluteFile());
- }
- // Applying in-memory raw data buffers to in-memory partition data.
- Pair<Integer, List<T>> pair = dataBuffers.remove(partitionId);
- if (pair != null) {
- for (T entry : pair.getValue()) {
- addEntryToImMemoryPartitionData(partitionId, entry);
- }
- }
- }
- } finally {
- rwLock.writeLock().unlock();
- }
- return numBytes;
- }
-
- /**
- * Offloads partition data of a given partition in the data store to disk, and
- * returns the number of bytes offloaded from memory to disk.
- *
- * The partition is first marked as having its data on disk while holding the
- * write lock; from then on addEntry routes new entries for this partition to
- * raw data buffers. The actual offload runs after the lock is released. The
- * lock is released in a finally block so no exception path can leak it.
- *
- * @param partitionId id of the partition to offload its data
- * @param basePath path to offload the data to
- * @return number of bytes offloaded from memory to disk
- * @throws IOException
- */
- public long offloadPartitionData(int partitionId, String basePath)
- throws IOException {
- ReadWriteLock rwLock = getPartitionLock(partitionId);
- rwLock.writeLock().lock();
- try {
- hasPartitionDataOnDisk.add(partitionId);
- } finally {
- rwLock.writeLock().unlock();
- }
- return offloadInMemoryPartitionData(partitionId,
- getPath(basePath, partitionId));
- }
-
- /**
- * Offloads raw data buffers of a given partition to disk, and returns the
- * number of bytes offloaded from memory to disk. If the partition has no
- * buffered data, or less than {@code minBufferSizeToOffload}, nothing is
- * written and 0 is returned.
- *
- * Entries are appended to the partition's buffer file, and the count of
- * buffers on disk for the partition is increased by the number of entries
- * written.
- *
- * @param partitionId id of the partition to offload its raw data buffers
- * @param basePath path to offload the data to
- * @return number of bytes offloaded from memory to disk
- * @throws IOException
- */
- public long offloadBuffers(int partitionId, String basePath)
- throws IOException {
- Pair<Integer, List<T>> pair = dataBuffers.get(partitionId);
- if (pair == null || pair.getLeft() < minBufferSizeToOffload) {
- return 0;
- }
- ReadWriteLock rwLock = getPartitionLock(partitionId);
- rwLock.writeLock().lock();
- try {
- // Detach the buffers under the write lock: addEntry appends only while
- // holding the read lock, so nobody is still appending to the detached list.
- pair = dataBuffers.remove(partitionId);
- } finally {
- rwLock.writeLock().unlock();
- }
- checkNotNull(pair);
- checkState(!pair.getRight().isEmpty());
- File file = new File(getBuffersPath(basePath, partitionId));
- long numBytes;
- // Append mode keeps buffers offloaded by earlier calls in the same file.
- try (DataOutputStream dos = new DataOutputStream(
- new BufferedOutputStream(new FileOutputStream(file, true)))) {
- for (T entry : pair.getRight()) {
- writeEntry(entry, dos);
- }
- numBytes = dos.size();
- }
- int numBuffers = pair.getRight().size();
- // Atomically add numBuffers to the on-disk count with a compare-and-set
- // loop. A plain putIfAbsent followed by replace(key, value) could lose a
- // concurrent update.
- Integer oldNumBuffersOnDisk =
- numDataBuffersOnDisk.putIfAbsent(partitionId, numBuffers);
- while (oldNumBuffersOnDisk != null &&
- !numDataBuffersOnDisk.replace(partitionId, oldNumBuffersOnDisk,
- oldNumBuffersOnDisk + numBuffers)) {
- oldNumBuffersOnDisk =
- numDataBuffersOnDisk.putIfAbsent(partitionId, numBuffers);
- }
- return numBytes;
- }
-
- /**
- * Looks through all partitions that their data is not in the data store (is
- * offloaded to disk), and sees if any of them has enough raw data buffer in
- * memory. If so, puts that partition in a list to return.
- *
- * @return Set of partition ids of all partition raw buffers where the
- * aggregate size of buffers are large enough and it is worth flushing
- * those buffers to disk
- */
- public Set<Integer> getCandidateBuffersToOffload() {
- Set<Integer> result = new HashSet<>();
- for (Map.Entry<Integer, Pair<Integer, List<T>>> entry :
- dataBuffers.entrySet()) {
- if (entry.getValue().getLeft() > minBufferSizeToOffload) {
- result.add(entry.getKey());
- }
- }
- return result;
- }
-
- /**
- * Builds the path of the file holding a partition's offloaded data: the base
- * path followed by "-P" and the partition id.
- *
- * @param basePath path prefix to build the partition path from
- * @param partitionId id of the partition
- * @return path to read/write the partition data from/to
- */
- private static String getPath(String basePath, int partitionId) {
- return new StringBuilder().append(basePath).append("-P")
- .append(partitionId).toString();
- }
-
- /**
- * Builds the path of the file holding a partition's offloaded raw data
- * buffers: the partition data path with a "_buffers" suffix.
- *
- * @param basePath path prefix to build the path from
- * @param partitionId id of the partition
- * @return path to read/write the raw data buffers from/to
- */
- private static String getBuffersPath(String basePath, int partitionId) {
- String partitionPath = getPath(basePath, partitionId);
- return partitionPath + "_buffers";
- }
-
- /**
- * Writes a single raw entry to a given output stream.
- *
- * offloadBuffers uses this to append entries to a partition's buffer file.
- * Implementations must use a format that readNextEntry can read back one
- * entry at a time.
- *
- * @param entry entry to write to output
- * @param out output stream to write the entry to
- * @throws IOException
- */
- protected abstract void writeEntry(T entry, DataOutput out)
- throws IOException;
-
- /**
- * Reads the next available raw entry from a given input stream.
- *
- * loadPartitionData calls this once per buffered entry recorded for the
- * partition, so it must consume exactly one entry as written by writeEntry.
- *
- * @param in input stream to read the entry from
- * @return entry read from an input stream
- * @throws IOException
- */
- protected abstract T readNextEntry(DataInput in) throws IOException;
-
- /**
- * Loads data of a partition into data store. Returns number of bytes loaded.
- *
- * Called by loadPartitionData while holding the partition's write lock, and
- * before any raw data buffers are applied to the in-memory data.
- *
- * @param partitionId id of the partition to load its data
- * @param path path from which data should be loaded
- * @return number of bytes loaded from disk to memory
- * @throws IOException
- */
- protected abstract long loadInMemoryPartitionData(int partitionId,
- String path)
- throws IOException;
-
- /**
- * Offloads data of a partition in data store to disk. Returns the number of
- * bytes offloaded to disk.
- *
- * Called by offloadPartitionData after the partition has been marked as
- * on-disk and after the partition lock has been released, so it does not run
- * under the partition lock.
- *
- * @param partitionId id of the partition to offload to disk
- * @param path path to which data should be offloaded
- * @return number of bytes offloaded from memory to disk
- * @throws IOException
- */
- protected abstract long offloadInMemoryPartitionData(int partitionId,
- String path)
- throws IOException;
-
- /**
- * Gets the size of a given entry in bytes.
- *
- * The value is summed per partition by addEntry and compared against
- * minBufferSizeToOffload to decide whether the buffers are worth offloading.
- * NOTE(review): presumably it should approximate what writeEntry produces;
- * confirm with implementations.
- *
- * @param entry input entry to find its size
- * @return size of given input entry in bytes
- */
- protected abstract int entrySerializedSize(T entry);
-
- /**
- * Adds a single entry for a given partition to the in-memory data store.
- *
- * addEntry calls this under the partition's shared read lock, so several
- * threads may call it for the same partition at once. loadPartitionData also
- * calls it under the write lock. ("ImMemory" is a misspelling of
- * "InMemory"; the name is kept because subclasses override it.)
- *
- * @param partitionId id of the partition to add the data to
- * @param entry input entry to add to the data store
- */
- protected abstract void addEntryToImMemoryPartitionData(int partitionId,
- T entry);
-}
http://git-wip-us.apache.org/repos/asf/giraph/blob/3793c9ef/giraph-core/src/main/java/org/apache/giraph/ooc/io/IOCommand.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/ooc/io/IOCommand.java b/giraph-core/src/main/java/org/apache/giraph/ooc/io/IOCommand.java
deleted file mode 100644
index e84ad29..0000000
--- a/giraph-core/src/main/java/org/apache/giraph/ooc/io/IOCommand.java
+++ /dev/null
@@ -1,106 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.giraph.ooc.io;
-
-import org.apache.giraph.ooc.OutOfCoreEngine;
-
-import java.io.IOException;
-
-/**
- * Base class for a single out-of-core IO operation: a unit of work that moves
- * data of one partition between memory and disk, or (WAIT) does no IO at all.
- */
-public abstract class IOCommand {
- /** Kinds of IO commands used by the out-of-core mechanism */
- public enum IOCommandType {
- /** Load a partition from disk */
- LOAD_PARTITION,
- /** Store a partition to disk */
- STORE_PARTITION,
- /** Store the incoming messages of a partition to disk */
- STORE_MESSAGE,
- /**
- * Store a raw data buffer (message or partition data) of a partition that
- * is currently out-of-core
- */
- STORE_BUFFER,
- /** Perform no IO */
- WAIT
- }
-
- /** Out-of-core engine this command works with */
- protected final OutOfCoreEngine oocEngine;
- /** Id of the partition this command operates on */
- protected final int partitionId;
- /**
- * Running total of bytes moved between memory and disk while executing this
- * command
- */
- protected long numBytesTransferred;
-
- /**
- * Creates a command bound to an out-of-core engine and a partition.
- *
- * @param oocEngine Out-of-core engine
- * @param partitionId Id of the partition involved in the IO
- */
- public IOCommand(OutOfCoreEngine oocEngine, int partitionId) {
- this.partitionId = partitionId;
- this.oocEngine = oocEngine;
- this.numBytesTransferred = 0;
- }
-
- /**
- * Returns the id of the partition this command operates on.
- *
- * @return id of the partition
- */
- public int getPartitionId() {
- return partitionId;
- }
-
- /**
- * Runs the command (loads or stores data) and updates the data stores to
- * reflect the data moved.
- *
- * @param basePath base path (prefix) of the files/folders the command
- * reads from or writes to
- * @return true iff the command actually ran, i.e. resulted in loading or
- * storing data
- * @throws IOException
- */
- public abstract boolean execute(String basePath) throws IOException;
-
- /**
- * Returns the kind of this command.
- *
- * @return type of the command
- */
- public abstract IOCommandType getType();
-
- /**
- * Returns how many bytes this command has loaded from or stored to disk.
- *
- * @return number of bytes transferred during the execution of the command
- */
- public long bytesTransferred() {
- return numBytesTransferred;
- }
-}
-
[2/4] git commit: updated refs/heads/trunk to 3793c9e
Posted by ma...@apache.org.
http://git-wip-us.apache.org/repos/asf/giraph/blob/3793c9ef/giraph-core/src/main/java/org/apache/giraph/ooc/io/LoadPartitionIOCommand.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/ooc/io/LoadPartitionIOCommand.java b/giraph-core/src/main/java/org/apache/giraph/ooc/io/LoadPartitionIOCommand.java
deleted file mode 100644
index ce24fe2..0000000
--- a/giraph-core/src/main/java/org/apache/giraph/ooc/io/LoadPartitionIOCommand.java
+++ /dev/null
@@ -1,102 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.giraph.ooc.io;
-
-import com.google.common.base.Preconditions;
-import org.apache.giraph.bsp.BspService;
-import org.apache.giraph.comm.messages.MessageStore;
-import org.apache.giraph.ooc.OutOfCoreEngine;
-import org.apache.giraph.ooc.data.DiskBackedEdgeStore;
-import org.apache.giraph.ooc.data.DiskBackedMessageStore;
-import org.apache.giraph.ooc.data.DiskBackedPartitionStore;
-
-import java.io.IOException;
-
-/**
- * IOCommand that brings a partition back into memory: its partition data,
- * its edge data (only when loading for INPUT_SUPERSTEP itself) and its
- * messages (current messages, or incoming ones when prefetching for the next
- * superstep).
- */
-public class LoadPartitionIOCommand extends IOCommand {
- /**
- * Superstep the partition is loaded for: either the current superstep or,
- * when prefetching, the next one.
- */
- private final long superstep;
-
- /**
- * Creates a load command for a partition.
- *
- * @param oocEngine out-of-core engine
- * @param partitionId id of the partition to be loaded
- * @param superstep superstep to load the partition for
- */
- public LoadPartitionIOCommand(OutOfCoreEngine oocEngine, int partitionId,
- long superstep) {
- super(oocEngine, partitionId);
- this.superstep = superstep;
- }
-
- /**
- * Loads the partition if the meta partition manager lets this command
- * start loading it.
- *
- * @param basePath base path of the files to read from
- * @return false if loading could not be started, true once it finished
- * @throws IOException
- */
- @Override
- public boolean execute(String basePath) throws IOException {
- if (!oocEngine.getMetaPartitionManager()
- .startLoadingPartition(partitionId, superstep)) {
- return false;
- }
- long currentSuperstep = oocEngine.getSuperstep();
- boolean forCurrentSuperstep = superstep == currentSuperstep;
- DiskBackedPartitionStore partitionStore = (DiskBackedPartitionStore)
- oocEngine.getServerData().getPartitionStore();
- numBytesTransferred +=
- partitionStore.loadPartitionData(partitionId, basePath);
- // Edge data is only loaded when loading for the input superstep itself.
- if (forCurrentSuperstep &&
- currentSuperstep == BspService.INPUT_SUPERSTEP) {
- DiskBackedEdgeStore edgeStore =
- (DiskBackedEdgeStore) oocEngine.getServerData().getEdgeStore();
- numBytesTransferred +=
- edgeStore.loadPartitionData(partitionId, basePath);
- }
- MessageStore messageStore;
- if (forCurrentSuperstep) {
- messageStore = oocEngine.getServerData().getCurrentMessageStore();
- } else {
- // Prefetching is only allowed one superstep ahead.
- Preconditions.checkState(superstep == currentSuperstep + 1);
- messageStore = oocEngine.getServerData().getIncomingMessageStore();
- }
- if (messageStore != null) {
- DiskBackedMessageStore diskBackedMessages =
- (DiskBackedMessageStore) messageStore;
- numBytesTransferred +=
- diskBackedMessages.loadPartitionData(partitionId, basePath);
- }
- oocEngine.getMetaPartitionManager()
- .doneLoadingPartition(partitionId, superstep);
- return true;
- }
-
- @Override
- public IOCommandType getType() {
- return IOCommandType.LOAD_PARTITION;
- }
-
- @Override
- public String toString() {
- return "LoadPartitionIOCommand: (partitionId = " + partitionId + ", " +
- "superstep = " + superstep + ")";
- }
-}
http://git-wip-us.apache.org/repos/asf/giraph/blob/3793c9ef/giraph-core/src/main/java/org/apache/giraph/ooc/io/StoreDataBufferIOCommand.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/ooc/io/StoreDataBufferIOCommand.java b/giraph-core/src/main/java/org/apache/giraph/ooc/io/StoreDataBufferIOCommand.java
deleted file mode 100644
index f1769dd..0000000
--- a/giraph-core/src/main/java/org/apache/giraph/ooc/io/StoreDataBufferIOCommand.java
+++ /dev/null
@@ -1,99 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.giraph.ooc.io;
-
-import org.apache.giraph.ooc.OutOfCoreEngine;
-import org.apache.giraph.ooc.data.DiskBackedEdgeStore;
-import org.apache.giraph.ooc.data.DiskBackedMessageStore;
-import org.apache.giraph.ooc.data.DiskBackedPartitionStore;
-
-import java.io.IOException;
-
-/**
- * IOCommand that writes the in-memory raw data buffers of one partition to
- * disk.
- */
-public class StoreDataBufferIOCommand extends IOCommand {
- /**
- * Kind of raw data buffer to write out: vertices/edges buffers (in
- * INPUT_SUPERSTEP) or incoming message buffers.
- */
- public enum DataBufferType { PARTITION, MESSAGE }
- /** Which kind of buffer this command stores on disk */
- private final DataBufferType type;
-
- /**
- * Creates a command storing one kind of buffer of a partition.
- *
- * @param oocEngine out-of-core engine
- * @param partitionId id of the partition whose buffers are offloaded
- * @param type type of the buffer to store on disk
- */
- public StoreDataBufferIOCommand(OutOfCoreEngine oocEngine,
- int partitionId,
- DataBufferType type) {
- super(oocEngine, partitionId);
- this.type = type;
- }
-
- /**
- * Offloads the buffers if the meta partition manager lets this command start
- * offloading them.
- *
- * @param basePath base path of the files to write to
- * @return false if offloading could not be started, true once it finished
- * @throws IOException
- */
- @Override
- public boolean execute(String basePath) throws IOException {
- if (!oocEngine.getMetaPartitionManager()
- .startOffloadingBuffer(partitionId)) {
- return false;
- }
- switch (type) {
- case PARTITION:
- offloadPartitionAndEdgeBuffers(basePath);
- break;
- case MESSAGE:
- offloadMessageBuffers(basePath);
- break;
- default:
- throw new IllegalStateException("execute: requested data buffer type " +
- "does not exist!");
- }
- oocEngine.getMetaPartitionManager().doneOffloadingBuffer(partitionId);
- return true;
- }
-
- /**
- * Offloads the partition-store buffers, then the edge-store buffers, adding
- * each amount to numBytesTransferred as soon as it is known.
- *
- * @param basePath base path of the files to write to
- * @throws IOException
- */
- private void offloadPartitionAndEdgeBuffers(String basePath)
- throws IOException {
- DiskBackedPartitionStore partitionStore = (DiskBackedPartitionStore)
- oocEngine.getServerData().getPartitionStore();
- numBytesTransferred +=
- partitionStore.offloadBuffers(partitionId, basePath);
- DiskBackedEdgeStore edgeStore =
- (DiskBackedEdgeStore) oocEngine.getServerData().getEdgeStore();
- numBytesTransferred += edgeStore.offloadBuffers(partitionId, basePath);
- }
-
- /**
- * Offloads the incoming-message-store buffers.
- *
- * @param basePath base path of the files to write to
- * @throws IOException
- */
- private void offloadMessageBuffers(String basePath) throws IOException {
- DiskBackedMessageStore messageStore = (DiskBackedMessageStore)
- oocEngine.getServerData().getIncomingMessageStore();
- numBytesTransferred +=
- messageStore.offloadBuffers(partitionId, basePath);
- }
-
- @Override
- public IOCommandType getType() {
- return IOCommandType.STORE_BUFFER;
- }
-
- @Override
- public String toString() {
- return "StoreDataBufferIOCommand: (partitionId = " + partitionId + ", " +
- "type = " + type.name() + ")";
- }
-}
http://git-wip-us.apache.org/repos/asf/giraph/blob/3793c9ef/giraph-core/src/main/java/org/apache/giraph/ooc/io/StoreIncomingMessageIOCommand.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/ooc/io/StoreIncomingMessageIOCommand.java b/giraph-core/src/main/java/org/apache/giraph/ooc/io/StoreIncomingMessageIOCommand.java
deleted file mode 100644
index c9d8829..0000000
--- a/giraph-core/src/main/java/org/apache/giraph/ooc/io/StoreIncomingMessageIOCommand.java
+++ /dev/null
@@ -1,69 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.giraph.ooc.io;
-
-import org.apache.giraph.ooc.OutOfCoreEngine;
-import org.apache.giraph.ooc.data.DiskBackedMessageStore;
-
-import java.io.IOException;
-
-import static com.google.common.base.Preconditions.checkState;
-
-/**
- * IOCommand that writes the incoming messages of one partition to disk.
- */
-public class StoreIncomingMessageIOCommand extends IOCommand {
- /**
- * Creates a command storing a partition's incoming messages.
- *
- * @param oocEngine out-of-core engine
- * @param partitionId id of the partition to store its incoming messages
- */
- public StoreIncomingMessageIOCommand(OutOfCoreEngine oocEngine,
- int partitionId) {
- super(oocEngine, partitionId);
- }
-
- /**
- * Offloads the incoming messages if the meta partition manager lets this
- * command start offloading them.
- *
- * @param basePath base path of the files to write to
- * @return whether offloading was started (and completed)
- * @throws IOException
- */
- @Override
- public boolean execute(String basePath) throws IOException {
- boolean started = oocEngine.getMetaPartitionManager()
- .startOffloadingMessages(partitionId);
- if (started) {
- DiskBackedMessageStore incomingStore = (DiskBackedMessageStore)
- oocEngine.getServerData().getIncomingMessageStore();
- checkState(incomingStore != null);
- numBytesTransferred +=
- incomingStore.offloadPartitionData(partitionId, basePath);
- oocEngine.getMetaPartitionManager().doneOffloadingMessages(partitionId);
- }
- return started;
- }
-
- @Override
- public IOCommandType getType() {
- return IOCommandType.STORE_MESSAGE;
- }
-
- @Override
- public String toString() {
- return "StoreIncomingMessageIOCommand: (partitionId = " + partitionId + ")";
- }
-}
http://git-wip-us.apache.org/repos/asf/giraph/blob/3793c9ef/giraph-core/src/main/java/org/apache/giraph/ooc/io/StorePartitionIOCommand.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/ooc/io/StorePartitionIOCommand.java b/giraph-core/src/main/java/org/apache/giraph/ooc/io/StorePartitionIOCommand.java
deleted file mode 100644
index 797ac9d..0000000
--- a/giraph-core/src/main/java/org/apache/giraph/ooc/io/StorePartitionIOCommand.java
+++ /dev/null
@@ -1,85 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.giraph.ooc.io;
-
-import org.apache.giraph.bsp.BspService;
-import org.apache.giraph.comm.messages.MessageStore;
-import org.apache.giraph.ooc.data.DiskBackedEdgeStore;
-import org.apache.giraph.ooc.data.DiskBackedMessageStore;
-import org.apache.giraph.ooc.data.DiskBackedPartitionStore;
-import org.apache.giraph.ooc.OutOfCoreEngine;
-
-import java.io.IOException;
-
-/**
- * IOCommand that writes a partition to disk: its partition data, plus its
- * edge data during INPUT_SUPERSTEP or its current messages in compute
- * supersteps.
- */
-public class StorePartitionIOCommand extends IOCommand {
- /**
- * Creates a command storing a partition.
- *
- * @param oocEngine out-of-core engine
- * @param partitionId id of the partition to store its data
- */
- public StorePartitionIOCommand(OutOfCoreEngine oocEngine,
- int partitionId) {
- super(oocEngine, partitionId);
- }
-
- /**
- * Offloads the partition if the meta partition manager lets this command
- * start offloading it.
- *
- * @param basePath base path of the files to write to
- * @return false if offloading could not be started, true once it finished
- * @throws IOException
- */
- @Override
- public boolean execute(String basePath) throws IOException {
- if (!oocEngine.getMetaPartitionManager()
- .startOffloadingPartition(partitionId)) {
- return false;
- }
- DiskBackedPartitionStore partitionStore = (DiskBackedPartitionStore)
- oocEngine.getServerData().getPartitionStore();
- numBytesTransferred +=
- partitionStore.offloadPartitionData(partitionId, basePath);
- if (oocEngine.getSuperstep() == BspService.INPUT_SUPERSTEP) {
- // Input superstep: the edge store is offloaded alongside the partition.
- DiskBackedEdgeStore edgeStore =
- (DiskBackedEdgeStore) oocEngine.getServerData().getEdgeStore();
- numBytesTransferred +=
- edgeStore.offloadPartitionData(partitionId, basePath);
- } else {
- // Compute supersteps: the current messages (if any store) go with it.
- MessageStore currentMessages =
- oocEngine.getServerData().getCurrentMessageStore();
- if (currentMessages != null) {
- numBytesTransferred += ((DiskBackedMessageStore) currentMessages)
- .offloadPartitionData(partitionId, basePath);
- }
- }
- oocEngine.getMetaPartitionManager().doneOffloadingPartition(partitionId);
- return true;
- }
-
- @Override
- public IOCommandType getType() {
- return IOCommandType.STORE_PARTITION;
- }
-
- @Override
- public String toString() {
- return "StorePartitionIOCommand: (partitionId = " + partitionId + ")";
- }
-}
http://git-wip-us.apache.org/repos/asf/giraph/blob/3793c9ef/giraph-core/src/main/java/org/apache/giraph/ooc/io/WaitIOCommand.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/ooc/io/WaitIOCommand.java b/giraph-core/src/main/java/org/apache/giraph/ooc/io/WaitIOCommand.java
deleted file mode 100644
index 74e72eb..0000000
--- a/giraph-core/src/main/java/org/apache/giraph/ooc/io/WaitIOCommand.java
+++ /dev/null
@@ -1,64 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.giraph.ooc.io;
-
-import org.apache.giraph.ooc.OutOfCoreEngine;
-
-import java.io.IOException;
-import java.util.concurrent.TimeUnit;
-
-/**
- * IOCommand that moves no data: it keeps the IO thread idle for a fixed
- * number of milliseconds.
- */
-public class WaitIOCommand extends IOCommand {
- /** How long should the disk be idle? (in milliseconds) */
- private final long waitDuration;
-
- /**
- * Constructor
- *
- * @param oocEngine out-of-core engine
- * @param waitDuration duration of wait, in milliseconds
- */
- public WaitIOCommand(OutOfCoreEngine oocEngine, long waitDuration) {
- // A wait involves no partition, hence the partition id of -1.
- super(oocEngine, -1);
- this.waitDuration = waitDuration;
- }
-
- /**
- * Sleeps for the configured duration.
- *
- * If the sleep is interrupted, the thread's interrupt status is restored so
- * callers further up the stack can still observe it. An
- * IllegalStateException carrying the InterruptedException as its cause is
- * then thrown.
- *
- * @param basePath unused
- * @return always true
- * @throws IOException never thrown; declared by the base class
- */
- @Override
- public boolean execute(String basePath) throws IOException {
- try {
- TimeUnit.MILLISECONDS.sleep(waitDuration);
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- throw new IllegalStateException("execute: caught InterruptedException " +
- "while IO thread is waiting!", e);
- }
- return true;
- }
-
- @Override
- public IOCommandType getType() {
- return IOCommandType.WAIT;
- }
-
- @Override
- public String toString() {
- return "WaitIOCommand: (duration = " + waitDuration + "ms)";
- }
-}
http://git-wip-us.apache.org/repos/asf/giraph/blob/3793c9ef/giraph-core/src/main/java/org/apache/giraph/ooc/io/package-info.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/ooc/io/package-info.java b/giraph-core/src/main/java/org/apache/giraph/ooc/io/package-info.java
deleted file mode 100644
index 2230ec4..0000000
--- a/giraph-core/src/main/java/org/apache/giraph/ooc/io/package-info.java
+++ /dev/null
@@ -1,21 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-/**
- * Package of classes related to IO operations in out-of-core mechanism
- */
-package org.apache.giraph.ooc.io;
http://git-wip-us.apache.org/repos/asf/giraph/blob/3793c9ef/giraph-core/src/main/java/org/apache/giraph/ooc/persistence/DataIndex.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/ooc/persistence/DataIndex.java b/giraph-core/src/main/java/org/apache/giraph/ooc/persistence/DataIndex.java
new file mode 100644
index 0000000..d44204b
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/ooc/persistence/DataIndex.java
@@ -0,0 +1,198 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.ooc.persistence;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Index chain used in out-of-core data accessor object (DAO) to access
+ * serialized data.
+ */
+public class DataIndex {
+ /** Chain of data indices */
+ private final List<DataIndexEntry> indexList = new ArrayList<>(5);
+
+ /**
+ * Add an index to the index chain
+ *
+ * @param entry the entry to add to the chain
+ * @return the index chain itself
+ */
+ public DataIndex addIndex(DataIndexEntry entry) {
+ indexList.add(entry);
+ return this;
+ }
+
+ /**
+ * Remove/Pop the last index in the index chain
+ *
+ * @return the index chain itself
+ */
+ public DataIndex removeLastIndex() {
+ indexList.remove(indexList.size() - 1);
+ return this;
+ }
+
+ /**
+ * Create a copy of the existing DataIndex
+ *
+ * @return a copy of the existing index chain
+ */
+ public DataIndex copy() {
+ DataIndex index = new DataIndex();
+ for (DataIndexEntry entry : indexList) {
+ index.indexList.add(entry);
+ }
+ return index;
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (!(obj instanceof DataIndex)) {
+ return false;
+ }
+ DataIndex dataIndex = (DataIndex) obj;
+ return indexList.equals(dataIndex.indexList);
+ }
+
+ @Override
+ public int hashCode() {
+ return indexList.hashCode();
+ }
+
+ @Override
+ public String toString() {
+ StringBuffer sb = new StringBuffer();
+ for (DataIndexEntry entry : indexList) {
+ sb.append(entry);
+ }
+ return sb.toString();
+ }
+
+ /** Interface to unify different types of entries used as index chain */
+ public interface DataIndexEntry { }
+
+ /**
+ * Different static types of index chain entry
+ */
+ public enum TypeIndexEntry implements DataIndexEntry {
+ /** The whole partition */
+ PARTITION("_partition"),
+ /** Partition vertices */
+ PARTITION_VERTICES("_vertices"),
+ /** Partition edges */
+ PARTITION_EDGES("_edges"),
+ /** Partition messages */
+ MESSAGE("_messages"),
+ /** Edges stored in edge store for a partition */
+ EDGE_STORE("_edge_store"),
+ /** Raw data buffer (refer to DiskBackedDataStore) */
+ BUFFER("_buffer");
+
+ /** String realization of entry type */
+ private final String name;
+
+ /**
+ * Constructor
+ *
+ * @param name name of the type
+ */
+ TypeIndexEntry(String name) {
+ this.name = name;
+ }
+
+ public String getName() {
+ return name;
+ }
+
+ @Override
+ public String toString() {
+ return name;
+ }
+ }
+
+ /**
+ * Class representing any index chain that depends on something with id.
+ * Generally this is used for identifying indices in two types:
+ * - Index entry based on superstep id ('S' and the superstep number)
+ * - Index entry based on partition id ('P' and the partition id)
+ */
+ public static final class NumericIndexEntry implements DataIndexEntry {
+ /** Type of index */
+ private final char type;
+ /** Id of the index associated with the specified type */
+ private final long id;
+
+ /**
+ * Constructor
+ *
+ * @param type type of index (for now 'S' for superstep, or 'P' for
+ * partition)
+ * @param id id of the index associated with the given type
+ */
+ private NumericIndexEntry(char type, long id) {
+ this.type = type;
+ this.id = id;
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (!(obj instanceof NumericIndexEntry)) {
+ return false;
+ }
+ NumericIndexEntry index = (NumericIndexEntry) obj;
+ return index.type == type && index.id == id;
+ }
+
+ @Override
+ public int hashCode() {
+ int result = 17;
+ result = result * 37 + type;
+ result = result * 37 + (int) id;
+ result = result * 37 + (int) (id >> 32);
+ return result;
+ }
+
+ @Override
+ public String toString() {
+ return String.format("_%c%d", type, id);
+ }
+
+ /**
+ * Create a data index entry for a given partition
+ *
+ * @param partitionId id of the partition
+ * @return data index entry for a given partition
+ */
+ public static NumericIndexEntry createPartitionEntry(int partitionId) {
+ return new NumericIndexEntry('P', partitionId);
+ }
+
+ /**
+ * Create a data index entry for a given superstep
+ *
+ * @param superstep the superstep number
+ * @return data index entry for a given superstep
+ */
+ public static NumericIndexEntry createSuperstepEntry(long superstep) {
+ return new NumericIndexEntry('S', superstep);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/giraph/blob/3793c9ef/giraph-core/src/main/java/org/apache/giraph/ooc/persistence/LocalDiskDataAccessor.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/ooc/persistence/LocalDiskDataAccessor.java b/giraph-core/src/main/java/org/apache/giraph/ooc/persistence/LocalDiskDataAccessor.java
new file mode 100644
index 0000000..2e42906
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/ooc/persistence/LocalDiskDataAccessor.java
@@ -0,0 +1,252 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.ooc.persistence;
+
+import com.esotericsoftware.kryo.io.Input;
+import com.esotericsoftware.kryo.io.KryoDataInput;
+import com.esotericsoftware.kryo.io.KryoDataOutput;
+import com.esotericsoftware.kryo.io.Output;
+import com.esotericsoftware.kryo.io.UnsafeInput;
+import com.esotericsoftware.kryo.io.UnsafeOutput;
+import org.apache.giraph.conf.GiraphConstants;
+import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
+import org.apache.giraph.conf.IntConfOption;
+import org.apache.log4j.Logger;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.RandomAccessFile;
+
+import static com.google.common.base.Preconditions.checkState;
+import static org.apache.giraph.conf.GiraphConstants.ONE_MB;
+
+/**
+ * Data accessor object to read/write data in local disk.
+ * Note: This class assumes that the data are partitioned across IO threads,
+ * i.e. each part of data can be accessed by one and only one IO thread
+ * throughout the execution. Also, each IO thread reads a particular
+ * type of data completely and, only then, it can read other type of data;
+ * i.e. an IO thread cannot be used to read two different files at the
+ * same time. These assumptions are based on the assumptions that the
+ * current out-of-core mechanism is designed for.
+ */
+public class LocalDiskDataAccessor implements OutOfCoreDataAccessor {
+  /**
+   * Size of the buffer used for (de)serializing data when reading/writing
+   * from/to disk
+   */
+  public static final IntConfOption OOC_DISK_BUFFER_SIZE =
+      new IntConfOption("graph.oocDiskBufferSize", 4 * ONE_MB,
+          "size of the buffer when (de)serializing data for reading/writing " +
+          "from/to disk");
+
+  /** Class logger */
+  private static final Logger LOG =
+      Logger.getLogger(LocalDiskDataAccessor.class);
+  /**
+   * In-memory buffers (one per IO thread) used for (de)serializing data when
+   * reading/writing from/to disk using Kryo
+   */
+  private final byte[][] perThreadBuffers;
+  /** Path prefix for different disks */
+  private final String[] basePaths;
+  /** How many disks (i.e. IO threads) do we have? */
+  private final int numDisks;
+
+  /**
+   * Constructor. Every directory in 'giraph.partitionsDirectory' is created
+   * if missing and is served by exactly one IO thread.
+   *
+   * @param conf Configuration
+   */
+  public LocalDiskDataAccessor(
+      ImmutableClassesGiraphConfiguration<?, ?, ?> conf) {
+    // Take advantage of multiple disks
+    String[] userPaths = GiraphConstants.PARTITIONS_DIRECTORY.getArray(conf);
+    this.numDisks = userPaths.length;
+    if (!GiraphConstants.NUM_OUT_OF_CORE_THREADS.isDefaultValue(conf) ||
+        GiraphConstants.NUM_OUT_OF_CORE_THREADS.get(conf) != numDisks) {
+      LOG.warn("LocalDiskDataAccessor: with this data accessor, number of " +
+          "out-of-core threads is only specified by the number of " +
+          "directories given by 'giraph.partitionsDirectory' flag! Now using " +
+          numDisks + " IO threads!");
+    }
+    this.basePaths = new String[numDisks];
+    String jobId = conf.getJobId();
+    for (int i = 0; i < numDisks; ++i) {
+      File file = new File(userPaths[i]);
+      if (!file.exists()) {
+        checkState(file.mkdirs(), "LocalDiskDataAccessor: cannot create " +
+            "directory " + file.getAbsolutePath());
+      }
+      basePaths[i] = userPaths[i] + "/" + jobId;
+    }
+    final int diskBufferSize = OOC_DISK_BUFFER_SIZE.get(conf);
+    this.perThreadBuffers = new byte[numDisks][diskBufferSize];
+  }
+
+  @Override
+  public void initialize() { }
+
+  @Override
+  public void shutdown() {
+    for (String path : basePaths) {
+      File file = new File(path).getParentFile();
+      String[] subFileNames = file.list();
+      if (subFileNames == null) {
+        // Already deleted (e.g. same directory given twice) or unreadable
+        continue;
+      }
+      for (String subFileName : subFileNames) {
+        File subFile = new File(file.getPath(), subFileName);
+        checkState(subFile.delete(), "shutdown: cannot delete file %s",
+            subFile.getAbsoluteFile());
+      }
+      checkState(file.delete(), "shutdown: cannot delete directory %s",
+          file.getAbsoluteFile());
+    }
+  }
+
+  @Override
+  public int getNumAccessorThreads() {
+    return numDisks;
+  }
+
+  @Override
+  public DataInputWrapper prepareInput(int threadId, DataIndex index)
+      throws IOException {
+    return new LocalDiskDataInputWrapper(basePaths[threadId] + index.toString(),
+        perThreadBuffers[threadId]);
+  }
+
+  @Override
+  public DataOutputWrapper prepareOutput(
+      int threadId, DataIndex index, boolean shouldAppend) throws IOException {
+    return new LocalDiskDataOutputWrapper(
+        basePaths[threadId] + index.toString(), shouldAppend,
+        perThreadBuffers[threadId]);
+  }
+
+  @Override
+  public boolean dataExist(int threadId, DataIndex index) {
+    return new File(basePaths[threadId] + index.toString()).exists();
+  }
+
+  /** Implementation of <code>DataInput</code> wrapper for local disk reader */
+  private static class LocalDiskDataInputWrapper implements DataInputWrapper {
+    /** File used to read the data from */
+    private final File file;
+    /** Kryo's handle to read the data */
+    private final Input input;
+
+    /**
+     * Constructor
+     *
+     * @param fileName file name
+     * @param buffer reusable byte buffer that will be used in Kryo's Input
+     *               reader
+     * @throws IOException if the file cannot be opened for reading
+     */
+    @edu.umd.cs.findbugs.annotations.SuppressWarnings(
+        "OBL_UNSATISFIED_OBLIGATION")
+    LocalDiskDataInputWrapper(String fileName, byte[] buffer)
+        throws IOException {
+      file = new File(fileName);
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("LocalDiskDataInputWrapper: obtaining a data input from " +
+            "local file " + file.getAbsolutePath());
+      }
+      input = new UnsafeInput(buffer);
+      input.setInputStream(new FileInputStream(file));
+    }
+
+    @Override
+    public DataInput getDataInput() {
+      return new KryoDataInput(input);
+    }
+
+    @Override
+    public long finalizeInput(boolean deleteOnClose) {
+      input.close();
+      long count = input.total();
+      checkState(!deleteOnClose || file.delete(),
+          "finalizeInput: failed to delete %s.", file.getAbsoluteFile());
+      return count;
+    }
+  }
+
+  /** Implementation of <code>DataOutput</code> wrapper for local disk writer */
+  private static class LocalDiskDataOutputWrapper implements DataOutputWrapper {
+    /** File used to write the data to */
+    private final File file;
+    /** Kryo's handle to write the data */
+    private final Output output;
+
+    /**
+     * Constructor
+     *
+     * @param fileName file name
+     * @param shouldAppend whether the <code>DataOutput</code> should be used
+     *                     for appending to already existing files
+     * @param buffer reusable byte buffer that will be used in Kryo's Output
+     *               writer
+     * @throws IOException if the file cannot be opened for writing
+     */
+    @edu.umd.cs.findbugs.annotations.SuppressWarnings(
+        "OBL_UNSATISFIED_OBLIGATION")
+    LocalDiskDataOutputWrapper(String fileName, boolean shouldAppend,
+                               byte[] buffer) throws IOException {
+      file = new File(fileName);
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("LocalDiskDataOutputWrapper: obtaining a data output from " +
+            "local file " + file.getAbsolutePath());
+        if (!shouldAppend) {
+          checkState(!file.exists(), "LocalDiskDataOutputWrapper: file %s " +
+              "already exist", file.getAbsoluteFile());
+          checkState(file.createNewFile(), "LocalDiskDataOutputWrapper: " +
+              "cannot create file %s", file.getAbsolutePath());
+        }
+      }
+      output = new UnsafeOutput(buffer);
+      RandomAccessFile raf = new RandomAccessFile(file, "rw");
+      if (shouldAppend) {
+        raf.seek(file.length());
+      } else {
+        // "rw" does not truncate: drop stale bytes of an older file, if any
+        raf.setLength(0);
+      }
+      output.setOutputStream(new FileOutputStream(raf.getFD()));
+    }
+
+    @Override
+    public DataOutput getDataOutput() {
+      return new KryoDataOutput(output);
+    }
+
+    @Override
+    public long finalizeOutput() {
+      output.close();
+      return output.total();
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/giraph/blob/3793c9ef/giraph-core/src/main/java/org/apache/giraph/ooc/persistence/OutOfCoreDataAccessor.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/ooc/persistence/OutOfCoreDataAccessor.java b/giraph-core/src/main/java/org/apache/giraph/ooc/persistence/OutOfCoreDataAccessor.java
new file mode 100644
index 0000000..d4ddc62
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/ooc/persistence/OutOfCoreDataAccessor.java
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.ooc.persistence;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+/**
+ * Interface representing data accessor object (DAO) used as persistence layer
+ * in out-of-core mechanism.
+ * Note: any class implementing this interface should have one and only one
+ * constructor taking one and only one argument of type
+ * <code>ImmutableClassesGiraphConfiguration</code>
+ */
+public interface OutOfCoreDataAccessor {
+  /** Initialize the DAO */
+  void initialize();
+
+  /** Shut down the DAO */
+  void shutdown();
+
+  /**
+   * @return the number of threads involved in data persistence
+   */
+  int getNumAccessorThreads();
+
+  /**
+   * Prepare a wrapper containing <code>DataInput</code> representation for a
+   * given thread involved in persistence for a given index chain for data.
+   *
+   * @param threadId id of the thread involved in persistence
+   * @param index index chain used to access the serialized data
+   * @return the wrapper for <code>DataInput</code> representation of data
+   * @throws IOException if an I/O error occurs while preparing the input
+   */
+  DataInputWrapper prepareInput(int threadId, DataIndex index)
+      throws IOException;
+
+  /**
+   * Prepare a wrapper containing <code>DataOutput</code> representation for a
+   * given thread involved in persistence for a given index chain for data.
+   *
+   * @param threadId id of the thread involved in persistence
+   * @param index index chain used to access the serialized data
+   * @param shouldAppend whether the <code>DataOutput</code> should be used for
+   *                     appending to already existing data for the given index
+   *                     or the <code>DataOutput</code> should create a new
+   *                     instance to store serialized data
+   * @return the wrapper for <code>DataOutput</code> representation of data
+   * @throws IOException if an I/O error occurs while preparing the output
+   */
+  DataOutputWrapper prepareOutput(int threadId, DataIndex index,
+                                  boolean shouldAppend) throws IOException;
+
+  /**
+   * Whether the data for the given thread and index chain exists.
+   *
+   * @param threadId id of the thread involved in persistence
+   * @param index index chain used to access the data
+   * @return True if the data exists for the given index chain for the given
+   *         thread, False otherwise
+   */
+  boolean dataExist(int threadId, DataIndex index);
+
+  /** Interface to wrap <code>DataInput</code> */
+  interface DataInputWrapper {
+    /**
+     * @return the <code>DataInput</code>
+     */
+    DataInput getDataInput();
+
+    /**
+     * Finalize and close the <code>DataInput</code> used for persistence.
+     *
+     * @param deleteOnClose whether the source of <code>DataInput</code>
+     *                      should be deleted on closing/finalizing
+     * @return number of bytes read from <code>DataInput</code> since it was
+     *         opened
+     */
+    long finalizeInput(boolean deleteOnClose);
+  }
+
+  /** Interface to wrap <code>DataOutput</code> */
+  interface DataOutputWrapper {
+    /**
+     * @return the <code>DataOutput</code>
+     */
+    DataOutput getDataOutput();
+
+    /**
+     * Finalize and close the <code>DataOutput</code> used for persistence.
+     *
+     * @return number of bytes written to <code>DataOutput</code> since it was
+     *         opened
+     */
+    long finalizeOutput();
+  }
+}
http://git-wip-us.apache.org/repos/asf/giraph/blob/3793c9ef/giraph-core/src/main/java/org/apache/giraph/ooc/persistence/package-info.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/ooc/persistence/package-info.java b/giraph-core/src/main/java/org/apache/giraph/ooc/persistence/package-info.java
new file mode 100644
index 0000000..adf8dba
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/ooc/persistence/package-info.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/**
+ * Package of classes related to IO abstraction or persistence layer used for
+ * out-of-core mechanism
+ */
+package org.apache.giraph.ooc.persistence;
http://git-wip-us.apache.org/repos/asf/giraph/blob/3793c9ef/giraph-core/src/main/java/org/apache/giraph/ooc/policy/FixedPartitionsOracle.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/ooc/policy/FixedPartitionsOracle.java b/giraph-core/src/main/java/org/apache/giraph/ooc/policy/FixedPartitionsOracle.java
new file mode 100644
index 0000000..ffc5f7f
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/ooc/policy/FixedPartitionsOracle.java
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.ooc.policy;
+
+import com.sun.management.GarbageCollectionNotificationInfo;
+import org.apache.giraph.conf.GiraphConstants;
+import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
+import org.apache.giraph.ooc.OutOfCoreEngine;
+import org.apache.giraph.ooc.command.IOCommand;
+import org.apache.giraph.ooc.command.LoadPartitionIOCommand;
+import org.apache.giraph.ooc.command.StorePartitionIOCommand;
+import org.apache.log4j.Logger;
+
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * Out-of-core oracle that aims to keep a fixed number of partitions in
+ * memory, as given by <code>MAX_PARTITIONS_IN_MEMORY</code>.
+ */
+public class FixedPartitionsOracle implements OutOfCoreOracle {
+  /** Class logger */
+  private static final Logger LOG =
+      Logger.getLogger(FixedPartitionsOracle.class);
+  /** Upper bound on the number of partitions to keep in memory */
+  private final int maxPartitionsInMemory;
+  /**
+   * Net change in the number of in-memory partitions that is approved but
+   * not completed yet: +1 for each outstanding partition load and -1 for
+   * each outstanding partition store.
+   */
+  private final AtomicInteger deltaNumPartitionsInMemory =
+      new AtomicInteger(0);
+  /** Engine queried for the current number of in-memory partitions */
+  private final OutOfCoreEngine oocEngine;
+
+  /**
+   * Constructor
+   *
+   * @param conf configuration providing the in-memory partition limit
+   * @param oocEngine out-of-core engine this oracle works for
+   */
+  public FixedPartitionsOracle(ImmutableClassesGiraphConfiguration conf,
+                               OutOfCoreEngine oocEngine) {
+    this.oocEngine = oocEngine;
+    this.maxPartitionsInMemory =
+        GiraphConstants.MAX_PARTITIONS_IN_MEMORY.get(conf);
+  }
+
+  @Override
+  public IOAction[] getNextIOActions() {
+    final int inMemory =
+        oocEngine.getMetaPartitionManager().getNumInMemoryPartitions();
+    if (LOG.isInfoEnabled()) {
+      LOG.info("getNextIOActions: calling with " + inMemory +
+          " partitions in memory, " + deltaNumPartitionsInMemory.get() +
+          " to be loaded");
+    }
+    // Partitions expected in memory once the outstanding IO completes
+    final int projected = inMemory + deltaNumPartitionsInMemory.get();
+
+    if (projected > maxPartitionsInMemory) {
+      // Over the limit: "hard store", i.e. store a partition to disk
+      // regardless of whether it has been processed.
+      LOG.warn("getNextIOActions: number of partitions in memory passed the " +
+          "specified threshold!");
+      return new IOAction[]{
+        IOAction.STORE_PARTITION,
+        IOAction.STORE_MESSAGES_AND_BUFFERS};
+    }
+    if (projected == maxPartitionsInMemory) {
+      // At the limit: only swap, i.e. load an unprocessed partition from disk
+      // if a processed partition in memory can make room for it.
+      return new IOAction[]{
+        IOAction.STORE_MESSAGES_AND_BUFFERS,
+        IOAction.LOAD_TO_SWAP_PARTITION};
+    }
+    // Below the limit: load (prefetch) a partition, either for the current
+    // superstep or for the next one.
+    return new IOAction[]{
+      IOAction.LOAD_PARTITION,
+      IOAction.STORE_MESSAGES_AND_BUFFERS};
+  }
+
+  @Override
+  public boolean approve(IOCommand command) {
+    final int inMemory = oocEngine.getMetaPartitionManager()
+        .getNumInMemoryPartitions();
+    if (command instanceof LoadPartitionIOCommand) {
+      // Reserve the load first; undo the reservation and deny the command if
+      // memory was already above the limit before this load.
+      if (inMemory + deltaNumPartitionsInMemory.getAndIncrement() >
+          maxPartitionsInMemory) {
+        deltaNumPartitionsInMemory.getAndDecrement();
+        return false;
+      }
+    }
+    if (command instanceof StorePartitionIOCommand) {
+      // Reserve the store first; undo the reservation and deny the command if
+      // memory was already below the limit before this store.
+      if (inMemory + deltaNumPartitionsInMemory.getAndDecrement() <
+          maxPartitionsInMemory) {
+        deltaNumPartitionsInMemory.getAndIncrement();
+        return false;
+      }
+    }
+    return true;
+  }
+
+  @Override
+  public void commandCompleted(IOCommand command) {
+    // Release the reservation taken when the command was approved
+    if (command instanceof LoadPartitionIOCommand) {
+      deltaNumPartitionsInMemory.decrementAndGet();
+    } else if (command instanceof StorePartitionIOCommand) {
+      deltaNumPartitionsInMemory.incrementAndGet();
+    }
+  }
+
+  @Override
+  public void gcCompleted(GarbageCollectionNotificationInfo gcInfo) { }
+
+  @Override
+  public void shutdown() { }
+}
http://git-wip-us.apache.org/repos/asf/giraph/blob/3793c9ef/giraph-core/src/main/java/org/apache/giraph/ooc/policy/OutOfCoreOracle.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/ooc/policy/OutOfCoreOracle.java b/giraph-core/src/main/java/org/apache/giraph/ooc/policy/OutOfCoreOracle.java
new file mode 100644
index 0000000..45b9914
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/ooc/policy/OutOfCoreOracle.java
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.ooc.policy;
+
+import com.sun.management.GarbageCollectionNotificationInfo;
+import org.apache.giraph.ooc.command.IOCommand;
+
+/**
+ * Interface for any out-of-core oracle. An out-of-core oracle is the brain of
+ * the out-of-core mechanism, determining/deciding on out-of-core actions (load
+ * or store) that should happen.
+ * Note: any class implementing this interface should have one and only one
+ * constructor taking only two arguments of types
+ * <code>ImmutableClassesGiraphConfiguration</code> and
+ * <code>OutOfCoreEngine</code>
+ */
+public interface OutOfCoreOracle {
+  /**
+   * Different types of IO actions that can potentially lead to a more desired
+   * state of computation for out-of-core mechanism. These actions are issued
+   * based on the status of the memory (memory pressure, rate of data transfer
+   * to memory, etc.)
+   */
+  enum IOAction {
+    /**
+     * Either of:
+     * - storing incoming messages of any partition currently on disk, or
+     * - storing incoming messages' raw data buffer of any partition
+     *   currently on disk, or
+     * - storing partitions' raw data buffer for those partitions that are
+     *   currently on disk.
+     */
+    STORE_MESSAGES_AND_BUFFERS,
+    /**
+     * Storing a partition that is *processed* in the current iteration cycle.
+     * This action is also known as "soft store".
+     */
+    STORE_PROCESSED_PARTITION,
+    /**
+     * Storing a partition from memory on disk, prioritizing *processed*
+     * partitions in memory. However, if there is no *processed* partition,
+     * store should happen at any cost, even if an *unprocessed* partition has
+     * to be stored. This action is also known as "hard store".
+     */
+    STORE_PARTITION,
+    /**
+     * Loading an *unprocessed* partition from disk to memory, only if there are
+     * *processed* partitions in memory. This action basically initiates a swap
+     * operation.
+     */
+    LOAD_TO_SWAP_PARTITION,
+    /**
+     * Loading an *unprocessed* partition from disk to memory. This action is
+     * also known as "soft load".
+     */
+    LOAD_UNPROCESSED_PARTITION,
+    /**
+     * Loading a partition (prioritizing *unprocessed* over *processed*) from
+     * disk to memory. Loading a *processed* partition to memory is a prefetch
+     * of that partition to be processed in the next superstep. This action is
+     * also known as "hard load".
+     */
+    LOAD_PARTITION,
+    /**
+     * Loading a partition regardless of the memory situation. An out-of-core
+     * mechanism may use this action to signal IO threads that it is allowed to
+     * load a partition that is specifically requested.
+     */
+    URGENT_LOAD_PARTITION
+  }
+
+  /**
+   * Get the next set of viable IO actions to help bring memory to a more
+   * desired state.
+   *
+   * @return an array of viable IO actions, sorted from highest priority to
+   *         lowest priority
+   */
+  IOAction[] getNextIOActions();
+
+  /**
+   * Whether a command is appropriate to bring the memory to a more desired
+   * state. A command is not executed unless it is approved by the oracle. This
+   * method is especially important when there are multiple IO threads
+   * performing IO operations for the out-of-core mechanism. The approval
+   * step is what prevents all IO threads from performing the same command
+   * type when that is not necessary. For instance, execution of a particular
+   * command type by only one thread may be enough to bring the memory to a
+   * desired state, while the rest of the IO threads perform other types of
+   * commands.
+   *
+   * @param command the IO command that is about to execute
+   * @return 'true' if the command is approved for execution. 'false' if the
+   *         command should not be executed
+   */
+  boolean approve(IOCommand command);
+
+  /**
+   * Notification of command completion. Oracle may update its status and commit
+   * the changes a command may cause.
+   *
+   * @param command the IO command that is completed
+   */
+  void commandCompleted(IOCommand command);
+
+  /**
+   * Notification of GC completion. Oracle may take certain decisions based on
+   * GC information (such as amount of time it took, memory it reclaimed, etc.)
+   *
+   * @param gcInfo GC information
+   */
+  void gcCompleted(GarbageCollectionNotificationInfo gcInfo);
+
+  /**
+   * Shut down the out-of-core oracle. Necessary specifically for cases where
+   * the out-of-core oracle uses additional monitoring threads.
+   */
+  void shutdown();
+}
http://git-wip-us.apache.org/repos/asf/giraph/blob/3793c9ef/giraph-core/src/main/java/org/apache/giraph/ooc/policy/SimpleGCMonitoringOracle.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/ooc/policy/SimpleGCMonitoringOracle.java b/giraph-core/src/main/java/org/apache/giraph/ooc/policy/SimpleGCMonitoringOracle.java
new file mode 100644
index 0000000..477b3ec
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/ooc/policy/SimpleGCMonitoringOracle.java
@@ -0,0 +1,357 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.ooc.policy;
+
+import com.google.common.collect.Maps;
+import com.sun.management.GarbageCollectionNotificationInfo;
+import org.apache.giraph.conf.FloatConfOption;
+import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
+import org.apache.giraph.ooc.OutOfCoreEngine;
+import org.apache.giraph.ooc.OutOfCoreIOStatistics;
+import org.apache.giraph.ooc.command.IOCommand;
+import org.apache.giraph.ooc.command.LoadPartitionIOCommand;
+import org.apache.giraph.ooc.command.WaitIOCommand;
+import org.apache.log4j.Logger;
+
+import java.lang.management.MemoryUsage;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+
+/**
+ * Out-of-core oracle to adaptively control data kept in memory, with the goal
+ * of keeping the memory state constantly at a desired state. This oracle
+ * monitors GC behavior to keep track of memory pressure.
+ *
+ * After each GC is done, this oracle retrieves statistics about the memory
+ * pressure (memory used, max memory, and how far away memory is compared to a
+ * max optimal pressure). Based on the two most recent memory statistics, the
+ * oracle predicts the status of the memory, and sets the rate of load/store
+ * of data from/to disk. If the rate of loading data from disk is 'l', and the
+ * rate of storing data to disk is 's', the rate of data injection to memory
+ * from disk can be denoted as 'l-s'. This oracle determines what 'l-s' should
+ * be based on the prediction of memory status.
+ *
+ * Assume that based on the previous GC call the memory usage at time t_0 is
+ * m_0, and based on the most recent GC call the memory usage at time t_1 is
+ * m_1. So, the rate of memory increase is alpha = (m_1 - m_0) / (t_1 - t_0).
+ * Assume that the ideal memory pressure happens when the memory usage is
+ * m_ideal. So, at time 't_2 = t_1 + (t_1 - t_0)', we want m_ideal. That means
+ * the ideal rate would be beta = (m_ideal - m_1) / (t_2 - t_1). If the data
+ * injection rate to memory so far was i, the new injection rate should be:
+ * i_new = i - (alpha - beta)
+ */
+public class SimpleGCMonitoringOracle implements OutOfCoreOracle {
+  /**
+   * The optimal memory pressure at which GC behavior is close to ideal. This
+   * fraction may be dependent on the GC strategy used for running a job, but
+   * generally should not be dependent on the graph processing application.
+   */
+  public static final FloatConfOption OPTIMAL_MEMORY_PRESSURE =
+      new FloatConfOption("giraph.optimalMemoryPressure", 0.8f,
+          "The memory pressure (fraction of used memory) at which the job " +
+          "shows the optimal GC behavior. This fraction may be dependent " +
+          "on the GC strategy used in running the job.");
+
+  /** Class logger */
+  private static final Logger LOG =
+      Logger.getLogger(SimpleGCMonitoringOracle.class);
+  /** Cached value for OPTIMAL_MEMORY_PRESSURE */
+  private final float optimalMemoryPressure;
+  /** Out-of-core engine */
+  private final OutOfCoreEngine oocEngine;
+  /**
+   * Status of memory from the last GC call. Only accessed from the
+   * constructor and from the synchronized gcCompleted method.
+   */
+  private GCObservation lastGCObservation;
+  /**
+   * Desired rate of data injection to memory ('l-s' in the class comment).
+   * Written only in gcCompleted, where it is clamped to
+   * [-diskBandwidth, diskBandwidth].
+   */
+  private final AtomicLong desiredDiskToMemoryDataRate =
+      new AtomicLong(0);
+  /** Number of on the fly (outstanding) IO commands for each command type */
+  private final Map<IOCommand.IOCommandType, AtomicInteger> commandOccurrences =
+      Maps.newConcurrentMap();
+
+  /**
+   * Constructor
+   *
+   * @param conf configuration
+   * @param oocEngine out-of-core engine
+   */
+  public SimpleGCMonitoringOracle(ImmutableClassesGiraphConfiguration conf,
+                                  OutOfCoreEngine oocEngine) {
+    this.optimalMemoryPressure = OPTIMAL_MEMORY_PRESSURE.get(conf);
+    this.oocEngine = oocEngine;
+    // A negative timestamp marks the observation as invalid (no GC seen yet).
+    this.lastGCObservation = new GCObservation(-1, 0, 0);
+    // Pre-populate counters so lookups by command type never return null.
+    for (IOCommand.IOCommandType type : IOCommand.IOCommandType.values()) {
+      commandOccurrences.put(type, new AtomicInteger(0));
+    }
+  }
+
+  /**
+   * Record the memory state after a GC and, if a previous observation exists,
+   * adjust the desired data injection rate toward the optimal memory pressure.
+   *
+   * @param gcInfo GC information
+   */
+  @Override
+  public synchronized void gcCompleted(GarbageCollectionNotificationInfo
+                                           gcInfo) {
+    long time = System.currentTimeMillis();
+    Map<String, MemoryUsage> memAfter = gcInfo.getGcInfo()
+        .getMemoryUsageAfterGc();
+    long usedMemory = 0;
+    long maxMemory = 0;
+    for (MemoryUsage memDetail : memAfter.values()) {
+      usedMemory += memDetail.getUsed();
+      // MemoryUsage.getMax() returns -1 for a pool whose maximum is
+      // undefined; such a value must not be added to the total.
+      long poolMax = memDetail.getMax();
+      if (poolMax > 0) {
+        maxMemory += poolMax;
+      }
+    }
+    GCObservation observation = new GCObservation(time, usedMemory, maxMemory);
+    if (LOG.isInfoEnabled()) {
+      LOG.info("gcCompleted: GC completed with: " + observation);
+    }
+    // Whether this is not the first GC call in the application
+    if (lastGCObservation.isValid()) {
+      long deltaDataRate =
+          lastGCObservation.getDesiredDeltaDataRate(observation);
+      long diskBandwidthEstimate =
+          oocEngine.getIOStatistics().getDiskBandwidth();
+      // Update the desired data injection rate to memory. The data injection
+      // rate cannot be less than -disk_bandwidth (the extreme case happens if
+      // we only do 'store'), and cannot be more than disk_bandwidth (the
+      // extreme case happens if we only do 'load').
+      long oldRate = desiredDiskToMemoryDataRate.get();
+      long newRate = Math.max(
+          Math.min(oldRate - deltaDataRate, diskBandwidthEstimate),
+          -diskBandwidthEstimate);
+      desiredDiskToMemoryDataRate.set(newRate);
+      if (LOG.isInfoEnabled()) {
+        LOG.info("gcCompleted: changing data injection rate from " +
+            String.format("%.2f", oldRate / 1024.0 / 1024.0) +
+            " to " + String.format("%.2f", newRate / 1024.0 / 1024.0));
+      }
+    }
+    lastGCObservation = observation;
+  }
+
+  /**
+   * Get the current data injection rate to memory based on the commands ran
+   * in the history (retrieved from statistics collector), and outstanding
+   * commands issued by the IO scheduler.
+   *
+   * NOTE(review): this divides bytes by the raw duration reported by
+   * OutOfCoreIOStatistics, while getDesiredDeltaDataRate produces a
+   * per-second rate; confirm both use the same time unit.
+   *
+   * @return the current data injection rate to memory
+   */
+  private long getCurrentDataInjectionRate() {
+    long effectiveBytesTransferred = 0;
+    long effectiveDuration = 0;
+    for (IOCommand.IOCommandType type : IOCommand.IOCommandType.values()) {
+      OutOfCoreIOStatistics.BytesDuration stats =
+          oocEngine.getIOStatistics().getCommandTypeStats(type);
+      int occurrence = commandOccurrences.get(type).get();
+      long typeBytesTransferred = stats.getBytes();
+      long typeDuration = stats.getDuration();
+      // If there is an outstanding command, we still do not know how many bytes
+      // it will transfer, and how long it will take. So, we estimate these
+      // numbers based on similar commands in the history: the average bytes
+      // and the average duration for this command type, multiplied by the
+      // number of outstanding commands of this type.
+      if (stats.getOccurrence() != 0) {
+        typeBytesTransferred += stats.getBytes() / stats.getOccurrence() *
+            occurrence;
+        typeDuration += stats.getDuration() / stats.getOccurrence() *
+            occurrence;
+      }
+      if (type == IOCommand.IOCommandType.LOAD_PARTITION) {
+        effectiveBytesTransferred += typeBytesTransferred;
+      } else {
+        // Store (data going out of memory), or wait (no data transferred)
+        effectiveBytesTransferred -= typeBytesTransferred;
+      }
+      effectiveDuration += typeDuration;
+    }
+    if (effectiveDuration == 0) {
+      return 0;
+    } else {
+      return effectiveBytesTransferred / effectiveDuration;
+    }
+  }
+
+  @Override
+  public IOAction[] getNextIOActions() {
+    // Tolerance band: 5% of the estimated disk bandwidth.
+    long error = (long) (oocEngine.getIOStatistics().getDiskBandwidth() * 0.05);
+    long desiredRate = desiredDiskToMemoryDataRate.get();
+    long currentRate = getCurrentDataInjectionRate();
+    if (desiredRate > error) {
+      // 'l-s' is positive, we should do more load than store.
+      if (currentRate > desiredRate + error) {
+        // We should decrease 'l-s'. This can be done either by increasing 's'
+        // or issuing wait command. We prioritize wait over hard store.
+        return new IOAction[]{
+          IOAction.STORE_MESSAGES_AND_BUFFERS,
+          IOAction.STORE_PROCESSED_PARTITION};
+      } else if (currentRate < desiredRate - error) {
+        // We should increase 'l-s'. We can simply load partitions/data.
+        return new IOAction[]{IOAction.LOAD_PARTITION};
+      } else {
+        // We are in a proper state and we should keep up with the rate. We can
+        // either soft store data or load data (hard load, since the desired
+        // rate is positive).
+        return new IOAction[]{
+          IOAction.STORE_MESSAGES_AND_BUFFERS,
+          IOAction.STORE_PROCESSED_PARTITION,
+          IOAction.LOAD_PARTITION};
+      }
+    } else if (desiredRate < -error) {
+      // 'l-s' is negative, we should do more store than load.
+      if (currentRate < desiredRate - error) {
+        // We should increase 'l-s', but we should be cautious. We only do soft
+        // load, or wait.
+        return new IOAction[]{IOAction.LOAD_UNPROCESSED_PARTITION};
+      } else if (currentRate > desiredRate + error) {
+        // We should reduce 'l-s', we do hard store.
+        return new IOAction[]{
+          IOAction.STORE_MESSAGES_AND_BUFFERS,
+          IOAction.STORE_PARTITION};
+      } else {
+        // We should keep up with the rate. We can either soft store data, or
+        // soft load data.
+        return new IOAction[]{
+          IOAction.STORE_MESSAGES_AND_BUFFERS,
+          IOAction.STORE_PROCESSED_PARTITION,
+          IOAction.LOAD_UNPROCESSED_PARTITION};
+      }
+    } else {
+      // 'l-s' is almost zero. If current rate is over the desired rate, we do
+      // soft store. If the current rate is below the desired rate, we do soft
+      // load.
+      if (currentRate > desiredRate + error) {
+        return new IOAction[]{
+          IOAction.STORE_MESSAGES_AND_BUFFERS,
+          IOAction.STORE_PROCESSED_PARTITION};
+      } else if (currentRate < desiredRate - error) {
+        return new IOAction[]{IOAction.LOAD_UNPROCESSED_PARTITION};
+      } else {
+        return new IOAction[]{
+          IOAction.STORE_MESSAGES_AND_BUFFERS,
+          IOAction.STORE_PROCESSED_PARTITION,
+          IOAction.LOAD_UNPROCESSED_PARTITION};
+      }
+    }
+  }
+
+  @Override
+  public synchronized boolean approve(IOCommand command) {
+    long error = (long) (oocEngine.getIOStatistics().getDiskBandwidth() * 0.05);
+    long desiredRate = desiredDiskToMemoryDataRate.get();
+    long currentRate = getCurrentDataInjectionRate();
+    // The command is denied iff the current rate is above the desired rate and
+    // we are doing load (instead of store), or the current rate is below the
+    // desired rate and we are doing store (instead of loading).
+    if (currentRate > desiredRate + error &&
+        command instanceof LoadPartitionIOCommand) {
+      return false;
+    }
+    if (currentRate < desiredRate - error &&
+        !(command instanceof LoadPartitionIOCommand) &&
+        !(command instanceof WaitIOCommand)) {
+      return false;
+    }
+    // Approved commands count as outstanding until commandCompleted is called.
+    commandOccurrences.get(command.getType()).getAndIncrement();
+    return true;
+  }
+
+  @Override
+  public void commandCompleted(IOCommand command) {
+    commandOccurrences.get(command.getType()).getAndDecrement();
+  }
+
+  @Override
+  public void shutdown() { }
+
+  /** Helper class to record memory status after GC calls */
+  private class GCObservation {
+    /** The time at which the GC happened (in milliseconds) */
+    private final long time;
+    /** Amount of memory used after the GC call */
+    private final long usedMemory;
+    /** Maximum amounts of memory reported by GC listener */
+    private final long maxMemory;
+
+    /**
+     * Constructor
+     *
+     * @param time time of GC
+     * @param usedMemory amount of used memory after GC
+     * @param maxMemory amount of all available memory based on GC observation
+     */
+    public GCObservation(long time, long usedMemory, long maxMemory) {
+      this.time = time;
+      this.usedMemory = usedMemory;
+      this.maxMemory = maxMemory;
+    }
+
+    /**
+     * Is this a valid observation?
+     *
+     * @return true iff it is a valid observation
+     */
+    public boolean isValid() {
+      return time > 0;
+    }
+
+    /**
+     * Considering a new observation of memory status after the most recent GC,
+     * what is the desired change in the rate of data injection to memory
+     * (alpha - beta in the class comment, per second).
+     *
+     * @param newObservation the most recent GC observation
+     * @return desired change of data injection rate to memory; 0 if the
+     *         observations carry no usable maximum memory
+     */
+    public long getDesiredDeltaDataRate(GCObservation newObservation) {
+      long newUsedMemory = newObservation.usedMemory;
+      long newMaxMemory = newObservation.maxMemory;
+      long lastUsedMemory = usedMemory;
+      long lastMaxMemory = maxMemory;
+      // Scale the memory status of the two GC observations to be the same
+      long scaledMaxMemory = Math.min(lastMaxMemory, newMaxMemory);
+      if (scaledMaxMemory <= 0) {
+        // Without a known maximum, usage cannot be scaled or compared against
+        // the optimal pressure; leave the desired rate unchanged.
+        LOG.warn("getDesiredDeltaDataRate: maximum memory is unknown; not " +
+            "adjusting the data injection rate");
+        return 0;
+      }
+      newUsedMemory =
+          (long) (((double) scaledMaxMemory / newMaxMemory) * newUsedMemory);
+      lastUsedMemory =
+          (long) (((double) scaledMaxMemory / lastMaxMemory) * lastUsedMemory);
+      long desiredUsedMemory = (long) (optimalMemoryPressure * scaledMaxMemory);
+      if (LOG.isInfoEnabled()) {
+        LOG.info("getDesiredDeltaDataRate: " + String.format("previous usage " +
+            "= %.2f MB, ", lastUsedMemory / 1024.0 / 1024.0) + String.format(
+            "current usage = %.2f MB, ", newUsedMemory / 1024.0 / 1024.0) +
+            String.format("ideal usage = %.2f MB", desiredUsedMemory / 1024.0 /
+                1024.0));
+      }
+      long interval = newObservation.time - time;
+      // currentTimeMillis is wall-clock time and may not advance (or may even
+      // step back); clamp to 1 ms so rates keep the correct sign.
+      if (interval <= 0) {
+        interval = 1;
+        LOG.warn("getDesiredDeltaDataRate: two GCs happened almost at the " +
+            "same time!");
+      }
+      // Memory deltas are in bytes and interval is in ms: scale to per second.
+      long currentDataRate = (long) ((double) (newUsedMemory -
+          lastUsedMemory) / interval * 1000);
+      long desiredDataRate = (long) ((double) (desiredUsedMemory -
+          newUsedMemory) / interval * 1000);
+      return currentDataRate - desiredDataRate;
+    }
+
+    @Override
+    public String toString() {
+      return String.format("(usedMemory: %.2f MB, maxMemory: %.2f MB at " +
+          "time: %d ms)", usedMemory / 1024.0 / 1024.0,
+          maxMemory / 1024.0 / 1024.0, time);
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/giraph/blob/3793c9ef/giraph-core/src/main/java/org/apache/giraph/ooc/policy/ThresholdBasedOracle.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/ooc/policy/ThresholdBasedOracle.java b/giraph-core/src/main/java/org/apache/giraph/ooc/policy/ThresholdBasedOracle.java
new file mode 100644
index 0000000..ff2b3f7
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/ooc/policy/ThresholdBasedOracle.java
@@ -0,0 +1,365 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.ooc.policy;
+
+import com.sun.management.GarbageCollectionNotificationInfo;
+import org.apache.giraph.comm.flow_control.CreditBasedFlowControl;
+import org.apache.giraph.comm.flow_control.FlowControl;
+import org.apache.giraph.comm.netty.NettyClient;
+import org.apache.giraph.conf.FloatConfOption;
+import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
+import org.apache.giraph.conf.LongConfOption;
+import org.apache.giraph.ooc.OutOfCoreEngine;
+import org.apache.giraph.ooc.command.IOCommand;
+import org.apache.giraph.utils.CallableFactory;
+import org.apache.giraph.utils.LogStacktraceCallable;
+import org.apache.giraph.utils.MemoryUtils;
+import org.apache.giraph.utils.ThreadUtils;
+import org.apache.log4j.Logger;
+
+import java.util.concurrent.Callable;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+
+import static com.google.common.base.Preconditions.checkState;
+
+/**
+ * Out-of-core oracle to adaptively control data kept in memory, with the goal
+ * of keeping the memory usage at a desired state. Out-of-core policy in this
+ * oracle is based on several user-defined thresholds. Also, this oracle spawns
+ * a thread to periodically check the memory usage. This thread issues manual
+ * GC calls if the JVM fails to call major/full GC for a while and the amount
+ * of used memory is about to cause high memory pressure. This oracle also
+ * monitors GC activities: it records the time of the most recent major/full
+ * and minor GCs, which the memory-check thread uses to decide whether a
+ * manual GC is needed. There are three out-of-core decisions:
+ *  - Which IO operations should be done (load/offload of partitions and
+ *    messages)
+ *  - What the incoming messages rate should be (updating credits announced by
+ *    this worker in credit-based flow-control mechanism)
+ *  - How many processing threads should remain active (throttling the rate
+ *    of data generation)
+ *
+ * The following table shows the relationship of these decisions and
+ * user-defined thresholds.
+ * <pre>
+ * --------------------------------------------------------------
+ * Memory Pressure  |  Manual |    IO    |   Credit   |  Active |
+ * (memory usage)   |   GC?   |  Action  |            | Threads |
+ * --------------------------------------------------------------
+ *                  |   Yes   |   hard   |     0      |    0    |
+ *                  |         |   store  |            |         |
+ * failPressure -------------------------------------------------
+ *                  |   Yes   |   hard   |     0      | fraction|
+ *                  |         |   store  |            |         |
+ * emergencyPressure --------------------------------------------
+ *                  |   Yes   |   hard   |  fraction  |   max   |
+ *                  |         |   store  |            |         |
+ * highPressure -------------------------------------------------
+ *                  |   No    |   soft   |  fraction  |   max   |
+ *                  |         |   store  |            |         |
+ * optimalPressure ----------------------------------------------
+ *                  |   No    |   soft   |    max     |   max   |
+ *                  |         |   load   |            |         |
+ * lowPressure --------------------------------------------------
+ *                  |   No    |   hard   |    max     |   max   |
+ *                  |         |   load   |            |         |
+ * --------------------------------------------------------------
+ * </pre>
+ * Note: between optimalPressure and highPressure a manual GC is still issued
+ * if neither a major/full nor a minor GC has happened for a while.
+ */
+public class ThresholdBasedOracle implements OutOfCoreOracle {
+  /** The memory pressure at/above which the job would fail */
+  public static final FloatConfOption FAIL_MEMORY_PRESSURE =
+      new FloatConfOption("giraph.memory.failPressure", 0.975f,
+          "The memory pressure (fraction of used memory) at/above which the " +
+          "job would fail.");
+  /**
+   * The memory pressure at which the job is close to failing, even though we
+   * are using maximal disk bandwidth and minimal network rate. We should
+   * reduce the job processing rate.
+   */
+  public static final FloatConfOption EMERGENCY_MEMORY_PRESSURE =
+      new FloatConfOption("giraph.memory.emergencyPressure", 0.925f,
+          "The memory pressure (fraction of used memory) at which the job " +
+          "is close to fail, hence we should reduce its processing rate " +
+          "as much as possible.");
+  /** The memory pressure at which the job is suffering from GC overhead. */
+  public static final FloatConfOption HIGH_MEMORY_PRESSURE =
+      new FloatConfOption("giraph.memory.highPressure", 0.875f,
+          "The memory pressure (fraction of used memory) at which the job " +
+          "is suffering from GC overhead.");
+  /**
+   * The memory pressure at which we expect GC to perform optimally for a
+   * memory intensive job.
+   */
+  public static final FloatConfOption OPTIMAL_MEMORY_PRESSURE =
+      new FloatConfOption("giraph.memory.optimalPressure", 0.8f,
+          "The memory pressure (fraction of used memory) at which a " +
+          "memory-intensive job shows the optimal GC behavior.");
+  /**
+   * The memory pressure at/below which the job can use more memory without
+   * suffering from GC overhead.
+   */
+  public static final FloatConfOption LOW_MEMORY_PRESSURE =
+      new FloatConfOption("giraph.memory.lowPressure", 0.7f,
+          "The memory pressure (fraction of used memory) at/below which the " +
+          "job can use more memory without suffering the performance.");
+  /** The interval at which memory observer thread wakes up. */
+  public static final LongConfOption CHECK_MEMORY_INTERVAL =
+      new LongConfOption("giraph.checkMemoryInterval", 2500,
+          "The interval/period where memory observer thread wakes up and " +
+          "monitors memory footprint (in milliseconds)");
+  /**
+   * Memory observer thread would manually call GC if major/full GC has not
+   * been called for a while. This parameter specifies the period within
+   * which we expect a GC to have happened.
+   */
+  public static final LongConfOption LAST_GC_CALL_INTERVAL =
+      new LongConfOption("giraph.lastGcCallInterval", 10 * 1000,
+          "How long after last major/full GC should we call manual GC?");
+
+  /** Class logger */
+  private static final Logger LOG =
+      Logger.getLogger(ThresholdBasedOracle.class);
+  /** Cached value for FAIL_MEMORY_PRESSURE */
+  private final float failMemoryPressure;
+  /** Cached value for EMERGENCY_MEMORY_PRESSURE */
+  private final float emergencyMemoryPressure;
+  /** Cached value for HIGH_MEMORY_PRESSURE */
+  private final float highMemoryPressure;
+  /** Cached value for OPTIMAL_MEMORY_PRESSURE */
+  private final float optimalMemoryPressure;
+  /** Cached value for LOW_MEMORY_PRESSURE */
+  private final float lowMemoryPressure;
+  /** Cached value for CHECK_MEMORY_INTERVAL */
+  private final long checkMemoryInterval;
+  /** Cached value for LAST_GC_CALL_INTERVAL */
+  private final long lastGCCallInterval;
+  /**
+   * Cached value for NettyClient.MAX_NUM_OF_OPEN_REQUESTS_PER_WORKER (max
+   * credit used for credit-based flow-control mechanism)
+   */
+  private final short maxRequestsCredit;
+  /**
+   * Whether the job is shutting down. Used for terminating the memory
+   * observer thread.
+   */
+  private final CountDownLatch shouldTerminate;
+  /** Result of memory observer thread */
+  private final Future<Void> checkMemoryThreadResult;
+  /** Out-of-core engine */
+  private final OutOfCoreEngine oocEngine;
+  /** Last time a major/full GC has been called (in milliseconds) */
+  private volatile long lastMajorGCTime;
+  /** Last time a non major/full GC has been called (in milliseconds) */
+  private volatile long lastMinorGCTime;
+
+  /**
+   * Constructor. Reads the thresholds from the configuration, requires
+   * credit-based flow control, and starts the memory observer thread.
+   *
+   * @param conf configuration
+   * @param oocEngine out-of-core engine
+   */
+  public ThresholdBasedOracle(ImmutableClassesGiraphConfiguration conf,
+                              OutOfCoreEngine oocEngine) {
+    this.failMemoryPressure = FAIL_MEMORY_PRESSURE.get(conf);
+    this.emergencyMemoryPressure = EMERGENCY_MEMORY_PRESSURE.get(conf);
+    this.highMemoryPressure = HIGH_MEMORY_PRESSURE.get(conf);
+    this.optimalMemoryPressure = OPTIMAL_MEMORY_PRESSURE.get(conf);
+    this.lowMemoryPressure = LOW_MEMORY_PRESSURE.get(conf);
+    this.checkMemoryInterval = CHECK_MEMORY_INTERVAL.get(conf);
+    this.lastGCCallInterval = LAST_GC_CALL_INTERVAL.get(conf);
+    this.maxRequestsCredit = (short)
+        CreditBasedFlowControl.MAX_NUM_OF_OPEN_REQUESTS_PER_WORKER.get(conf);
+    NettyClient.LIMIT_OPEN_REQUESTS_PER_WORKER.setIfUnset(conf, true);
+    boolean useCredit = NettyClient.LIMIT_OPEN_REQUESTS_PER_WORKER.get(conf);
+    checkState(useCredit, "ThresholdBasedOracle: credit-based flow control " +
+        "must be enabled. Use giraph.waitForPerWorkerRequests=true");
+    this.shouldTerminate = new CountDownLatch(1);
+    this.oocEngine = oocEngine;
+    this.lastMajorGCTime = 0;
+    this.lastMinorGCTime = 0;
+
+    CallableFactory<Void> callableFactory = new CallableFactory<Void>() {
+      @Override
+      public Callable<Void> newCallable(int callableId) {
+        return new Callable<Void>() {
+          @Override
+          public Void call() throws Exception {
+            // Wake up every checkMemoryInterval ms until shutdown() counts
+            // the latch down.
+            while (!shouldTerminate.await(checkMemoryInterval,
+                TimeUnit.MILLISECONDS)) {
+              checkMemory();
+            }
+            return null;
+          }
+        };
+      }
+    };
+    ExecutorService executor = Executors.newSingleThreadExecutor(
+        ThreadUtils.createThreadFactory("check-memory"));
+    this.checkMemoryThreadResult = executor.submit(new LogStacktraceCallable<>(
+        callableFactory.newCallable(0)));
+    // No further tasks are submitted; the running task is not affected.
+    executor.shutdown();
+  }
+
+  /**
+   * One iteration of the memory observer thread. Issues a manual GC when the
+   * used memory fraction is above highPressure and no major/full GC happened
+   * within lastGCCallInterval, or when it is above optimalPressure and neither
+   * a major/full nor a minor GC happened within lastGCCallInterval. Then
+   * updates credits and the active-thread fraction.
+   */
+  private void checkMemory() {
+    double usedMemoryFraction = 1 - MemoryUtils.freeMemoryFraction();
+    long time = System.currentTimeMillis();
+    boolean majorGCIsStale = time - lastMajorGCTime >= lastGCCallInterval;
+    boolean minorGCIsStale = time - lastMinorGCTime >= lastGCCallInterval;
+    if (majorGCIsStale && (usedMemoryFraction > highMemoryPressure ||
+        (usedMemoryFraction > optimalMemoryPressure && minorGCIsStale))) {
+      if (LOG.isInfoEnabled()) {
+        LOG.info("call: last GC happened a while ago and the " +
+            "amount of used memory is high (used memory " +
+            "fraction is " +
+            String.format("%.2f", usedMemoryFraction) + "). " +
+            "Calling GC manually");
+      }
+      System.gc();
+      long gcDuration = System.currentTimeMillis() - time;
+      usedMemoryFraction = 1 - MemoryUtils.freeMemoryFraction();
+      if (LOG.isInfoEnabled()) {
+        LOG.info("call: manual GC is done. It took " +
+            String.format("%.2f", (double) gcDuration / 1000) +
+            " seconds. Used memory fraction is " +
+            String.format("%.2f", usedMemoryFraction));
+      }
+    }
+    updateRates(usedMemoryFraction);
+  }
+
+  /**
+   * Update statistics and rate regarding communication credits and number of
+   * active threads.
+   *
+   * @param usedMemoryFraction the fraction of used memory over max memory
+   */
+  public void updateRates(double usedMemoryFraction) {
+    // Update the fraction of processing threads that should remain active:
+    // all of them below emergencyPressure, none at/above failPressure, and a
+    // linearly decreasing fraction in between.
+    if (usedMemoryFraction >= failMemoryPressure) {
+      oocEngine.updateActiveThreadsFraction(0);
+    } else if (usedMemoryFraction < emergencyMemoryPressure) {
+      oocEngine.updateActiveThreadsFraction(1);
+    } else {
+      oocEngine.updateActiveThreadsFraction(1 -
+          (usedMemoryFraction - emergencyMemoryPressure) /
+              (failMemoryPressure - emergencyMemoryPressure));
+    }
+
+    // Update the credit used in credit-based flow-control: full credit below
+    // optimalPressure, zero at/above emergencyPressure, and a linearly
+    // decreasing credit in between.
+    if (usedMemoryFraction >= emergencyMemoryPressure) {
+      updateRequestsCredit((short) 0);
+    } else if (usedMemoryFraction < optimalMemoryPressure) {
+      updateRequestsCredit(maxRequestsCredit);
+    } else {
+      updateRequestsCredit((short) (maxRequestsCredit *
+          (1 - (usedMemoryFraction - optimalMemoryPressure) /
+              (emergencyMemoryPressure - optimalMemoryPressure))));
+    }
+  }
+
+  @Override
+  public IOAction[] getNextIOActions() {
+    double usedMemoryFraction = 1 - MemoryUtils.freeMemoryFraction();
+    if (LOG.isInfoEnabled()) {
+      LOG.info(String.format("getNextIOActions: usedMemoryFraction = %.2f",
+          usedMemoryFraction));
+    }
+    if (usedMemoryFraction > highMemoryPressure) {
+      // Hard store
+      return new IOAction[]{
+        IOAction.STORE_MESSAGES_AND_BUFFERS,
+        IOAction.STORE_PARTITION};
+    } else if (usedMemoryFraction > optimalMemoryPressure) {
+      // Soft store (soft load is also allowed)
+      return new IOAction[]{
+        IOAction.LOAD_UNPROCESSED_PARTITION,
+        IOAction.STORE_MESSAGES_AND_BUFFERS,
+        IOAction.STORE_PROCESSED_PARTITION};
+    } else if (usedMemoryFraction > lowMemoryPressure) {
+      // NOTE(review): the class table lists "soft load" for this band, but
+      // LOAD_PARTITION is offered here as well; confirm which is intended.
+      return new IOAction[]{
+        IOAction.LOAD_UNPROCESSED_PARTITION,
+        IOAction.STORE_MESSAGES_AND_BUFFERS,
+        IOAction.LOAD_PARTITION};
+    } else {
+      // Hard load
+      return new IOAction[]{IOAction.LOAD_PARTITION};
+    }
+  }
+
+  @Override
+  public boolean approve(IOCommand command) {
+    return true;
+  }
+
+  @Override
+  public void commandCompleted(IOCommand command) {
+    // Do nothing
+  }
+
+  @Override
+  public void gcCompleted(GarbageCollectionNotificationInfo gcInfo) {
+    String gcAction = gcInfo.getGcAction().toLowerCase();
+    if (gcAction.contains("full") || gcAction.contains("major")) {
+      // Major/full notifications whose cause contains "No GC" are not counted
+      // as a major GC (presumably no collection actually took place).
+      if (!gcInfo.getGcCause().contains("No GC")) {
+        lastMajorGCTime = System.currentTimeMillis();
+      }
+    } else {
+      lastMinorGCTime = System.currentTimeMillis();
+    }
+  }
+
+  /**
+   * Stop the memory observer thread and wait for it to finish.
+   *
+   * @throws IllegalStateException if waiting is interrupted (the interrupt
+   *         status is restored) or the observer thread failed
+   */
+  @Override
+  public void shutdown() {
+    shouldTerminate.countDown();
+    try {
+      checkMemoryThreadResult.get();
+    } catch (InterruptedException e) {
+      // Preserve the interrupt so callers further up can observe it.
+      Thread.currentThread().interrupt();
+      LOG.error("shutdown: caught exception while waiting on check-memory " +
+          "thread to terminate!", e);
+      throw new IllegalStateException(e);
+    } catch (ExecutionException e) {
+      LOG.error("shutdown: caught exception while waiting on check-memory " +
+          "thread to terminate!", e);
+      throw new IllegalStateException(e);
+    }
+    if (LOG.isInfoEnabled()) {
+      LOG.info("shutdown: ThresholdBasedOracle shutdown complete!");
+    }
+  }
+
+  /**
+   * Update the credit announced for this worker in Netty. The lower the credit
+   * is, the lower rate incoming messages arrive at this worker. Thus, credit
+   * is an indirect way of controlling amount of memory incoming messages would
+   * take.
+   *
+   * @param newCredit the new credit to announce to other workers
+   */
+  private void updateRequestsCredit(short newCredit) {
+    if (LOG.isInfoEnabled()) {
+      LOG.info("updateRequestsCredit: updating the credit to " + newCredit);
+    }
+    FlowControl flowControl = oocEngine.getFlowControl();
+    // The constructor requires credit-based flow control, so a non-null flow
+    // control is expected to be a CreditBasedFlowControl.
+    if (flowControl != null) {
+      ((CreditBasedFlowControl) flowControl).updateCredit(newCredit);
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/giraph/blob/3793c9ef/giraph-core/src/main/java/org/apache/giraph/ooc/policy/package-info.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/ooc/policy/package-info.java b/giraph-core/src/main/java/org/apache/giraph/ooc/policy/package-info.java
new file mode 100644
index 0000000..c58289f
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/ooc/policy/package-info.java
@@ -0,0 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/**
+ * Package of classes related to out-of-core policy
+ */
+package org.apache.giraph.ooc.policy;
http://git-wip-us.apache.org/repos/asf/giraph/blob/3793c9ef/giraph-core/src/main/java/org/apache/giraph/zk/ZooKeeperManager.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/zk/ZooKeeperManager.java b/giraph-core/src/main/java/org/apache/giraph/zk/ZooKeeperManager.java
index d3ace99..c54e7b2 100644
--- a/giraph-core/src/main/java/org/apache/giraph/zk/ZooKeeperManager.java
+++ b/giraph-core/src/main/java/org/apache/giraph/zk/ZooKeeperManager.java
@@ -123,7 +123,7 @@ public class ZooKeeperManager {
this.context = context;
this.conf = configuration;
taskPartition = conf.getTaskPartition();
- jobId = conf.get("mapred.job.id", "Unknown Job");
+ jobId = conf.getJobId();
baseDirectory =
new Path(ZOOKEEPER_MANAGER_DIRECTORY.getWithDefault(conf,
getFinalZooKeeperPath()));