You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@giraph.apache.org by cl...@apache.org on 2013/04/12 00:58:50 UTC

[1/2] git commit: updated refs/heads/trunk to 39f3591

Updated Branches:
  refs/heads/trunk ff8d98e20 -> 39f359136


GIRAPH-616: Decouple vertices and edges in DiskBackedPartitionStore and avoid writing back edges when the algorithm does not change topology.


Project: http://git-wip-us.apache.org/repos/asf/giraph/repo
Commit: http://git-wip-us.apache.org/repos/asf/giraph/commit/228edbbd
Tree: http://git-wip-us.apache.org/repos/asf/giraph/tree/228edbbd
Diff: http://git-wip-us.apache.org/repos/asf/giraph/diff/228edbbd

Branch: refs/heads/trunk
Commit: 228edbbd798f7718a5a7ccbcfd35c22e812be761
Parents: ff8d98e
Author: Claudio Martella <cl...@apache.org>
Authored: Thu Apr 11 23:47:00 2013 +0200
Committer: Claudio Martella <cl...@apache.org>
Committed: Thu Apr 11 23:47:00 2013 +0200

----------------------------------------------------------------------
 .../apache/giraph/conf/GiraphConfiguration.java    |    9 +
 .../org/apache/giraph/conf/GiraphConstants.java    |    7 +
 .../giraph/partition/DiskBackedPartitionStore.java |  167 +++++++++++++--
 3 files changed, 166 insertions(+), 17 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/giraph/blob/228edbbd/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConfiguration.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConfiguration.java b/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConfiguration.java
index 01f22da..0aeec40 100644
--- a/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConfiguration.java
+++ b/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConfiguration.java
@@ -885,4 +885,13 @@ public class GiraphConfiguration extends Configuration
   public int getMaxNumberOfSupersteps() {
     return MAX_NUMBER_OF_SUPERSTEPS.get(this);
   }
+
+  /**
+   * Whether or not the application will change the graph topology.
+   *
+   * @return true if the graph is static, false otherwise.
+   */
+  public boolean isStaticGraph() {
+    return STATIC_GRAPH.isTrue(this);
+  }
 }

http://git-wip-us.apache.org/repos/asf/giraph/blob/228edbbd/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java b/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java
index 21e094d..a55a2a2 100644
--- a/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java
+++ b/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java
@@ -650,5 +650,12 @@ public interface GiraphConstants {
    */
   IntConfOption MAX_NUMBER_OF_SUPERSTEPS =
       new IntConfOption("giraph.maxNumberOfSupersteps", 1);
+
+  /**
+   * The application will not mutate the graph topology (the edges). It is used
+   * to optimise out-of-core graph processing by not writing back edges.
+   */
+  BooleanConfOption STATIC_GRAPH =
+      new BooleanConfOption("giraph.isStaticGraph", false);
 }
 // CHECKSTYLE: resume InterfaceIsTypeCheck

http://git-wip-us.apache.org/repos/asf/giraph/blob/228edbbd/giraph-core/src/main/java/org/apache/giraph/partition/DiskBackedPartitionStore.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/partition/DiskBackedPartitionStore.java b/giraph-core/src/main/java/org/apache/giraph/partition/DiskBackedPartitionStore.java
index 11e0a90..53b9dd4 100644
--- a/giraph-core/src/main/java/org/apache/giraph/partition/DiskBackedPartitionStore.java
+++ b/giraph-core/src/main/java/org/apache/giraph/partition/DiskBackedPartitionStore.java
@@ -19,8 +19,8 @@
 package org.apache.giraph.partition;
 
 import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
+import org.apache.giraph.edge.VertexEdges;
 import org.apache.giraph.graph.Vertex;
-import org.apache.giraph.utils.WritableUtils;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
 import org.apache.hadoop.mapreduce.Mapper;
@@ -35,7 +35,9 @@ import com.google.common.hash.Hashing;
 
 import java.io.BufferedInputStream;
 import java.io.BufferedOutputStream;
+import java.io.DataInput;
 import java.io.DataInputStream;
+import java.io.DataOutput;
 import java.io.DataOutputStream;
 import java.io.File;
 import java.io.FileInputStream;
@@ -291,7 +293,7 @@ public class DiskBackedPartitionStore<I extends WritableComparable,
       }
     } finally {
       for (Integer id : onDisk.values()) {
-        deletePartitionFile(id);
+        deletePartitionFiles(id);
       }
     }
   }
