You are viewing a plain-text version of this content; the hyperlink to the canonical version was lost in conversion.
Posted to commits@giraph.apache.org by cl...@apache.org on 2013/02/14 15:38:22 UTC
git commit: GIRAPH-513
Updated Branches:
refs/heads/trunk c17a6483c -> 7fc9390d3
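GIRAPH-513: Allow a comma-separated list of local directories for out-of-core partitions; each partition file is placed in one of them, selected by hashing the partition id.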
Project: http://git-wip-us.apache.org/repos/asf/giraph/repo
Commit: http://git-wip-us.apache.org/repos/asf/giraph/commit/7fc9390d
Tree: http://git-wip-us.apache.org/repos/asf/giraph/tree/7fc9390d
Diff: http://git-wip-us.apache.org/repos/asf/giraph/diff/7fc9390d
Branch: refs/heads/trunk
Commit: 7fc9390d383799aee37917ac1050db19c89b8b98
Parents: c17a648
Author: Claudio Martella <cl...@apache.org>
Authored: Thu Feb 14 15:33:07 2013 +0100
Committer: Claudio Martella <cl...@apache.org>
Committed: Thu Feb 14 15:33:07 2013 +0100
----------------------------------------------------------------------
.../org/apache/giraph/conf/GiraphConstants.java | 5 ++-
.../giraph/partition/DiskBackedPartitionStore.java | 31 ++++++++++----
2 files changed, 26 insertions(+), 10 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/giraph/blob/7fc9390d/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java b/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java
index 8797c0e..415009c 100644
--- a/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java
+++ b/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java
@@ -526,7 +526,10 @@ public interface GiraphConstants {
/** Default size of buffer when reading and writing messages out-of-core. */
int MESSAGES_BUFFER_SIZE_DEFAULT = 8192;
- /** Directory in the local filesystem for out-of-core partitions. */
+ /**
+ * Comma-separated list of directories in the local filesystem for
+ * out-of-core partitions.
+ */
String PARTITIONS_DIRECTORY = "giraph.partitionsDirectory";
/** Default directory for out-of-core partitions. */
String PARTITIONS_DIRECTORY_DEFAULT = "_bsp/_partitions";
http://git-wip-us.apache.org/repos/asf/giraph/blob/7fc9390d/giraph-core/src/main/java/org/apache/giraph/partition/DiskBackedPartitionStore.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/partition/DiskBackedPartitionStore.java b/giraph-core/src/main/java/org/apache/giraph/partition/DiskBackedPartitionStore.java
index 844a229..725de39 100644
--- a/giraph-core/src/main/java/org/apache/giraph/partition/DiskBackedPartitionStore.java
+++ b/giraph-core/src/main/java/org/apache/giraph/partition/DiskBackedPartitionStore.java
@@ -53,6 +53,8 @@ import org.apache.log4j.Logger;
import com.google.common.collect.Iterables;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
+import com.google.common.hash.HashFunction;
+import com.google.common.hash.Hashing;
/**
* Disk-backed PartitionStore. Partitions are stored in memory on a LRU basis.
@@ -107,7 +109,9 @@ public class DiskBackedPartitionStore<I extends WritableComparable,
/** Mapper context */
private final Context context;
/** Base path where the partition files are written to */
- private final String basePath;
+ private final String[] basePaths;
+ /** Used to hash partition Ids */
+ private final HashFunction hasher = Hashing.murmur3_32();
/** Maximum number of slots */
private final int maxInMemoryPartitions;
/** Number of slots used */
@@ -128,9 +132,16 @@ public class DiskBackedPartitionStore<I extends WritableComparable,
maxInMemoryPartitions = Math.max(1,
conf.getInt(GiraphConstants.MAX_PARTITIONS_IN_MEMORY,
GiraphConstants.MAX_PARTITIONS_IN_MEMORY_DEFAULT));
- basePath = conf.get("mapred.job.id", "Unknown Job") +
- conf.get(GiraphConstants.PARTITIONS_DIRECTORY,
- GiraphConstants.PARTITIONS_DIRECTORY_DEFAULT);
+
+ // Take advantage of multiple disks
+ String[] userPaths = conf.getStrings(
+ GiraphConstants.PARTITIONS_DIRECTORY,
+ GiraphConstants.PARTITIONS_DIRECTORY_DEFAULT);
+ basePaths = new String[userPaths.length];
+ int i = 0;
+ for (String path : userPaths) {
+ basePaths[i++] = path + "/" + conf.get("mapred.job.id", "Unknown Job");
+ }
}
@Override
@@ -378,13 +389,13 @@ public class DiskBackedPartitionStore<I extends WritableComparable,
*/
private void offloadPartition(Partition<I, V, E, M> partition)
throws IOException {
- if (LOG.isInfoEnabled()) {
- LOG.info("offloadPartition: writing partition " + partition.getId() +
- " to disk.");
- }
File file = new File(getPartitionPath(partition.getId()));
file.getParentFile().mkdirs();
file.createNewFile();
+ if (LOG.isInfoEnabled()) {
+ LOG.info("offloadPartition: writing partition " + partition.getId() +
+ " to " + file.getAbsolutePath());
+ }
DataOutputStream outputStream = new DataOutputStream(
new BufferedOutputStream(new FileOutputStream(file)));
for (Vertex<I, V, E, M> vertex : partition) {
@@ -431,7 +442,9 @@ public class DiskBackedPartitionStore<I extends WritableComparable,
* @return The path to the given partition
*/
private String getPartitionPath(Integer partitionId) {
- return basePath + "/partition-" + partitionId;
+ int hash = hasher.hashInt(partitionId).asInt();
+ int idx = Math.abs(hash % basePaths.length);
+ return basePaths[idx] + "/partition-" + partitionId;
}
/**