You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@giraph.apache.org by cl...@apache.org on 2013/04/12 00:58:50 UTC
[1/2] git commit: updated refs/heads/trunk to 39f3591
Updated Branches:
refs/heads/trunk ff8d98e20 -> 39f359136
GIRAPH-616: Decouple vertices and edges in DiskBackedPartitionStore and avoid writing back edges when the algorithm does not change topology.
Project: http://git-wip-us.apache.org/repos/asf/giraph/repo
Commit: http://git-wip-us.apache.org/repos/asf/giraph/commit/228edbbd
Tree: http://git-wip-us.apache.org/repos/asf/giraph/tree/228edbbd
Diff: http://git-wip-us.apache.org/repos/asf/giraph/diff/228edbbd
Branch: refs/heads/trunk
Commit: 228edbbd798f7718a5a7ccbcfd35c22e812be761
Parents: ff8d98e
Author: Claudio Martella <cl...@apache.org>
Authored: Thu Apr 11 23:47:00 2013 +0200
Committer: Claudio Martella <cl...@apache.org>
Committed: Thu Apr 11 23:47:00 2013 +0200
----------------------------------------------------------------------
.../apache/giraph/conf/GiraphConfiguration.java | 9 +
.../org/apache/giraph/conf/GiraphConstants.java | 7 +
.../giraph/partition/DiskBackedPartitionStore.java | 167 +++++++++++++--
3 files changed, 166 insertions(+), 17 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/giraph/blob/228edbbd/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConfiguration.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConfiguration.java b/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConfiguration.java
index 01f22da..0aeec40 100644
--- a/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConfiguration.java
+++ b/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConfiguration.java
@@ -885,4 +885,13 @@ public class GiraphConfiguration extends Configuration
public int getMaxNumberOfSupersteps() {
return MAX_NUMBER_OF_SUPERSTEPS.get(this);
}
+
+ /**
+ * Whether the application will change the graph topology or not.
+ *
+ * @return true if the graph is static, false otherwise.
+ */
+ public boolean isStaticGraph() {
+ return STATIC_GRAPH.isTrue(this);
+ }
}
http://git-wip-us.apache.org/repos/asf/giraph/blob/228edbbd/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java b/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java
index 21e094d..a55a2a2 100644
--- a/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java
+++ b/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java
@@ -650,5 +650,12 @@ public interface GiraphConstants {
*/
IntConfOption MAX_NUMBER_OF_SUPERSTEPS =
new IntConfOption("giraph.maxNumberOfSupersteps", 1);
+
+ /**
+ * The application will not mutate the graph topology (the edges). It is used
+ * to optimise the out-of-core graph by not writing back edges every time.
+ */
+ BooleanConfOption STATIC_GRAPH =
+ new BooleanConfOption("giraph.isStaticGraph", false);
}
// CHECKSTYLE: resume InterfaceIsTypeCheck
http://git-wip-us.apache.org/repos/asf/giraph/blob/228edbbd/giraph-core/src/main/java/org/apache/giraph/partition/DiskBackedPartitionStore.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/partition/DiskBackedPartitionStore.java b/giraph-core/src/main/java/org/apache/giraph/partition/DiskBackedPartitionStore.java
index 11e0a90..53b9dd4 100644
--- a/giraph-core/src/main/java/org/apache/giraph/partition/DiskBackedPartitionStore.java
+++ b/giraph-core/src/main/java/org/apache/giraph/partition/DiskBackedPartitionStore.java
@@ -19,8 +19,8 @@
package org.apache.giraph.partition;
import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
+import org.apache.giraph.edge.VertexEdges;
import org.apache.giraph.graph.Vertex;
-import org.apache.giraph.utils.WritableUtils;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapreduce.Mapper;
@@ -35,7 +35,9 @@ import com.google.common.hash.Hashing;
import java.io.BufferedInputStream;
import java.io.BufferedOutputStream;
+import java.io.DataInput;
import java.io.DataInputStream;
+import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.File;
import java.io.FileInputStream;
@@ -291,7 +293,7 @@ public class DiskBackedPartitionStore<I extends WritableComparable,
}
} finally {
for (Integer id : onDisk.values()) {
- deletePartitionFile(id);
+ deletePartitionFiles(id);
}
}
}
@@ -356,7 +358,78 @@ public class DiskBackedPartitionStore<I extends WritableComparable,
}
/**
- * Load a partition from disk. It deletes the file after the load.
+ * Writes vertex data (Id, Vertex Value and halted state) to stream.
+ *
+ * @param output The output stream
+ * @param vertex The vertex to serialize
+ * @throws IOException
+ */
+ private void writeVertexData(
+ DataOutput output,
+ Vertex<I, V, E, M> vertex)
+ throws IOException {
+ vertex.getId().write(output);
+ vertex.getValue().write(output);
+ output.writeBoolean(vertex.isHalted());
+ }
+
+ /**
+ * Writes vertex edges (Id, Edges) to stream.
+ *
+ * @param output The output stream
+ * @param vertex The vertex to serialize
+ * @throws IOException
+ */
+ @SuppressWarnings("unchecked")
+ private void writeVertexEdges(
+ DataOutput output,
+ Vertex<I, V, E, M> vertex)
+ throws IOException {
+ vertex.getId().write(output);
+ ((VertexEdges<I, E>) vertex.getEdges()).write(output);
+ }
+
+ /**
+ * Read vertex data from an input and initialize the vertex.
+ *
+ * @param in The input stream
+ * @param vertex The vertex to initialize
+ * @throws IOException
+ */
+ private void readVertexData(DataInput in, Vertex<I, V, E, M> vertex)
+ throws IOException {
+ I id = conf.createVertexId();
+ id.readFields(in);
+ V value = conf.createVertexValue();
+ value.readFields(in);
+ vertex.initialize(id, value);
+ if (in.readBoolean()) {
+ vertex.voteToHalt();
+ } else {
+ vertex.wakeUp();
+ }
+ }
+
+ /**
+ * Read vertex edges from an input and set them to the vertex.
+ *
+ * @param in The input stream
+ * @param partition The partition owning the vertex
+ * @throws IOException
+ */
+ private void readVertexEdges(DataInput in, Partition<I, V, E, M> partition)
+ throws IOException {
+ I id = conf.createVertexId();
+ id.readFields(in);
+ Vertex<I, V, E, M> v = partition.getVertex(id);
+ VertexEdges<I, E> edges = conf.createVertexEdges();
+ edges.readFields(in);
+ v.setEdges(edges);
+ }
+
+ /**
+ * Load a partition from disk. It deletes the files after the load,
+ * except for the edges, if the graph is static.
*
* @param id The id of the partition to load
* @param numVertices The number of vertices contained on disk
@@ -367,16 +440,29 @@ public class DiskBackedPartitionStore<I extends WritableComparable,
throws IOException {
Partition<I, V, E, M> partition =
conf.createPartition(id, context);
- File file = new File(getPartitionPath(id));
+ File file = new File(getVerticesPath(id));
DataInputStream inputStream = new DataInputStream(
new BufferedInputStream(new FileInputStream(file)));
for (int i = 0; i < numVertices; ++i) {
- Vertex<I, V , E, M> vertex =
- WritableUtils.readVertexFromDataInput(inputStream, conf);
+ Vertex<I, V , E, M> vertex = conf.createVertex();
+ readVertexData(inputStream, vertex);
partition.putVertex(vertex);
}
inputStream.close();
file.delete();
+ file = new File(getEdgesPath(id));
+ inputStream = new DataInputStream(
+ new BufferedInputStream(new FileInputStream(file)));
+ for (int i = 0; i < numVertices; ++i) {
+ readVertexEdges(inputStream, partition);
+ }
+ inputStream.close();
+ /*
+ * If the graph is static, keep the file around.
+ */
+ if (!conf.isStaticGraph()) {
+ file.delete();
+ }
return partition;
}
@@ -388,19 +474,37 @@ public class DiskBackedPartitionStore<I extends WritableComparable,
*/
private void offloadPartition(Partition<I, V, E, M> partition)
throws IOException {
- File file = new File(getPartitionPath(partition.getId()));
+ File file = new File(getVerticesPath(partition.getId()));
file.getParentFile().mkdirs();
file.createNewFile();
if (LOG.isInfoEnabled()) {
- LOG.info("offloadPartition: writing partition " + partition.getId() +
- " to " + file.getAbsolutePath());
+ LOG.info("offloadPartition: writing partition vertices " +
+ partition.getId() + " to " + file.getAbsolutePath());
}
DataOutputStream outputStream = new DataOutputStream(
new BufferedOutputStream(new FileOutputStream(file)));
for (Vertex<I, V, E, M> vertex : partition) {
- WritableUtils.writeVertexToDataOutput(outputStream, vertex, conf);
+ writeVertexData(outputStream, vertex);
}
outputStream.close();
+ file = new File(getEdgesPath(partition.getId()));
+ /*
+ * Avoid writing back edges if we have already written them once and
+ * the graph is not changing.
+ */
+ if (!conf.isStaticGraph() || !file.exists()) {
+ file.createNewFile();
+ if (LOG.isInfoEnabled()) {
+ LOG.info("offloadPartition: writing partition edges " +
+ partition.getId() + " to " + file.getAbsolutePath());
+ }
+ outputStream = new DataOutputStream(
+ new BufferedOutputStream(new FileOutputStream(file)));
+ for (Vertex<I, V, E, M> vertex : partition) {
+ writeVertexEdges(outputStream, vertex);
+ }
+ outputStream.close();
+ }
}
/**
@@ -415,27 +519,36 @@ public class DiskBackedPartitionStore<I extends WritableComparable,
Integer id = partition.getId();
Integer count = onDisk.get(id);
onDisk.put(id, count + (int) partition.getVertexCount());
- File file = new File(getPartitionPath(id));
+ File file = new File(getVerticesPath(id));
DataOutputStream outputStream = new DataOutputStream(
new BufferedOutputStream(new FileOutputStream(file, true)));
for (Vertex<I, V, E, M> vertex : partition) {
- WritableUtils.writeVertexToDataOutput(outputStream, vertex, conf);
+ writeVertexData(outputStream, vertex);
+ }
+ outputStream.close();
+ file = new File(getEdgesPath(id));
+ outputStream = new DataOutputStream(
+ new BufferedOutputStream(new FileOutputStream(file, true)));
+ for (Vertex<I, V, E, M> vertex : partition) {
+ writeVertexEdges(outputStream, vertex);
}
outputStream.close();
}
/**
- * Delete a partition file
+ * Delete a partition's files.
*
* @param id The id of the partition owning the file.
*/
- public void deletePartitionFile(Integer id) {
- File file = new File(getPartitionPath(id));
+ public void deletePartitionFiles(Integer id) {
+ File file = new File(getVerticesPath(id));
+ file.delete();
+ file = new File(getEdgesPath(id));
file.delete();
}
/**
- * Get the path to the file where a partition is stored.
+ * Get the path and basename of the storage files.
*
* @param partitionId The partition
* @return The path to the given partition
@@ -447,6 +560,26 @@ public class DiskBackedPartitionStore<I extends WritableComparable,
}
/**
+ * Get the path to the file where vertices are stored.
+ *
+ * @param partitionId The partition
+ * @return The path to the vertices file
+ */
+ private String getVerticesPath(Integer partitionId) {
+ return getPartitionPath(partitionId) + "_vertices";
+ }
+
+ /**
+ * Get the path to the file where edges are stored.
+ *
+ * @param partitionId The partition
+ * @return The path to the edges file
+ */
+ private String getEdgesPath(Integer partitionId) {
+ return getPartitionPath(partitionId) + "_edges";
+ }
+
+ /**
* Task that gets a partition from the store
*/
private class GetPartition implements Callable<Partition<I, V, E, M>> {
@@ -707,7 +840,7 @@ public class DiskBackedPartitionStore<I extends WritableComparable,
switch (pState) {
case ONDISK:
onDisk.remove(id);
- deletePartitionFile(id);
+ deletePartitionFiles(id);
done = true;
break;
case INACTIVE:
[2/2] git commit: updated refs/heads/trunk to 39f3591
Posted by cl...@apache.org.
GIRAPH-616
Project: http://git-wip-us.apache.org/repos/asf/giraph/repo
Commit: http://git-wip-us.apache.org/repos/asf/giraph/commit/39f35913
Tree: http://git-wip-us.apache.org/repos/asf/giraph/tree/39f35913
Diff: http://git-wip-us.apache.org/repos/asf/giraph/diff/39f35913
Branch: refs/heads/trunk
Commit: 39f3591365cbe15b74c82574787d0592681d8ba5
Parents: 228edbb
Author: Claudio Martella <cl...@apache.org>
Authored: Fri Apr 12 00:58:13 2013 +0200
Committer: Claudio Martella <cl...@apache.org>
Committed: Fri Apr 12 00:58:13 2013 +0200
----------------------------------------------------------------------
CHANGELOG | 3 +++
1 files changed, 3 insertions(+), 0 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/giraph/blob/39f35913/CHANGELOG
----------------------------------------------------------------------
diff --git a/CHANGELOG b/CHANGELOG
index 4a1e7ca..2cb8eb3 100644
--- a/CHANGELOG
+++ b/CHANGELOG
@@ -1,6 +1,9 @@
Giraph Change Log
Release 0.2.0 - unreleased
+ GIRAPH-616: Decouple vertices and edges in DiskBackedPartitionStore and avoid writing
+ back edges when the algorithm does not change topology. (claudio)
+
GIRAPH-613: Remove Writable from the interfaces implemented by Vertex (claudio)
GIRAPH-543: Fix PageRankBenchmark and make WeightedPageRankBenchmark (majakabiljo)