You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hama.apache.org by tj...@apache.org on 2013/02/28 08:36:32 UTC

svn commit: r1451127 - in /hama/trunk: CHANGES.txt graph/src/main/java/org/apache/hama/graph/DiskVerticesInfo.java graph/src/test/java/org/apache/hama/graph/TestDiskVerticesInfo.java

Author: tjungblut
Date: Thu Feb 28 07:36:32 2013
New Revision: 1451127

URL: http://svn.apache.org/r1451127
Log:
[HAMA-740]: DiskVerticesInfo are slow due to not buffered writes

Modified:
    hama/trunk/CHANGES.txt
    hama/trunk/graph/src/main/java/org/apache/hama/graph/DiskVerticesInfo.java
    hama/trunk/graph/src/test/java/org/apache/hama/graph/TestDiskVerticesInfo.java

Modified: hama/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hama/trunk/CHANGES.txt?rev=1451127&r1=1451126&r2=1451127&view=diff
==============================================================================
--- hama/trunk/CHANGES.txt (original)
+++ hama/trunk/CHANGES.txt Thu Feb 28 07:36:32 2013
@@ -20,7 +20,8 @@ Release 0.7 (unreleased changes)
 
   IMPROVEMENTS
 
-   HAMA-704:  Optimization of memory usage during message processing (tjungblut)
+   HAMA-740: DiskVerticesInfo are slow due to not buffered writes (tjungblut)
+   HAMA-704: Optimization of memory usage during message processing (tjungblut)
    HAMA-735: Tighten the graph API (tjungblut)
    HAMA-714: Align format consistency between examples and generators (edwardyoon)
    HAMA-531: Reimplementation of partitioner (edwardyoon)

Modified: hama/trunk/graph/src/main/java/org/apache/hama/graph/DiskVerticesInfo.java
URL: http://svn.apache.org/viewvc/hama/trunk/graph/src/main/java/org/apache/hama/graph/DiskVerticesInfo.java?rev=1451127&r1=1451126&r2=1451127&view=diff
==============================================================================
--- hama/trunk/graph/src/main/java/org/apache/hama/graph/DiskVerticesInfo.java (original)
+++ hama/trunk/graph/src/main/java/org/apache/hama/graph/DiskVerticesInfo.java Thu Feb 28 07:36:32 2013
@@ -20,12 +20,13 @@ package org.apache.hama.graph;
 import static com.google.common.base.Preconditions.checkArgument;
 
 import java.io.IOException;
-import java.io.RandomAccessFile;
 import java.util.ArrayList;
 import java.util.BitSet;
 import java.util.Collections;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.LocalFileSystem;
 import org.apache.hadoop.fs.Path;
@@ -44,9 +45,13 @@ public final class DiskVerticesInfo<V ex
   private static final byte NULL = 0;
   private static final byte NOT_NULL = 1;
 
-  private RandomAccessFile staticGraphParts;
-  private RandomAccessFile softGraphParts;
-  private RandomAccessFile softGraphPartsNextIteration;
+  private FSDataOutputStream staticGraphPartsDos;
+  private FSDataInputStream staticGraphPartsDis;
+
+  private FSDataOutputStream softGraphPartsDos;
+  private FSDataInputStream softGraphPartsDis;
+
+  private FSDataOutputStream softGraphPartsNextIterationDos;
 
   private BitSet activeVertices;
   private long[] softValueOffsets;
@@ -64,6 +69,7 @@ public final class DiskVerticesInfo<V ex
   private int index = 0;
   private Configuration conf;
   private GraphJobRunner<V, E, M> runner;
+  private String staticFile;
 
   @Override
   public void init(GraphJobRunner<V, E, M> runner, Configuration conf,
@@ -77,19 +83,19 @@ public final class DiskVerticesInfo<V ex
         + "/";
     LocalFileSystem local = FileSystem.getLocal(conf);
     local.mkdirs(new Path(rootPath));
-    // make sure that those files do not exist
-    String staticFile = rootPath + "static.graph";
+    staticFile = rootPath + "static.graph";
     local.delete(new Path(staticFile), false);
-    staticGraphParts = new RandomAccessFile(staticFile, "rw");
+    staticGraphPartsDos = local.create(new Path(staticFile));
     String softGraphFileName = getSoftGraphFileName(rootPath, currentStep);
     local.delete(new Path(softGraphFileName), false);
-    softGraphParts = new RandomAccessFile(softGraphFileName, "rw");
+    softGraphPartsDos = local.create(new Path(softGraphFileName));
   }
 
   @Override
   public void cleanup(Configuration conf, TaskAttemptID attempt)
       throws IOException {
-    IOUtils.cleanup(null, softGraphParts, softGraphPartsNextIteration);
+    IOUtils.cleanup(null, softGraphPartsDos, softGraphPartsNextIterationDos,
+        staticGraphPartsDis, softGraphPartsDis);
     // delete the contents
     FileSystem.getLocal(conf).delete(new Path(rootPath), true);
   }
