You are viewing a plain-text version of this content; the link to the canonical version is not preserved in this copy.
Posted to commits@giraph.apache.org by cl...@apache.org on 2013/02/14 15:38:22 UTC

git commit: GIRAPH 513

Updated Branches:
  refs/heads/trunk c17a6483c -> 7fc9390d3


GIRAPH-513: allow a comma-separated list of local directories for out-of-core partitions


Project: http://git-wip-us.apache.org/repos/asf/giraph/repo
Commit: http://git-wip-us.apache.org/repos/asf/giraph/commit/7fc9390d
Tree: http://git-wip-us.apache.org/repos/asf/giraph/tree/7fc9390d
Diff: http://git-wip-us.apache.org/repos/asf/giraph/diff/7fc9390d

Branch: refs/heads/trunk
Commit: 7fc9390d383799aee37917ac1050db19c89b8b98
Parents: c17a648
Author: Claudio Martella <cl...@apache.org>
Authored: Thu Feb 14 15:33:07 2013 +0100
Committer: Claudio Martella <cl...@apache.org>
Committed: Thu Feb 14 15:33:07 2013 +0100

----------------------------------------------------------------------
 .../org/apache/giraph/conf/GiraphConstants.java    |    5 ++-
 .../giraph/partition/DiskBackedPartitionStore.java |   31 ++++++++++----
 2 files changed, 26 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/giraph/blob/7fc9390d/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java b/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java
index 8797c0e..415009c 100644
--- a/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java
+++ b/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java
@@ -526,7 +526,10 @@ public interface GiraphConstants {
   /** Default size of buffer when reading and writing messages out-of-core. */
   int MESSAGES_BUFFER_SIZE_DEFAULT = 8192;
 
-  /** Directory in the local filesystem for out-of-core partitions. */
+  /**
+   * Comma-separated list of local-filesystem directories for out-of-core
+   * partitions; each partition goes to one of them by hashing its id.
+   */
   String PARTITIONS_DIRECTORY = "giraph.partitionsDirectory";
   /** Default directory for out-of-core partitions. */
   String PARTITIONS_DIRECTORY_DEFAULT = "_bsp/_partitions";

http://git-wip-us.apache.org/repos/asf/giraph/blob/7fc9390d/giraph-core/src/main/java/org/apache/giraph/partition/DiskBackedPartitionStore.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/partition/DiskBackedPartitionStore.java b/giraph-core/src/main/java/org/apache/giraph/partition/DiskBackedPartitionStore.java
index 844a229..725de39 100644
--- a/giraph-core/src/main/java/org/apache/giraph/partition/DiskBackedPartitionStore.java
+++ b/giraph-core/src/main/java/org/apache/giraph/partition/DiskBackedPartitionStore.java
@@ -53,6 +53,8 @@ import org.apache.log4j.Logger;
 import com.google.common.collect.Iterables;
 import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
+import com.google.common.hash.HashFunction;
+import com.google.common.hash.Hashing;
 
 /**
  * Disk-backed PartitionStore. Partitions are stored in memory on a LRU basis.
@@ -107,7 +109,9 @@ public class DiskBackedPartitionStore<I extends WritableComparable,
   /** Mapper context */
   private final Context context;
   /** Base path where the partition files are written to */
-  private final String basePath;
+  private final String[] basePaths;
+  /** Used to hash partition Ids */
+  private final HashFunction hasher = Hashing.murmur3_32();
   /** Maximum number of slots */
   private final int maxInMemoryPartitions;
   /** Number of slots used */
@@ -128,9 +132,16 @@ public class DiskBackedPartitionStore<I extends WritableComparable,
     maxInMemoryPartitions = Math.max(1,
         conf.getInt(GiraphConstants.MAX_PARTITIONS_IN_MEMORY,
             GiraphConstants.MAX_PARTITIONS_IN_MEMORY_DEFAULT));
-    basePath = conf.get("mapred.job.id", "Unknown Job") +
-        conf.get(GiraphConstants.PARTITIONS_DIRECTORY,
-            GiraphConstants.PARTITIONS_DIRECTORY_DEFAULT);
+
+    // Take advantage of multiple disks
+    String[] userPaths = conf.getStrings(
+        GiraphConstants.PARTITIONS_DIRECTORY,
+        GiraphConstants.PARTITIONS_DIRECTORY_DEFAULT);
+    basePaths = new String[userPaths.length];
+    int i = 0;
+    for (String path : userPaths) {
+      basePaths[i++] = path + "/" + conf.get("mapred.job.id", "Unknown Job");
+    }
   }
 
   @Override
@@ -378,13 +389,13 @@ public class DiskBackedPartitionStore<I extends WritableComparable,
    */
   private void offloadPartition(Partition<I, V, E, M> partition)
     throws IOException {
-    if (LOG.isInfoEnabled()) {
-      LOG.info("offloadPartition: writing partition " + partition.getId() +
-          " to disk.");
-    }
     File file = new File(getPartitionPath(partition.getId()));
     file.getParentFile().mkdirs();
     file.createNewFile();
+    if (LOG.isInfoEnabled()) {
+      LOG.info("offloadPartition: writing partition " + partition.getId() +
+          " to " + file.getAbsolutePath());
+    }
     DataOutputStream outputStream = new DataOutputStream(
         new BufferedOutputStream(new FileOutputStream(file)));
     for (Vertex<I, V, E, M> vertex : partition) {
@@ -431,7 +442,9 @@ public class DiskBackedPartitionStore<I extends WritableComparable,
    * @return The path to the given partition
    */
   private String getPartitionPath(Integer partitionId) {
-    return basePath + "/partition-" + partitionId;
+    int hash = hasher.hashInt(partitionId).asInt();
+    int idx  = Math.abs(hash % basePaths.length);
+    return basePaths[idx] + "/partition-" + partitionId;
   }
 
   /**