You are viewing a plain-text version of this content. The canonical link to the original was a hyperlink and is not preserved in this copy.
Posted to commits@giraph.apache.org by cl...@apache.org on 2013/02/17 22:31:04 UTC
[1/6] git commit: GIRAPH-514: DiskBackedMessageStores should take advantage of machines with multiple disks
GIRAPH-514: DiskBackedMessageStores should take advantage of machines with multiple disks
Project: http://git-wip-us.apache.org/repos/asf/giraph/repo
Commit: http://git-wip-us.apache.org/repos/asf/giraph/commit/a47ca0b4
Tree: http://git-wip-us.apache.org/repos/asf/giraph/tree/a47ca0b4
Diff: http://git-wip-us.apache.org/repos/asf/giraph/diff/a47ca0b4
Branch: refs/heads/trunk
Commit: a47ca0b491489b4d28a435a5cdc3b218de0e0933
Parents: 329af80
Author: Claudio Martella <cl...@gmail.com>
Authored: Thu Feb 14 16:23:08 2013 +0100
Committer: Claudio Martella <cl...@gmail.com>
Committed: Thu Feb 14 16:23:08 2013 +0100
----------------------------------------------------------------------
.../comm/messages/SequentialFileMessageStore.java | 28 ++++++++++----
.../org/apache/giraph/conf/GiraphConstants.java | 2 +-
2 files changed, 21 insertions(+), 9 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/giraph/blob/a47ca0b4/giraph-core/src/main/java/org/apache/giraph/comm/messages/SequentialFileMessageStore.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/messages/SequentialFileMessageStore.java b/giraph-core/src/main/java/org/apache/giraph/comm/messages/SequentialFileMessageStore.java
index 1805f0b..3698527 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/messages/SequentialFileMessageStore.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/messages/SequentialFileMessageStore.java
@@ -40,6 +40,7 @@ import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.util.Collection;
+import java.util.Collections;
import java.util.List;
import java.util.SortedSet;
import java.util.concurrent.atomic.AtomicInteger;
@@ -361,8 +362,8 @@ public class SequentialFileMessageStore<I extends WritableComparable,
implements MessageStoreFactory<I, M, BasicMessageStore<I, M>> {
/** Hadoop configuration */
private final ImmutableClassesGiraphConfiguration config;
- /** Directory in which we'll keep necessary files */
- private final String directory;
+ /** Directories in which we'll keep necessary files */
+ private final String[] directories;
/** Buffer size to use when reading and writing */
private final int bufferSize;
/** Counter for created message stores */
@@ -376,18 +377,29 @@ public class SequentialFileMessageStore<I extends WritableComparable,
public Factory(ImmutableClassesGiraphConfiguration config) {
this.config = config;
String jobId = config.get("mapred.job.id", "Unknown Job");
- this.directory = config.get(GiraphConstants.MESSAGES_DIRECTORY,
- GiraphConstants.MESSAGES_DIRECTORY_DEFAULT) + jobId +
- File.separator;
- this.bufferSize = config.getInt(GiraphConstants.MESSAGES_BUFFER_SIZE,
+
+ List<String> userPaths = Lists.newArrayList(config.getStrings(
+ GiraphConstants.MESSAGES_DIRECTORY,
+ GiraphConstants.MESSAGES_DIRECTORY_DEFAULT));
+ Collections.shuffle(userPaths);
+ directories = new String[userPaths.size()];
+ int i = 0;
+ for (String path : userPaths) {
+ String directory = path + jobId;
+ directories[i++] = directory;
+ new File(directory).mkdirs();
+ }
+ this.bufferSize = config.getInt(
+ GiraphConstants.MESSAGES_BUFFER_SIZE,
GiraphConstants.MESSAGES_BUFFER_SIZE_DEFAULT);
storeCounter = new AtomicInteger();
- new File(directory).mkdirs();
}
@Override
public BasicMessageStore<I, M> newStore() {
- String fileName = directory + storeCounter.getAndIncrement();
+ int idx = storeCounter.getAndIncrement();
+ String fileName =
+ directories[idx % directories.length] + "/messages-" + idx;
return new SequentialFileMessageStore<I, M>(config, bufferSize,
fileName);
}
http://git-wip-us.apache.org/repos/asf/giraph/blob/a47ca0b4/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java b/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java
index 415009c..44d09c9 100644
--- a/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java
+++ b/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java
@@ -528,7 +528,7 @@ public interface GiraphConstants {
/**
* Comma-separated list of directories in the local filesystem for
- * out-of-core partitions.
+ * out-of-core partitions.
*/
String PARTITIONS_DIRECTORY = "giraph.partitionsDirectory";
/** Default directory for out-of-core partitions. */