@@ -101,15 +107,15 @@ public final class DiskVerticesInfo<V ex
         "Additions are locked now, nobody is allowed to change the structure anymore.");
 
     // write the static parts
-    tmpStaticOffsets.add(staticGraphParts.length());
-    vertex.getVertexID().write(staticGraphParts);
-    staticGraphParts.writeInt(vertex.getEdges() == null ? 0 : vertex.getEdges()
-        .size());
+    tmpStaticOffsets.add(staticGraphPartsDos.getPos());
+    vertex.getVertexID().write(staticGraphPartsDos);
+    staticGraphPartsDos.writeInt(vertex.getEdges() == null ? 0 : vertex
+        .getEdges().size());
     for (Edge<?, ?> e : vertex.getEdges()) {
-      e.getDestinationVertexID().write(staticGraphParts);
+      e.getDestinationVertexID().write(staticGraphPartsDos);
     }
 
-    serializeSoft(vertex, -1, null, softGraphParts);
+    serializeSoft(vertex, -1, null, softGraphPartsDos);
 
     size++;
   }
@@ -120,15 +126,15 @@ public final class DiskVerticesInfo<V ex
    * the temporary storage.
    */
   private void serializeSoft(Vertex<V, E, M> vertex, int index,
-      long[] softValueOffsets, RandomAccessFile softGraphParts)
+      long[] softValueOffsets, FSDataOutputStream softGraphParts)
       throws IOException {
     // safe offset write the soft parts
     if (index >= 0) {
-      softValueOffsets[index] = softGraphParts.length();
+      softValueOffsets[index] = softGraphParts.getPos();
       // only set the bitset if we've finished the setup
       activeVertices.set(index, vertex.isHalted());
     } else {
-      tmpSoftOffsets.add(softGraphParts.length());
+      tmpSoftOffsets.add(softGraphParts.getPos());
     }
     if (vertex.getValue() == null) {
       softGraphParts.write(NULL);
@@ -158,7 +164,7 @@ public final class DiskVerticesInfo<V ex
 
     tmpStaticOffsets = null;
     tmpSoftOffsets = null;
-
+    IOUtils.cleanup(null, staticGraphPartsDos, softGraphPartsDos);
     // prevent additional vertices from beeing added
     lockedAdditions = true;
   }
@@ -180,8 +186,9 @@ public final class DiskVerticesInfo<V ex
   public void startSuperstep() throws IOException {
     index = 0;
     String softGraphFileName = getSoftGraphFileName(rootPath, currentStep);
-    FileSystem.getLocal(conf).delete(new Path(softGraphFileName), true);
-    softGraphPartsNextIteration = new RandomAccessFile(softGraphFileName, "rw");
+    LocalFileSystem local = FileSystem.getLocal(conf);
+    local.delete(new Path(softGraphFileName), true);
+    softGraphPartsNextIterationDos = local.create(new Path(softGraphFileName));
     softValueOffsets = softValueOffsetsNextIteration;
     softValueOffsetsNextIteration = new long[softValueOffsetsNextIteration.length];
   }
@@ -191,17 +198,20 @@ public final class DiskVerticesInfo<V ex
       throws IOException {
     // write to the soft parts
     serializeSoft(vertex, index++, softValueOffsetsNextIteration,
-        softGraphPartsNextIteration);
+        softGraphPartsNextIterationDos);
   }
 
   @Override
   public void finishSuperstep() throws IOException {
     // do not delete files in the first step
+    IOUtils.cleanup(null, softGraphPartsDos, softGraphPartsNextIterationDos,
+        softGraphPartsDis);
     if (currentStep > 0) {
-      softGraphParts.close();
-      FileSystem.getLocal(conf).delete(
-          new Path(getSoftGraphFileName(rootPath, currentStep - 1)), true);
-      softGraphParts = softGraphPartsNextIteration;
+      LocalFileSystem local = FileSystem.getLocal(conf);
+      local.delete(new Path(getSoftGraphFileName(rootPath, currentStep - 1)),
+          true);
+      String softGraphFileName = getSoftGraphFileName(rootPath, currentStep);
+      softGraphPartsDis = local.open(new Path(softGraphFileName));
     }
     currentStep++;
   }
