You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hama.apache.org by tj...@apache.org on 2013/02/28 08:36:32 UTC
svn commit: r1451127 - in /hama/trunk: CHANGES.txt
graph/src/main/java/org/apache/hama/graph/DiskVerticesInfo.java
graph/src/test/java/org/apache/hama/graph/TestDiskVerticesInfo.java
Author: tjungblut
Date: Thu Feb 28 07:36:32 2013
New Revision: 1451127
URL: http://svn.apache.org/r1451127
Log:
[HAMA-740]: DiskVerticesInfo are slow due to not buffered writes
Modified:
hama/trunk/CHANGES.txt
hama/trunk/graph/src/main/java/org/apache/hama/graph/DiskVerticesInfo.java
hama/trunk/graph/src/test/java/org/apache/hama/graph/TestDiskVerticesInfo.java
Modified: hama/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hama/trunk/CHANGES.txt?rev=1451127&r1=1451126&r2=1451127&view=diff
==============================================================================
--- hama/trunk/CHANGES.txt (original)
+++ hama/trunk/CHANGES.txt Thu Feb 28 07:36:32 2013
@@ -20,7 +20,8 @@ Release 0.7 (unreleased changes)
IMPROVEMENTS
- HAMA-704: Optimization of memory usage during message processing (tjungblut)
+ HAMA-740: DiskVerticesInfo are slow due to not buffered writes (tjungblut)
+ HAMA-704: Optimization of memory usage during message processing (tjungblut)
HAMA-735: Tighten the graph API (tjungblut)
HAMA-714: Align format consistency between examples and generators (edwardyoon)
HAMA-531: Reimplementation of partitioner (edwardyoon)
Modified: hama/trunk/graph/src/main/java/org/apache/hama/graph/DiskVerticesInfo.java
URL: http://svn.apache.org/viewvc/hama/trunk/graph/src/main/java/org/apache/hama/graph/DiskVerticesInfo.java?rev=1451127&r1=1451126&r2=1451127&view=diff
==============================================================================
--- hama/trunk/graph/src/main/java/org/apache/hama/graph/DiskVerticesInfo.java (original)
+++ hama/trunk/graph/src/main/java/org/apache/hama/graph/DiskVerticesInfo.java Thu Feb 28 07:36:32 2013
@@ -20,12 +20,13 @@ package org.apache.hama.graph;
import static com.google.common.base.Preconditions.checkArgument;
import java.io.IOException;
-import java.io.RandomAccessFile;
import java.util.ArrayList;
import java.util.BitSet;
import java.util.Collections;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocalFileSystem;
import org.apache.hadoop.fs.Path;
@@ -44,9 +45,13 @@ public final class DiskVerticesInfo<V ex
private static final byte NULL = 0;
private static final byte NOT_NULL = 1;
- private RandomAccessFile staticGraphParts;
- private RandomAccessFile softGraphParts;
- private RandomAccessFile softGraphPartsNextIteration;
+ private FSDataOutputStream staticGraphPartsDos;
+ private FSDataInputStream staticGraphPartsDis;
+
+ private FSDataOutputStream softGraphPartsDos;
+ private FSDataInputStream softGraphPartsDis;
+
+ private FSDataOutputStream softGraphPartsNextIterationDos;
private BitSet activeVertices;
private long[] softValueOffsets;
@@ -64,6 +69,7 @@ public final class DiskVerticesInfo<V ex
private int index = 0;
private Configuration conf;
private GraphJobRunner<V, E, M> runner;
+ private String staticFile;
@Override
public void init(GraphJobRunner<V, E, M> runner, Configuration conf,
@@ -77,19 +83,19 @@ public final class DiskVerticesInfo<V ex
+ "/";
LocalFileSystem local = FileSystem.getLocal(conf);
local.mkdirs(new Path(rootPath));
- // make sure that those files do not exist
- String staticFile = rootPath + "static.graph";
+ staticFile = rootPath + "static.graph";
local.delete(new Path(staticFile), false);
- staticGraphParts = new RandomAccessFile(staticFile, "rw");
+ staticGraphPartsDos = local.create(new Path(staticFile));
String softGraphFileName = getSoftGraphFileName(rootPath, currentStep);
local.delete(new Path(softGraphFileName), false);
- softGraphParts = new RandomAccessFile(softGraphFileName, "rw");
+ softGraphPartsDos = local.create(new Path(softGraphFileName));
}
@Override
public void cleanup(Configuration conf, TaskAttemptID attempt)
throws IOException {
- IOUtils.cleanup(null, softGraphParts, softGraphPartsNextIteration);
+ IOUtils.cleanup(null, softGraphPartsDos, softGraphPartsNextIterationDos,
+ staticGraphPartsDis, softGraphPartsDis);
// delete the contents
FileSystem.getLocal(conf).delete(new Path(rootPath), true);
}
@@ -101,15 +107,15 @@ public final class DiskVerticesInfo<V ex
"Additions are locked now, nobody is allowed to change the structure anymore.");
// write the static parts
- tmpStaticOffsets.add(staticGraphParts.length());
- vertex.getVertexID().write(staticGraphParts);
- staticGraphParts.writeInt(vertex.getEdges() == null ? 0 : vertex.getEdges()
- .size());
+ tmpStaticOffsets.add(staticGraphPartsDos.getPos());
+ vertex.getVertexID().write(staticGraphPartsDos);
+ staticGraphPartsDos.writeInt(vertex.getEdges() == null ? 0 : vertex
+ .getEdges().size());
for (Edge<?, ?> e : vertex.getEdges()) {
- e.getDestinationVertexID().write(staticGraphParts);
+ e.getDestinationVertexID().write(staticGraphPartsDos);
}
- serializeSoft(vertex, -1, null, softGraphParts);
+ serializeSoft(vertex, -1, null, softGraphPartsDos);
size++;
}
@@ -120,15 +126,15 @@ public final class DiskVerticesInfo<V ex
* the temporary storage.
*/
private void serializeSoft(Vertex<V, E, M> vertex, int index,
- long[] softValueOffsets, RandomAccessFile softGraphParts)
+ long[] softValueOffsets, FSDataOutputStream softGraphParts)
throws IOException {
// safe offset write the soft parts
if (index >= 0) {
- softValueOffsets[index] = softGraphParts.length();
+ softValueOffsets[index] = softGraphParts.getPos();
// only set the bitset if we've finished the setup
activeVertices.set(index, vertex.isHalted());
} else {
- tmpSoftOffsets.add(softGraphParts.length());
+ tmpSoftOffsets.add(softGraphParts.getPos());
}
if (vertex.getValue() == null) {
softGraphParts.write(NULL);
@@ -158,7 +164,7 @@ public final class DiskVerticesInfo<V ex
tmpStaticOffsets = null;
tmpSoftOffsets = null;
-
+ IOUtils.cleanup(null, staticGraphPartsDos, softGraphPartsDos);
// prevent additional vertices from beeing added
lockedAdditions = true;
}
@@ -180,8 +186,9 @@ public final class DiskVerticesInfo<V ex
public void startSuperstep() throws IOException {
index = 0;
String softGraphFileName = getSoftGraphFileName(rootPath, currentStep);
- FileSystem.getLocal(conf).delete(new Path(softGraphFileName), true);
- softGraphPartsNextIteration = new RandomAccessFile(softGraphFileName, "rw");
+ LocalFileSystem local = FileSystem.getLocal(conf);
+ local.delete(new Path(softGraphFileName), true);
+ softGraphPartsNextIterationDos = local.create(new Path(softGraphFileName));
softValueOffsets = softValueOffsetsNextIteration;
softValueOffsetsNextIteration = new long[softValueOffsetsNextIteration.length];
}
@@ -191,17 +198,20 @@ public final class DiskVerticesInfo<V ex
throws IOException {
// write to the soft parts
serializeSoft(vertex, index++, softValueOffsetsNextIteration,
- softGraphPartsNextIteration);
+ softGraphPartsNextIterationDos);
}
@Override
public void finishSuperstep() throws IOException {
// do not delete files in the first step
+ IOUtils.cleanup(null, softGraphPartsDos, softGraphPartsNextIterationDos,
+ softGraphPartsDis);
if (currentStep > 0) {
- softGraphParts.close();
- FileSystem.getLocal(conf).delete(
- new Path(getSoftGraphFileName(rootPath, currentStep - 1)), true);
- softGraphParts = softGraphPartsNextIteration;
+ LocalFileSystem local = FileSystem.getLocal(conf);
+ local.delete(new Path(getSoftGraphFileName(rootPath, currentStep - 1)),
+ true);
+ String softGraphFileName = getSoftGraphFileName(rootPath, currentStep);
+ softGraphPartsDis = local.open(new Path(softGraphFileName));
}
currentStep++;
}
@@ -238,8 +248,15 @@ public final class DiskVerticesInfo<V ex
public IDSkippingIterator<V, E, M> skippingIterator() {
try {
// reset
- staticGraphParts.seek(0);
- softGraphParts.seek(0);
+ String softGraphFileName = getSoftGraphFileName(rootPath,
+ Math.max(0, currentStep - 1));
+ LocalFileSystem local = FileSystem.getLocal(conf);
+ // close the files
+ IOUtils.cleanup(null, softGraphPartsDos, softGraphPartsDis,
+ staticGraphPartsDis, staticGraphPartsDos);
+ softGraphPartsDis = local.open(new Path(softGraphFileName));
+ staticGraphPartsDis = local.open(new Path(staticFile));
+
// ensure the vertex is not null
if (cachedVertexInstance == null) {
cachedVertexInstance = GraphJobRunner
@@ -297,10 +314,10 @@ public final class DiskVerticesInfo<V ex
try {
while (true) {
// seek until we found something that satisfied our strategy
- staticGraphParts.seek(staticOffsets[index]);
+ staticGraphPartsDis.seek(staticOffsets[index]);
boolean halted = activeVertices.get(index);
cachedVertexInstance.setVotedToHalt(halted);
- cachedVertexInstance.getVertexID().readFields(staticGraphParts);
+ cachedVertexInstance.getVertexID().readFields(staticGraphPartsDis);
if (strat.accept(cachedVertexInstance, messageVertexId)) {
break;
}
@@ -308,20 +325,20 @@ public final class DiskVerticesInfo<V ex
return size;
}
}
- softGraphParts.seek(softValueOffsets[index]);
+ softGraphPartsDis.seek(softValueOffsets[index]);
// setting vertex value null here, because it may be overridden. Messaging
// is not materializing the message directly- so it is possible for the
// read fields method to change this object (thus a new object).
cachedVertexInstance.setValue(null);
- if (softGraphParts.readByte() == NOT_NULL) {
+ if (softGraphPartsDis.readByte() == NOT_NULL) {
ensureVertexValueNotNull();
- cachedVertexInstance.getValue().readFields(softGraphParts);
+ cachedVertexInstance.getValue().readFields(softGraphPartsDis);
}
- cachedVertexInstance.readState(softGraphParts);
- int numEdges = staticGraphParts.readInt();
- int softEdges = softGraphParts.readInt();
+ cachedVertexInstance.readState(softGraphPartsDis);
+ int numEdges = staticGraphPartsDis.readInt();
+ int softEdges = softGraphPartsDis.readInt();
if (softEdges != numEdges) {
throw new IllegalArgumentException(
"Number of edges seemed to change. This is not possible (yet).");
@@ -335,10 +352,10 @@ public final class DiskVerticesInfo<V ex
Edge<V, E> edge = new Edge<V, E>();
ensureEdgeValueNotNull(edge);
ensureEdgeIDNotNull(edge);
- edge.getDestinationVertexID().readFields(staticGraphParts);
- if (softGraphParts.readByte() == NOT_NULL) {
+ edge.getDestinationVertexID().readFields(staticGraphPartsDis);
+ if (softGraphPartsDis.readByte() == NOT_NULL) {
ensureEdgeValueNotNull(edge);
- edge.getCost().readFields(softGraphParts);
+ edge.getCost().readFields(softGraphPartsDis);
} else {
edge.setCost(null);
}
Modified: hama/trunk/graph/src/test/java/org/apache/hama/graph/TestDiskVerticesInfo.java
URL: http://svn.apache.org/viewvc/hama/trunk/graph/src/test/java/org/apache/hama/graph/TestDiskVerticesInfo.java?rev=1451127&r1=1451126&r2=1451127&view=diff
==============================================================================
--- hama/trunk/graph/src/test/java/org/apache/hama/graph/TestDiskVerticesInfo.java (original)
+++ hama/trunk/graph/src/test/java/org/apache/hama/graph/TestDiskVerticesInfo.java Thu Feb 28 07:36:32 2013
@@ -64,10 +64,13 @@ public class TestDiskVerticesInfo extend
}
info.finishAdditions();
+ // implicitly finish the superstep here as the new softfile must be
+ // generated, the currentStep must be incremented etc.
+ info.finishSuperstep();
assertEquals(10, info.size());
// no we want to iterate and check if the result can properly be obtained
-
+ info.startSuperstep();
int index = 0;
IDSkippingIterator<Text, NullWritable, DoubleWritable> iterator = info
.skippingIterator();
@@ -89,14 +92,14 @@ public class TestDiskVerticesInfo extend
assertEquals(pageRankVertex.getEdges().get(0).getDestinationVertexID()
.toString(), edge.getDestinationVertexID().toString());
assertNull(edge.getValue());
-
+ info.finishVertexComputation(next);
index++;
}
assertEquals(index, list.size());
info.finishSuperstep();
// iterate again and compute so vertices change internally
- iterator = info.skippingIterator();
info.startSuperstep();
+ iterator = info.skippingIterator();
while (iterator.hasNext()) {
Vertex<Text, NullWritable, DoubleWritable> next = iterator.next();
// override everything with constant 2