You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@giraph.apache.org by cl...@apache.org on 2013/04/19 18:34:48 UTC
git commit: updated refs/heads/trunk to c1b8840
Updated Branches:
refs/heads/trunk 75b311266 -> c1b88405c
GIRAPH-624: ByteArrayPartition reports 0 aggregate edges when used with
DiskBackedPartitionStore (claudio)
Project: http://git-wip-us.apache.org/repos/asf/giraph/repo
Commit: http://git-wip-us.apache.org/repos/asf/giraph/commit/c1b88405
Tree: http://git-wip-us.apache.org/repos/asf/giraph/tree/c1b88405
Diff: http://git-wip-us.apache.org/repos/asf/giraph/diff/c1b88405
Branch: refs/heads/trunk
Commit: c1b88405c96db8203641c2baae8ff30527bb2b46
Parents: 75b3112
Author: Claudio Martella <cl...@apache.org>
Authored: Fri Apr 19 18:33:54 2013 +0200
Committer: Claudio Martella <cl...@apache.org>
Committed: Fri Apr 19 18:33:54 2013 +0200
----------------------------------------------------------------------
CHANGELOG | 3 +
.../giraph/partition/DiskBackedPartitionStore.java | 143 ++++++++++-----
.../giraph/partition/TestPartitionStores.java | 37 +++-
3 files changed, 128 insertions(+), 55 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/giraph/blob/c1b88405/CHANGELOG
----------------------------------------------------------------------
diff --git a/CHANGELOG b/CHANGELOG
index d7679a2..5a980cf 100644
--- a/CHANGELOG
+++ b/CHANGELOG
@@ -1,6 +1,9 @@
Giraph Change Log
Release 1.0.1 - unreleased
+ GIRAPH-624: ByteArrayPartition reports 0 aggregate edges when used with
+ DiskBackedPartitionStore (claudio)
+
GIRAPH-636: Initialize compute OutEdges directly from input OutEdges
(majakabiljo)
http://git-wip-us.apache.org/repos/asf/giraph/blob/c1b88405/giraph-core/src/main/java/org/apache/giraph/partition/DiskBackedPartitionStore.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/partition/DiskBackedPartitionStore.java b/giraph-core/src/main/java/org/apache/giraph/partition/DiskBackedPartitionStore.java
index 2d30bf9..a4739f1 100644
--- a/giraph-core/src/main/java/org/apache/giraph/partition/DiskBackedPartitionStore.java
+++ b/giraph-core/src/main/java/org/apache/giraph/partition/DiskBackedPartitionStore.java
@@ -143,6 +143,10 @@ public class DiskBackedPartitionStore<I extends WritableComparable,
for (String path : userPaths) {
basePaths[i++] = path + "/" + conf.get("mapred.job.id", "Unknown Job");
}
+ if (LOG.isInfoEnabled()) {
+ LOG.info("DiskBackedPartitionStore with maxInMemoryPartitions=" +
+ maxInMemoryPartitions + ", isStaticGraph=" + conf.isStaticGraph());
+ }
}
@Override
@@ -358,15 +362,13 @@ public class DiskBackedPartitionStore<I extends WritableComparable,
}
/**
- * Writes vertex data (Id, Vertex Value and halted state) to stream.
+ * Writes vertex data (Id, value and halted state) to stream.
*
* @param output The output stream
* @param vertex The vertex to serialize
* @throws IOException
*/
- private void writeVertexData(
- DataOutput output,
- Vertex<I, V, E, M> vertex)
+ private void writeVertexData(DataOutput output, Vertex<I, V, E, M> vertex)
throws IOException {
vertex.getId().write(output);
vertex.getValue().write(output);
@@ -374,16 +376,14 @@ public class DiskBackedPartitionStore<I extends WritableComparable,
}
/**
- * Writes vertex edges (Id, Edges) to stream.
+ * Writes vertex edges (Id, edges) to stream.
*
* @param output The output stream
* @param vertex The vertex to serialize
* @throws IOException
*/
@SuppressWarnings("unchecked")
- private void writeOutEdges(
- DataOutput output,
- Vertex<I, V, E, M> vertex)
+ private void writeOutEdges(DataOutput output, Vertex<I, V, E, M> vertex)
throws IOException {
vertex.getId().write(output);
((OutEdges<I, E>) vertex.getEdges()).write(output);
@@ -402,7 +402,8 @@ public class DiskBackedPartitionStore<I extends WritableComparable,
id.readFields(in);
V value = conf.createVertexValue();
value.readFields(in);
- vertex.initialize(id, value);
+ OutEdges<I, E> edges = conf.createOutEdges();
+ vertex.initialize(id, value, edges);
if (in.readBoolean()) {
vertex.voteToHalt();
} else {
@@ -417,16 +418,16 @@ public class DiskBackedPartitionStore<I extends WritableComparable,
* @param partition The partition owning the vertex
* @throws IOException
*/
+ @SuppressWarnings("unchecked")
private void readOutEdges(DataInput in, Partition<I, V, E, M> partition)
throws IOException {
I id = conf.createVertexId();
id.readFields(in);
Vertex<I, V, E, M> v = partition.getVertex(id);
- OutEdges<I, E> edges = conf.createOutEdges();
- edges.readFields(in);
- v.setEdges(edges);
+ ((OutEdges<I, E>) v.getEdges()).readFields(in);
}
+
/**
* Load a partition from disk. It deletes the files after the load,
* except for the edges, if the graph is static.
@@ -441,22 +442,42 @@ public class DiskBackedPartitionStore<I extends WritableComparable,
Partition<I, V, E, M> partition =
conf.createPartition(id, context);
File file = new File(getVerticesPath(id));
- DataInputStream inputStream = new DataInputStream(
- new BufferedInputStream(new FileInputStream(file)));
- for (int i = 0; i < numVertices; ++i) {
- Vertex<I, V , E, M> vertex = conf.createVertex();
- readVertexData(inputStream, vertex);
- partition.putVertex(vertex);
- }
- inputStream.close();
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("loadPartition: loading partition vertices " +
+ partition.getId() + " from " + file.getAbsolutePath());
+ }
+ DataInputStream inputStream = null;
+ try {
+ inputStream = new DataInputStream(
+ new BufferedInputStream(new FileInputStream(file)));
+ for (int i = 0; i < numVertices; ++i) {
+ Vertex<I, V , E, M> vertex = conf.createVertex();
+ readVertexData(inputStream, vertex);
+ partition.putVertex(vertex);
+ }
+ } finally {
+ if (inputStream != null) {
+ inputStream.close();
+ inputStream = null;
+ }
+ }
file.delete();
file = new File(getEdgesPath(id));
- inputStream = new DataInputStream(
- new BufferedInputStream(new FileInputStream(file)));
- for (int i = 0; i < numVertices; ++i) {
- readOutEdges(inputStream, partition);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("loadPartition: loading partition edges " +
+ partition.getId() + " from " + file.getAbsolutePath());
+ }
+ try {
+ inputStream = new DataInputStream(
+ new BufferedInputStream(new FileInputStream(file)));
+ for (int i = 0; i < numVertices; ++i) {
+ readOutEdges(inputStream, partition);
+ }
+ } finally {
+ if (inputStream != null) {
+ inputStream.close();
+ }
}
- inputStream.close();
/*
* If the graph is static, keep the file around.
*/
@@ -477,16 +498,23 @@ public class DiskBackedPartitionStore<I extends WritableComparable,
File file = new File(getVerticesPath(partition.getId()));
file.getParentFile().mkdirs();
file.createNewFile();
- if (LOG.isInfoEnabled()) {
- LOG.info("offloadPartition: writing partition vertices " +
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("offloadPartition: writing partition vertices " +
partition.getId() + " to " + file.getAbsolutePath());
}
- DataOutputStream outputStream = new DataOutputStream(
- new BufferedOutputStream(new FileOutputStream(file)));
- for (Vertex<I, V, E, M> vertex : partition) {
- writeVertexData(outputStream, vertex);
+ DataOutputStream outputStream = null;
+ try {
+ outputStream = new DataOutputStream(
+ new BufferedOutputStream(new FileOutputStream(file)));
+ for (Vertex<I, V, E, M> vertex : partition) {
+ writeVertexData(outputStream, vertex);
+ }
+ } finally {
+ if (outputStream != null) {
+ outputStream.close();
+ outputStream = null;
+ }
}
- outputStream.close();
file = new File(getEdgesPath(partition.getId()));
/*
* Avoid writing back edges if we have already written them once and
@@ -494,16 +522,21 @@ public class DiskBackedPartitionStore<I extends WritableComparable,
*/
if (!conf.isStaticGraph() || !file.exists()) {
file.createNewFile();
- if (LOG.isInfoEnabled()) {
- LOG.info("offloadPartition: writing partition edges " +
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("offloadPartition: writing partition edges " +
partition.getId() + " to " + file.getAbsolutePath());
}
- outputStream = new DataOutputStream(
- new BufferedOutputStream(new FileOutputStream(file)));
- for (Vertex<I, V, E, M> vertex : partition) {
- writeOutEdges(outputStream, vertex);
+ try {
+ outputStream = new DataOutputStream(
+ new BufferedOutputStream(new FileOutputStream(file)));
+ for (Vertex<I, V, E, M> vertex : partition) {
+ writeOutEdges(outputStream, vertex);
+ }
+ } finally {
+ if (outputStream != null) {
+ outputStream.close();
+ }
}
- outputStream.close();
}
}
@@ -520,19 +553,31 @@ public class DiskBackedPartitionStore<I extends WritableComparable,
Integer count = onDisk.get(id);
onDisk.put(id, count + (int) partition.getVertexCount());
File file = new File(getVerticesPath(id));
- DataOutputStream outputStream = new DataOutputStream(
- new BufferedOutputStream(new FileOutputStream(file, true)));
- for (Vertex<I, V, E, M> vertex : partition) {
- writeVertexData(outputStream, vertex);
+ DataOutputStream outputStream = null;
+ try {
+ outputStream = new DataOutputStream(
+ new BufferedOutputStream(new FileOutputStream(file, true)));
+ for (Vertex<I, V, E, M> vertex : partition) {
+ writeVertexData(outputStream, vertex);
+ }
+ } finally {
+ if (outputStream != null) {
+ outputStream.close();
+ outputStream = null;
+ }
}
- outputStream.close();
file = new File(getEdgesPath(id));
- outputStream = new DataOutputStream(
- new BufferedOutputStream(new FileOutputStream(file, true)));
- for (Vertex<I, V, E, M> vertex : partition) {
- writeOutEdges(outputStream, vertex);
+ try {
+ outputStream = new DataOutputStream(
+ new BufferedOutputStream(new FileOutputStream(file, true)));
+ for (Vertex<I, V, E, M> vertex : partition) {
+ writeOutEdges(outputStream, vertex);
+ }
+ } finally {
+ if (outputStream != null) {
+ outputStream.close();
+ }
}
- outputStream.close();
}
/**
http://git-wip-us.apache.org/repos/asf/giraph/blob/c1b88405/giraph-core/src/test/java/org/apache/giraph/partition/TestPartitionStores.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/test/java/org/apache/giraph/partition/TestPartitionStores.java b/giraph-core/src/test/java/org/apache/giraph/partition/TestPartitionStores.java
index 9bb8f71..5a93d41 100644
--- a/giraph-core/src/test/java/org/apache/giraph/partition/TestPartitionStores.java
+++ b/giraph-core/src/test/java/org/apache/giraph/partition/TestPartitionStores.java
@@ -22,6 +22,7 @@ import org.apache.commons.io.FileUtils;
import org.apache.giraph.conf.GiraphConfiguration;
import org.apache.giraph.conf.GiraphConstants;
import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
+import org.apache.giraph.edge.EdgeFactory;
import org.apache.giraph.graph.Vertex;
import org.apache.giraph.utils.UnsafeByteArrayInputStream;
import org.apache.giraph.utils.UnsafeByteArrayOutputStream;
@@ -135,6 +136,23 @@ public class TestPartitionStores {
assertEquals(0, deserializatedPartition.getEdgeCount());
assertEquals(7, deserializatedPartition.getVertexCount());
}
+
+ @Test
+ public void testDiskBackedPartitionStoreWithByteArrayPartition() throws IOException {
+ File directory = Files.createTempDir();
+ GiraphConstants.PARTITIONS_DIRECTORY.set(
+ conf, new File(directory, "giraph_partitions").toString());
+ GiraphConstants.USE_OUT_OF_CORE_GRAPH.set(conf, true);
+ GiraphConstants.MAX_PARTITIONS_IN_MEMORY.set(conf, 1);
+ conf.setPartitionClass(ByteArrayPartition.class);
+
+ PartitionStore<IntWritable, IntWritable, NullWritable, IntWritable>
+ partitionStore = new DiskBackedPartitionStore<IntWritable,
+ IntWritable, NullWritable, IntWritable>(conf, context);
+ testReadWrite(partitionStore, conf);
+ partitionStore.shutdown();
+ FileUtils.deleteDirectory(directory);
+ }
@Test
public void testDiskBackedPartitionStore() throws IOException {
@@ -190,6 +208,8 @@ public class TestPartitionStores {
Vertex<IntWritable, IntWritable, NullWritable, IntWritable> v7 =
conf.createVertex();
v7.initialize(new IntWritable(7), new IntWritable(7));
+ v7.addEdge(EdgeFactory.create(new IntWritable(1)));
+ v7.addEdge(EdgeFactory.create(new IntWritable(2)));
partitionStore.addPartition(createPartition(conf, 1, v1, v2));
partitionStore.addPartition(createPartition(conf, 2, v3));
@@ -219,18 +239,23 @@ public class TestPartitionStores {
partitionStore.putPartition(p);
partitionsNumber++;
}
+ Partition<IntWritable, IntWritable, NullWritable, IntWritable> partition;
assertEquals(3, partitionsNumber);
assertTrue(partitionStore.hasPartition(1));
assertTrue(partitionStore.hasPartition(2));
assertFalse(partitionStore.hasPartition(3));
assertTrue(partitionStore.hasPartition(4));
- assertEquals(3, partition1.getVertexCount());
- assertEquals(2, partition2.getVertexCount());
- assertEquals(1, partition3.getVertexCount());
- assertEquals(1, partition4.getVertexCount());
-
+ partition = partitionStore.getPartition(1);
+ assertEquals(3, partition.getVertexCount());
+ partitionStore.putPartition(partition);
+ partition = partitionStore.getPartition(2);
+ assertEquals(2, partition.getVertexCount());
+ partitionStore.putPartition(partition);
+ partition = partitionStore.getPartition(4);
+ assertEquals(1, partition.getVertexCount());
+ assertEquals(2, partition.getEdgeCount());
+ partitionStore.putPartition(partition);
partitionStore.deletePartition(2);
-
assertEquals(2, partitionStore.getNumPartitions());
}
}