@@ -238,8 +248,15 @@ public final class DiskVerticesInfo<V ex
   public IDSkippingIterator<V, E, M> skippingIterator() {
     try {
       // reset
-      staticGraphParts.seek(0);
-      softGraphParts.seek(0);
+      String softGraphFileName = getSoftGraphFileName(rootPath,
+          Math.max(0, currentStep - 1));
+      LocalFileSystem local = FileSystem.getLocal(conf);
+      // close the files
+      IOUtils.cleanup(null, softGraphPartsDos, softGraphPartsDis,
+          staticGraphPartsDis, staticGraphPartsDos);
+      softGraphPartsDis = local.open(new Path(softGraphFileName));
+      staticGraphPartsDis = local.open(new Path(staticFile));
+
       // ensure the vertex is not null
       if (cachedVertexInstance == null) {
         cachedVertexInstance = GraphJobRunner
@@ -297,10 +314,10 @@ public final class DiskVerticesInfo<V ex
     try {
       while (true) {
         // seek until we found something that satisfied our strategy
-        staticGraphParts.seek(staticOffsets[index]);
+        staticGraphPartsDis.seek(staticOffsets[index]);
         boolean halted = activeVertices.get(index);
         cachedVertexInstance.setVotedToHalt(halted);
-        cachedVertexInstance.getVertexID().readFields(staticGraphParts);
+        cachedVertexInstance.getVertexID().readFields(staticGraphPartsDis);
         if (strat.accept(cachedVertexInstance, messageVertexId)) {
           break;
         }
@@ -308,20 +325,20 @@ public final class DiskVerticesInfo<V ex
           return size;
         }
       }
-      softGraphParts.seek(softValueOffsets[index]);
+      softGraphPartsDis.seek(softValueOffsets[index]);
 
       // setting vertex value null here, because it may be overridden. Messaging
       // is not materializing the message directly- so it is possible for the
       // read fields method to change this object (thus a new object).
       cachedVertexInstance.setValue(null);
-      if (softGraphParts.readByte() == NOT_NULL) {
+      if (softGraphPartsDis.readByte() == NOT_NULL) {
         ensureVertexValueNotNull();
-        cachedVertexInstance.getValue().readFields(softGraphParts);
+        cachedVertexInstance.getValue().readFields(softGraphPartsDis);
       }
 
-      cachedVertexInstance.readState(softGraphParts);
-      int numEdges = staticGraphParts.readInt();
-      int softEdges = softGraphParts.readInt();
+      cachedVertexInstance.readState(softGraphPartsDis);
+      int numEdges = staticGraphPartsDis.readInt();
+      int softEdges = softGraphPartsDis.readInt();
       if (softEdges != numEdges) {
         throw new IllegalArgumentException(
             "Number of edges seemed to change. This is not possible (yet).");
@@ -335,10 +352,10 @@ public final class DiskVerticesInfo<V ex
         Edge<V, E> edge = new Edge<V, E>();
         ensureEdgeValueNotNull(edge);
         ensureEdgeIDNotNull(edge);
-        edge.getDestinationVertexID().readFields(staticGraphParts);
-        if (softGraphParts.readByte() == NOT_NULL) {
+        edge.getDestinationVertexID().readFields(staticGraphPartsDis);
+        if (softGraphPartsDis.readByte() == NOT_NULL) {
           ensureEdgeValueNotNull(edge);
-          edge.getCost().readFields(softGraphParts);
+          edge.getCost().readFields(softGraphPartsDis);
         } else {
           edge.setCost(null);
         }

Modified: hama/trunk/graph/src/test/java/org/apache/hama/graph/TestDiskVerticesInfo.java
URL: http://svn.apache.org/viewvc/hama/trunk/graph/src/test/java/org/apache/hama/graph/TestDiskVerticesInfo.java?rev=1451127&r1=1451126&r2=1451127&view=diff
==============================================================================
--- hama/trunk/graph/src/test/java/org/apache/hama/graph/TestDiskVerticesInfo.java (original)
+++ hama/trunk/graph/src/test/java/org/apache/hama/graph/TestDiskVerticesInfo.java Thu Feb 28 07:36:32 2013
@@ -64,10 +64,13 @@ public class TestDiskVerticesInfo extend
       }
 
       info.finishAdditions();
+      // explicitly finish the superstep here, as the new soft file must be
+      // generated, the currentStep must be incremented, etc.
+      info.finishSuperstep();
 
       assertEquals(10, info.size());
       // no we want to iterate and check if the result can properly be obtained
-
+      info.startSuperstep();
       int index = 0;
       IDSkippingIterator<Text, NullWritable, DoubleWritable> iterator = info
           .skippingIterator();
@@ -89,14 +92,14 @@ public class TestDiskVerticesInfo extend
         assertEquals(pageRankVertex.getEdges().get(0).getDestinationVertexID()
             .toString(), edge.getDestinationVertexID().toString());
         assertNull(edge.getValue());
-
+        info.finishVertexComputation(next);
         index++;
       }
       assertEquals(index, list.size());
       info.finishSuperstep();
       // iterate again and compute so vertices change internally
-      iterator = info.skippingIterator();
       info.startSuperstep();
+      iterator = info.skippingIterator();
       while (iterator.hasNext()) {
         Vertex<Text, NullWritable, DoubleWritable> next = iterator.next();
         // override everything with constant 2