You are viewing a plain-text version of this content; the canonical link was not preserved in this plain-text copy.
Posted to commits@giraph.apache.org by cl...@apache.org on 2013/04/19 18:34:48 UTC

git commit: updated refs/heads/trunk to c1b8840

Updated Branches:
  refs/heads/trunk 75b311266 -> c1b88405c


GIRAPH-624: ByteArrayPartition reports 0 aggregate edges when used with
DiskBackedPartitionStore (claudio)


Project: http://git-wip-us.apache.org/repos/asf/giraph/repo
Commit: http://git-wip-us.apache.org/repos/asf/giraph/commit/c1b88405
Tree: http://git-wip-us.apache.org/repos/asf/giraph/tree/c1b88405
Diff: http://git-wip-us.apache.org/repos/asf/giraph/diff/c1b88405

Branch: refs/heads/trunk
Commit: c1b88405c96db8203641c2baae8ff30527bb2b46
Parents: 75b3112
Author: Claudio Martella <cl...@apache.org>
Authored: Fri Apr 19 18:33:54 2013 +0200
Committer: Claudio Martella <cl...@apache.org>
Committed: Fri Apr 19 18:33:54 2013 +0200

----------------------------------------------------------------------
 CHANGELOG                                          |    3 +
 .../giraph/partition/DiskBackedPartitionStore.java |  143 ++++++++++-----
 .../giraph/partition/TestPartitionStores.java      |   37 +++-
 3 files changed, 128 insertions(+), 55 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/giraph/blob/c1b88405/CHANGELOG
----------------------------------------------------------------------
diff --git a/CHANGELOG b/CHANGELOG
index d7679a2..5a980cf 100644
--- a/CHANGELOG
+++ b/CHANGELOG
@@ -1,6 +1,9 @@
 Giraph Change Log
 
 Release 1.0.1 - unreleased
+  GIRAPH-624: ByteArrayPartition reports 0 aggregate edges when used with 
+  DiskBackedPartitionStore (claudio)
+
   GIRAPH-636: Initialize compute OutEdges directly from input OutEdges
   (majakabiljo)
 

http://git-wip-us.apache.org/repos/asf/giraph/blob/c1b88405/giraph-core/src/main/java/org/apache/giraph/partition/DiskBackedPartitionStore.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/partition/DiskBackedPartitionStore.java b/giraph-core/src/main/java/org/apache/giraph/partition/DiskBackedPartitionStore.java
index 2d30bf9..a4739f1 100644
--- a/giraph-core/src/main/java/org/apache/giraph/partition/DiskBackedPartitionStore.java
+++ b/giraph-core/src/main/java/org/apache/giraph/partition/DiskBackedPartitionStore.java
@@ -143,6 +143,10 @@ public class DiskBackedPartitionStore<I extends WritableComparable,
     for (String path : userPaths) {
       basePaths[i++] = path + "/" + conf.get("mapred.job.id", "Unknown Job");
     }
+    if (LOG.isInfoEnabled()) {
+      LOG.info("DiskBackedPartitionStore with maxInMemoryPartitions=" +
+          maxInMemoryPartitions + ", isStaticGraph=" + conf.isStaticGraph());
+    }
   }
 
   @Override
@@ -358,15 +362,13 @@ public class DiskBackedPartitionStore<I extends WritableComparable,
   }
 
   /**
-   * Writes vertex data (Id, Vertex Value and halted state) to stream.
+   * Writes vertex data (Id, value and halted state) to stream.
    *
    * @param output The output stream
    * @param vertex The vertex to serialize
    * @throws IOException
    */
-  private void writeVertexData(
-      DataOutput output,
-      Vertex<I, V, E, M> vertex)
+  private void writeVertexData(DataOutput output, Vertex<I, V, E, M> vertex)
     throws IOException {
     vertex.getId().write(output);
     vertex.getValue().write(output);
@@ -374,16 +376,14 @@ public class DiskBackedPartitionStore<I extends WritableComparable,
   }
 
   /**
-   * Writes vertex edges (Id, Edges) to stream.
+   * Writes vertex edges (Id, edges) to stream.
    *
    * @param output The output stream
    * @param vertex The vertex to serialize
    * @throws IOException
    */
   @SuppressWarnings("unchecked")