@@ -356,7 +358,78 @@ public class DiskBackedPartitionStore<I extends WritableComparable,
   }
 
   /**
-   * Load a partition from disk. It deletes the file after the load.
+   * Writes vertex data (Id, Vertex Value and halted state) to stream.
+   *
+   * @param output The output stream
+   * @param vertex The vertex to serialize
+   * @throws IOException
+   */
+  private void writeVertexData(
+      DataOutput output,
+      Vertex<I, V, E, M> vertex)
+    throws IOException {
+    vertex.getId().write(output);
+    vertex.getValue().write(output);
+    output.writeBoolean(vertex.isHalted());
+  }
+
+  /**
+   * Writes vertex edges (Id, Edges) to stream.
+   *
+   * @param output The output stream
+   * @param vertex The vertex to serialize
+   * @throws IOException
+   */
+  @SuppressWarnings("unchecked")
+  private void writeVertexEdges(
+      DataOutput output,
+      Vertex<I, V, E, M> vertex)
+    throws IOException {
+    vertex.getId().write(output);
+    ((VertexEdges<I, E>) vertex.getEdges()).write(output);
+  }
+
+  /**
+   * Read vertex data from an input and initialize the vertex.
+   *
+   * @param in The input stream
+   * @param vertex The vertex to initialize
+   * @throws IOException
+   */
+  private void readVertexData(DataInput in, Vertex<I, V, E, M> vertex)
+    throws IOException {
+    I id = conf.createVertexId();
+    id.readFields(in);
+    V value = conf.createVertexValue();
+    value.readFields(in);
+    vertex.initialize(id, value);
+    if (in.readBoolean()) {
+      vertex.voteToHalt();
+    } else {
+      vertex.wakeUp();
+    }
+  }
+
+  /**
+   * Read vertex edges from an input and set them to the vertex.
+   *
+   * @param in The input stream
+   * @param partition The partition owning the vertex
+   * @throws IOException
+   */
+  private void readVertexEdges(DataInput in, Partition<I, V, E, M> partition)
+    throws IOException {
+    I id = conf.createVertexId();
+    id.readFields(in);
+    Vertex<I, V, E, M> v = partition.getVertex(id);
+    VertexEdges<I, E> edges = conf.createVertexEdges();
+    edges.readFields(in);
+    v.setEdges(edges);
+  }
+
+  /**
+   * Load a partition from disk. It deletes the files after the load,
+   * except for the edges, if the graph is static.
    *
    * @param id The id of the partition to load
    * @param numVertices The number of vertices contained on disk
@@ -367,16 +440,29 @@ public class DiskBackedPartitionStore<I extends WritableComparable,
     throws IOException {
     Partition<I, V, E, M> partition =
         conf.createPartition(id, context);
-    File file = new File(getPartitionPath(id));
+    File file = new File(getVerticesPath(id));
     DataInputStream inputStream = new DataInputStream(
         new BufferedInputStream(new FileInputStream(file)));
     for (int i = 0; i < numVertices; ++i) {
-      Vertex<I, V , E, M> vertex =
-          WritableUtils.readVertexFromDataInput(inputStream, conf);
+      Vertex<I, V , E, M> vertex = conf.createVertex();
+      readVertexData(inputStream, vertex);
       partition.putVertex(vertex);
     }
     inputStream.close();
     file.delete();
+    file = new File(getEdgesPath(id));
+    inputStream = new DataInputStream(
+        new BufferedInputStream(new FileInputStream(file)));
+    for (int i = 0; i < numVertices; ++i) {
+      readVertexEdges(inputStream, partition);
+    }
+    inputStream.close();
+    /*
+     * If the graph is static, keep the file around.
+     */
+    if (!conf.isStaticGraph()) {
+      file.delete();
+    }
     return partition;
   }
 
