You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@giraph.apache.org by cl...@apache.org on 2013/02/17 22:31:04 UTC

[1/6] git commit: GIRAPH-514: DiskBackedMessageStores should take advantage of machines with multiple disks

GIRAPH-514: DiskBackedMessageStores should take advantage of machines with multiple disks


Project: http://git-wip-us.apache.org/repos/asf/giraph/repo
Commit: http://git-wip-us.apache.org/repos/asf/giraph/commit/a47ca0b4
Tree: http://git-wip-us.apache.org/repos/asf/giraph/tree/a47ca0b4
Diff: http://git-wip-us.apache.org/repos/asf/giraph/diff/a47ca0b4

Branch: refs/heads/trunk
Commit: a47ca0b491489b4d28a435a5cdc3b218de0e0933
Parents: 329af80
Author: Claudio Martella <cl...@gmail.com>
Authored: Thu Feb 14 16:23:08 2013 +0100
Committer: Claudio Martella <cl...@gmail.com>
Committed: Thu Feb 14 16:23:08 2013 +0100

----------------------------------------------------------------------
 .../comm/messages/SequentialFileMessageStore.java  |   28 ++++++++++----
 .../org/apache/giraph/conf/GiraphConstants.java    |    2 +-
 2 files changed, 21 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/giraph/blob/a47ca0b4/giraph-core/src/main/java/org/apache/giraph/comm/messages/SequentialFileMessageStore.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/messages/SequentialFileMessageStore.java b/giraph-core/src/main/java/org/apache/giraph/comm/messages/SequentialFileMessageStore.java
index 1805f0b..3698527 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/messages/SequentialFileMessageStore.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/messages/SequentialFileMessageStore.java
@@ -40,6 +40,7 @@ import java.io.FileInputStream;
 import java.io.FileOutputStream;
 import java.io.IOException;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.List;
 import java.util.SortedSet;
 import java.util.concurrent.atomic.AtomicInteger;
@@ -361,8 +362,8 @@ public class SequentialFileMessageStore<I extends WritableComparable,
       implements MessageStoreFactory<I, M, BasicMessageStore<I, M>> {
     /** Hadoop configuration */
     private final ImmutableClassesGiraphConfiguration config;
-    /** Directory in which we'll keep necessary files */
-    private final String directory;
+    /** Directories in which we'll keep necessary files */
+    private final String[] directories;
     /** Buffer size to use when reading and writing */
     private final int bufferSize;
     /** Counter for created message stores */
@@ -376,18 +377,29 @@ public class SequentialFileMessageStore<I extends WritableComparable,
     public Factory(ImmutableClassesGiraphConfiguration config) {
       this.config = config;
       String jobId = config.get("mapred.job.id", "Unknown Job");
-      this.directory = config.get(GiraphConstants.MESSAGES_DIRECTORY,
-          GiraphConstants.MESSAGES_DIRECTORY_DEFAULT) + jobId +
-          File.separator;
-      this.bufferSize = config.getInt(GiraphConstants.MESSAGES_BUFFER_SIZE,
+
+      List<String> userPaths = Lists.newArrayList(config.getStrings(
+          GiraphConstants.MESSAGES_DIRECTORY,
+          GiraphConstants.MESSAGES_DIRECTORY_DEFAULT));
+      Collections.shuffle(userPaths);
+      directories = new String[userPaths.size()];
+      int i = 0;
+      for (String path : userPaths) {
+        String directory = path + jobId;
+        directories[i++] = directory;
+        new File(directory).mkdirs();
+      }
+      this.bufferSize = config.getInt(
+          GiraphConstants.MESSAGES_BUFFER_SIZE,
           GiraphConstants.MESSAGES_BUFFER_SIZE_DEFAULT);
       storeCounter = new AtomicInteger();
-      new File(directory).mkdirs();
     }
 
     @Override
     public BasicMessageStore<I, M> newStore() {
-      String fileName = directory + storeCounter.getAndIncrement();
+      int idx = storeCounter.getAndIncrement();
+      String fileName =
+          directories[idx % directories.length] + "/messages-" + idx;
       return new SequentialFileMessageStore<I, M>(config, bufferSize,
           fileName);
     }

http://git-wip-us.apache.org/repos/asf/giraph/blob/a47ca0b4/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java b/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java
index 415009c..44d09c9 100644
--- a/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java
+++ b/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java
@@ -528,7 +528,7 @@ public interface GiraphConstants {
 
   /**
    * Comma-separated list of directories in the local filesystem for
-   * out-of-core partitions. 
+   * out-of-core partitions.
    */
   String PARTITIONS_DIRECTORY = "giraph.partitionsDirectory";
   /** Default directory for out-of-core partitions. */