-  private void writeOutEdges(
-      DataOutput output,
-      Vertex<I, V, E, M> vertex)
+  private void writeOutEdges(DataOutput output, Vertex<I, V, E, M> vertex)
     throws IOException {
     vertex.getId().write(output);
     ((OutEdges<I, E>) vertex.getEdges()).write(output);
@@ -402,7 +402,8 @@ public class DiskBackedPartitionStore<I extends WritableComparable,
     id.readFields(in);
     V value = conf.createVertexValue();
     value.readFields(in);
-    vertex.initialize(id, value);
+    OutEdges<I, E> edges = conf.createOutEdges();
+    vertex.initialize(id, value, edges);
     if (in.readBoolean()) {
       vertex.voteToHalt();
     } else {
@@ -417,16 +418,16 @@ public class DiskBackedPartitionStore<I extends WritableComparable,
    * @param partition The partition owning the vertex
    * @throws IOException
    */
+  @SuppressWarnings("unchecked")
   private void readOutEdges(DataInput in, Partition<I, V, E, M> partition)
     throws IOException {
     I id = conf.createVertexId();
     id.readFields(in);
     Vertex<I, V, E, M> v = partition.getVertex(id);
-    OutEdges<I, E> edges = conf.createOutEdges();
-    edges.readFields(in);
-    v.setEdges(edges);
+    ((OutEdges<I, E>) v.getEdges()).readFields(in);
   }
 
+
   /**
    * Load a partition from disk. It deletes the files after the load,
    * except for the edges, if the graph is static.
@@ -441,22 +442,42 @@ public class DiskBackedPartitionStore<I extends WritableComparable,
     Partition<I, V, E, M> partition =
         conf.createPartition(id, context);
     File file = new File(getVerticesPath(id));
-    DataInputStream inputStream = new DataInputStream(
-        new BufferedInputStream(new FileInputStream(file)));
-    for (int i = 0; i < numVertices; ++i) {
-      Vertex<I, V , E, M> vertex = conf.createVertex();
-      readVertexData(inputStream, vertex);
-      partition.putVertex(vertex);
-    }
-    inputStream.close();
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("loadPartition: loading partition vertices " +
+          partition.getId() + " from " + file.getAbsolutePath());
+    }
+    DataInputStream inputStream = null;
+    try {
+      inputStream = new DataInputStream(
+          new BufferedInputStream(new FileInputStream(file)));
+      for (int i = 0; i < numVertices; ++i) {
+        Vertex<I, V , E, M> vertex = conf.createVertex();
+        readVertexData(inputStream, vertex);
+        partition.putVertex(vertex);
+      }
+    } finally {
+      if (inputStream != null) {
+        inputStream.close();
+        inputStream = null;
+      }
+    }
     file.delete();
     file = new File(getEdgesPath(id));
-    inputStream = new DataInputStream(
-        new BufferedInputStream(new FileInputStream(file)));
-    for (int i = 0; i < numVertices; ++i) {
-      readOutEdges(inputStream, partition);
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("loadPartition: loading partition edges " +
+          partition.getId() + " from " + file.getAbsolutePath());
+    }
+    try {
+      inputStream = new DataInputStream(
+          new BufferedInputStream(new FileInputStream(file)));
+      for (int i = 0; i < numVertices; ++i) {
+        readOutEdges(inputStream, partition);
+      }
+    } finally {
+      if (inputStream != null) {
+        inputStream.close();
+      }
     }
-    inputStream.close();
     /*
      * If the graph is static, keep the file around.
      */
@@ -477,16 +498,23 @@ public class DiskBackedPartitionStore<I extends WritableComparable,
     File file = new File(getVerticesPath(partition.getId()));
     file.getParentFile().mkdirs();
     file.createNewFile();
-    if (LOG.isInfoEnabled()) {
-      LOG.info("offloadPartition: writing partition vertices " +
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("offloadPartition: writing partition vertices " +
           partition.getId() + " to " + file.getAbsolutePath());
     }
-    DataOutputStream outputStream = new DataOutputStream(
-        new BufferedOutputStream(new FileOutputStream(file)));
-    for (Vertex<I, V, E, M> vertex : partition) {
-      writeVertexData(outputStream, vertex);
+    DataOutputStream outputStream = null;
+    try {
+      outputStream = new DataOutputStream(
+          new BufferedOutputStream(new FileOutputStream(file)));
+      for (Vertex<I, V, E, M> vertex : partition) {
+        writeVertexData(outputStream, vertex);
+      }
+    } finally {
+      if (outputStream != null) {
+        outputStream.close();
+        outputStream = null;
+      }
     }
-    outputStream.close();
     file = new File(getEdgesPath(partition.getId()));
     /*
      * Avoid writing back edges if we have already written them once and
@@ -494,16 +522,21 @@ public class DiskBackedPartitionStore<I extends WritableComparable,
      */
     if (!conf.isStaticGraph() || !file.exists()) {
       file.createNewFile();
-      if (LOG.isInfoEnabled()) {
-        LOG.info("offloadPartition: writing partition edges " +
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("offloadPartition: writing partition edges " +
             partition.getId() + " to " + file.getAbsolutePath());
       }
-      outputStream = new DataOutputStream(
-          new BufferedOutputStream(new FileOutputStream(file)));
-      for (Vertex<I, V, E, M> vertex : partition) {
-        writeOutEdges(outputStream, vertex);
+      try {
+        outputStream = new DataOutputStream(
+            new BufferedOutputStream(new FileOutputStream(file)));
+        for (Vertex<I, V, E, M> vertex : partition) {
+          writeOutEdges(outputStream, vertex);
+        }
+      } finally {
+        if (outputStream != null) {
+          outputStream.close();
+        }
       }
-      outputStream.close();
     }
   }
 
@@ -520,19 +553,31 @@ public class DiskBackedPartitionStore<I extends WritableComparable,
     Integer count = onDisk.get(id);
     onDisk.put(id, count + (int) partition.getVertexCount());
     File file = new File(getVerticesPath(id));
-    DataOutputStream outputStream = new DataOutputStream(
-        new BufferedOutputStream(new FileOutputStream(file, true)));
-    for (Vertex<I, V, E, M> vertex : partition) {
-      writeVertexData(outputStream, vertex);
+    DataOutputStream outputStream = null;
+    try {
+      outputStream = new DataOutputStream(
+          new BufferedOutputStream(new FileOutputStream(file, true)));
+      for (Vertex<I, V, E, M> vertex : partition) {
+        writeVertexData(outputStream, vertex);
+      }
+    } finally {
+      if (outputStream != null) {
+        outputStream.close();
+        outputStream = null;
+      }
     }
-    outputStream.close();
     file = new File(getEdgesPath(id));
-    outputStream = new DataOutputStream(
-        new BufferedOutputStream(new FileOutputStream(file, true)));
-    for (Vertex<I, V, E, M> vertex : partition) {
-      writeOutEdges(outputStream, vertex);
+    try {
+      outputStream = new DataOutputStream(
+          new BufferedOutputStream(new FileOutputStream(file, true)));
+      for (Vertex<I, V, E, M> vertex : partition) {
+        writeOutEdges(outputStream, vertex);
+      }
+    } finally {
+      if (outputStream != null) {
+        outputStream.close();
+      }
     }
-    outputStream.close();
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/giraph/blob/c1b88405/giraph-core/src/test/java/org/apache/giraph/partition/TestPartitionStores.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/test/java/org/apache/giraph/partition/TestPartitionStores.java b/giraph-core/src/test/java/org/apache/giraph/partition/TestPartitionStores.java
index 9bb8f71..5a93d41 100644
--- a/giraph-core/src/test/java/org/apache/giraph/partition/TestPartitionStores.java
+++ b/giraph-core/src/test/java/org/apache/giraph/partition/TestPartitionStores.java
@@ -22,6 +22,7 @@ import org.apache.commons.io.FileUtils;
 import org.apache.giraph.conf.GiraphConfiguration;
 import org.apache.giraph.conf.GiraphConstants;
 import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
+import org.apache.giraph.edge.EdgeFactory;
 import org.apache.giraph.graph.Vertex;
 import org.apache.giraph.utils.UnsafeByteArrayInputStream;
 import org.apache.giraph.utils.UnsafeByteArrayOutputStream;
@@ -135,6 +136,23 @@ public class TestPartitionStores {
     assertEquals(0, deserializatedPartition.getEdgeCount());
     assertEquals(7, deserializatedPartition.getVertexCount());
   }
+  
+  @Test
+  public void testDiskBackedPartitionStoreWithByteArrayPartition() throws IOException {
+    File directory = Files.createTempDir();
+    GiraphConstants.PARTITIONS_DIRECTORY.set(
+        conf, new File(directory, "giraph_partitions").toString());
+    GiraphConstants.USE_OUT_OF_CORE_GRAPH.set(conf, true);
+    GiraphConstants.MAX_PARTITIONS_IN_MEMORY.set(conf, 1);
+    conf.setPartitionClass(ByteArrayPartition.class);
+    
+    PartitionStore<IntWritable, IntWritable, NullWritable, IntWritable>
+        partitionStore = new DiskBackedPartitionStore<IntWritable,
+            IntWritable, NullWritable, IntWritable>(conf, context);
+    testReadWrite(partitionStore, conf);
+    partitionStore.shutdown();
+    FileUtils.deleteDirectory(directory);
+  }
 
   @Test
   public void testDiskBackedPartitionStore() throws IOException {
@@ -190,6 +208,8 @@ public class TestPartitionStores {
     Vertex<IntWritable, IntWritable, NullWritable, IntWritable> v7 =
         conf.createVertex();
     v7.initialize(new IntWritable(7), new IntWritable(7));
+    v7.addEdge(EdgeFactory.create(new IntWritable(1)));
+    v7.addEdge(EdgeFactory.create(new IntWritable(2)));
 
     partitionStore.addPartition(createPartition(conf, 1, v1, v2));
     partitionStore.addPartition(createPartition(conf, 2, v3));
@@ -219,18 +239,23 @@ public class TestPartitionStores {
       partitionStore.putPartition(p);
       partitionsNumber++;
     }
+    Partition<IntWritable, IntWritable, NullWritable, IntWritable> partition;
     assertEquals(3, partitionsNumber);
     assertTrue(partitionStore.hasPartition(1));
     assertTrue(partitionStore.hasPartition(2));
     assertFalse(partitionStore.hasPartition(3));
     assertTrue(partitionStore.hasPartition(4));
-    assertEquals(3, partition1.getVertexCount());
-    assertEquals(2, partition2.getVertexCount());
-    assertEquals(1, partition3.getVertexCount());
-    assertEquals(1, partition4.getVertexCount());
-
+    partition = partitionStore.getPartition(1);
+    assertEquals(3, partition.getVertexCount());
+    partitionStore.putPartition(partition);
+    partition = partitionStore.getPartition(2);
+    assertEquals(2, partition.getVertexCount());
+    partitionStore.putPartition(partition);
+    partition = partitionStore.getPartition(4);
+    assertEquals(1, partition.getVertexCount());
+    assertEquals(2, partition.getEdgeCount());
+    partitionStore.putPartition(partition);
     partitionStore.deletePartition(2);
-
     assertEquals(2, partitionStore.getNumPartitions());
   }
 }