@@ -388,19 +474,37 @@ public class DiskBackedPartitionStore<I extends WritableComparable,
    */
   private void offloadPartition(Partition<I, V, E, M> partition)
     throws IOException {
-    File file = new File(getPartitionPath(partition.getId()));
+    File file = new File(getVerticesPath(partition.getId()));
     file.getParentFile().mkdirs();
     file.createNewFile();
     if (LOG.isInfoEnabled()) {
-      LOG.info("offloadPartition: writing partition " + partition.getId() +
-          " to " + file.getAbsolutePath());
+      LOG.info("offloadPartition: writing partition vertices " +
+          partition.getId() + " to " + file.getAbsolutePath());
     }
     DataOutputStream outputStream = new DataOutputStream(
         new BufferedOutputStream(new FileOutputStream(file)));
     for (Vertex<I, V, E, M> vertex : partition) {
-      WritableUtils.writeVertexToDataOutput(outputStream, vertex, conf);
+      writeVertexData(outputStream, vertex);
     }
     outputStream.close();
+    file = new File(getEdgesPath(partition.getId()));
+    /*
+     * Avoid writing back edges if we have already written them once and
+     * the graph is not changing.
+     */
+    if (!conf.isStaticGraph() || !file.exists()) {
+      file.createNewFile();
+      if (LOG.isInfoEnabled()) {
+        LOG.info("offloadPartition: writing partition edges " +
+            partition.getId() + " to " + file.getAbsolutePath());
+      }
+      outputStream = new DataOutputStream(
+          new BufferedOutputStream(new FileOutputStream(file)));
+      for (Vertex<I, V, E, M> vertex : partition) {
+        writeVertexEdges(outputStream, vertex);
+      }
+      outputStream.close();
+    }
   }
 
   /**
@@ -415,27 +519,36 @@ public class DiskBackedPartitionStore<I extends WritableComparable,
     Integer id = partition.getId();
     Integer count = onDisk.get(id);
     onDisk.put(id, count + (int) partition.getVertexCount());
-    File file = new File(getPartitionPath(id));
+    File file = new File(getVerticesPath(id));
     DataOutputStream outputStream = new DataOutputStream(
         new BufferedOutputStream(new FileOutputStream(file, true)));
     for (Vertex<I, V, E, M> vertex : partition) {
-      WritableUtils.writeVertexToDataOutput(outputStream, vertex, conf);
+      writeVertexData(outputStream, vertex);
+    }
+    outputStream.close();
+    file = new File(getEdgesPath(id));
+    outputStream = new DataOutputStream(
+        new BufferedOutputStream(new FileOutputStream(file, true)));
+    for (Vertex<I, V, E, M> vertex : partition) {
+      writeVertexEdges(outputStream, vertex);
     }
     outputStream.close();
   }
 
   /**
-   * Delete a partition file
+   * Delete a partition's files.
    *
    * @param id The id of the partition owning the file.
    */
-  public void deletePartitionFile(Integer id) {
-    File file = new File(getPartitionPath(id));
+  public void deletePartitionFiles(Integer id) {
+    File file = new File(getVerticesPath(id));
+    file.delete();
+    file = new File(getEdgesPath(id));
     file.delete();
   }
 
   /**
-   * Get the path to the file where a partition is stored.
+   * Get the path and basename of the storage files.
    *
    * @param partitionId The partition
    * @return The path to the given partition
@@ -447,6 +560,26 @@ public class DiskBackedPartitionStore<I extends WritableComparable,
   }
 
   /**
+   * Get the path to the file where vertices are stored.
+   *
+   * @param partitionId The partition
+   * @return The path to the vertices file
+   */
+  private String getVerticesPath(Integer partitionId) {
+    return getPartitionPath(partitionId) + "_vertices";
+  }
+
+  /**
+   * Get the path to the file where edges are stored.
+   *
+   * @param partitionId The partition
+   * @return The path to the edges file
+   */
+  private String getEdgesPath(Integer partitionId) {
+    return getPartitionPath(partitionId) + "_edges";
+  }
+
+  /**
    * Task that gets a partition from the store
    */
   private class GetPartition implements Callable<Partition<I, V, E, M>> {
@@ -707,7 +840,7 @@ public class DiskBackedPartitionStore<I extends WritableComparable,
           switch (pState) {
           case ONDISK:
             onDisk.remove(id);
-            deletePartitionFile(id);
+            deletePartitionFiles(id);
             done = true;
             break;
           case INACTIVE:


[2/2] git commit: updated refs/heads/trunk to 39f3591

Posted by cl...@apache.org.
GIRAPH-616


Project: http://git-wip-us.apache.org/repos/asf/giraph/repo
Commit: http://git-wip-us.apache.org/repos/asf/giraph/commit/39f35913
Tree: http://git-wip-us.apache.org/repos/asf/giraph/tree/39f35913
Diff: http://git-wip-us.apache.org/repos/asf/giraph/diff/39f35913

Branch: refs/heads/trunk
Commit: 39f3591365cbe15b74c82574787d0592681d8ba5
Parents: 228edbb
Author: Claudio Martella <cl...@apache.org>
Authored: Fri Apr 12 00:58:13 2013 +0200
Committer: Claudio Martella <cl...@apache.org>
Committed: Fri Apr 12 00:58:13 2013 +0200

----------------------------------------------------------------------
 CHANGELOG |    3 +++
 1 files changed, 3 insertions(+), 0 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/giraph/blob/39f35913/CHANGELOG
----------------------------------------------------------------------
diff --git a/CHANGELOG b/CHANGELOG
index 4a1e7ca..2cb8eb3 100644
--- a/CHANGELOG
+++ b/CHANGELOG
@@ -1,6 +1,9 @@
 Giraph Change Log
 
 Release 0.2.0 - unreleased
+  GIRAPH-616: Decouple vertices and edges in DiskBackedPartitionStore and avoid writing 
+  back edges when the algorithm does not change topology. (claudio)
+
   GIRAPH-613: Remove Writable from the interfaces implemented by Vertex (claudio)
 
   GIRAPH-543: Fix PageRankBenchmark and make WeightedPageRankBenchmark (majakabiljo)