You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@giraph.apache.org by cl...@apache.org on 2013/04/11 22:05:16 UTC
[1/2] git commit: updated refs/heads/trunk to 67e0f11
Updated Branches:
refs/heads/trunk 96fd05385 -> 67e0f11d2
GIRAPH-613
Project: http://git-wip-us.apache.org/repos/asf/giraph/repo
Commit: http://git-wip-us.apache.org/repos/asf/giraph/commit/6cf79dbe
Tree: http://git-wip-us.apache.org/repos/asf/giraph/tree/6cf79dbe
Diff: http://git-wip-us.apache.org/repos/asf/giraph/diff/6cf79dbe
Branch: refs/heads/trunk
Commit: 6cf79dbe2c4397732820c435df723fe50e9f3daf
Parents: 96fd053
Author: Claudio Martella <cl...@apache.org>
Authored: Thu Apr 11 18:09:25 2013 +0200
Committer: Claudio Martella <cl...@apache.org>
Committed: Thu Apr 11 18:09:25 2013 +0200
----------------------------------------------------------------------
CHANGELOG | 2 +
.../java/org/apache/giraph/conf/GiraphClasses.java | 2 +-
.../main/java/org/apache/giraph/graph/Vertex.java | 25 +--
.../org/apache/giraph/graph/VertexMutations.java | 6 +-
.../giraph/partition/ByteArrayPartition.java | 31 ++--
.../giraph/partition/DiskBackedPartitionStore.java | 9 +-
.../apache/giraph/partition/SimplePartition.java | 9 +-
.../org/apache/giraph/utils/WritableUtils.java | 182 +++++++++++++++
.../apache/giraph/graph/TestVertexAndEdges.java | 24 ++-
9 files changed, 231 insertions(+), 59 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/giraph/blob/6cf79dbe/CHANGELOG
----------------------------------------------------------------------
diff --git a/CHANGELOG b/CHANGELOG
index babbb88..4a1e7ca 100644
--- a/CHANGELOG
+++ b/CHANGELOG
@@ -1,6 +1,8 @@
Giraph Change Log
Release 0.2.0 - unreleased
+ GIRAPH-613: Remove Writable from the interfaces implemented by Vertex (claudio)
+
GIRAPH-543: Fix PageRankBenchmark and make WeightedPageRankBenchmark (majakabiljo)
GIRAPH-615: Add support for multithreaded output (majakabiljo)
http://git-wip-us.apache.org/repos/asf/giraph/blob/6cf79dbe/giraph-core/src/main/java/org/apache/giraph/conf/GiraphClasses.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/conf/GiraphClasses.java b/giraph-core/src/main/java/org/apache/giraph/conf/GiraphClasses.java
index 64f8bb1..95499bd 100644
--- a/giraph-core/src/main/java/org/apache/giraph/conf/GiraphClasses.java
+++ b/giraph-core/src/main/java/org/apache/giraph/conf/GiraphClasses.java
@@ -134,7 +134,7 @@ public class GiraphClasses<I extends WritableComparable,
}
/**
- * Contructor that reads classes from a Configuration object.
+ * Constructor that reads classes from a Configuration object.
*
* @param conf Configuration object to read from.
*/
http://git-wip-us.apache.org/repos/asf/giraph/blob/6cf79dbe/giraph-core/src/main/java/org/apache/giraph/graph/Vertex.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/graph/Vertex.java b/giraph-core/src/main/java/org/apache/giraph/graph/Vertex.java
index fda6023..a1b1a87 100644
--- a/giraph-core/src/main/java/org/apache/giraph/graph/Vertex.java
+++ b/giraph-core/src/main/java/org/apache/giraph/graph/Vertex.java
@@ -35,13 +35,13 @@ import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapreduce.Mapper;
-import java.io.DataInput;
-import java.io.DataOutput;
import java.io.IOException;
import java.util.Iterator;
/**
* Basic abstract class for writing a BSP application for computation.
+ * Giraph will checkpoint Vertex value and edges, hence all user data should
+ * be stored as part of the vertex value.
*
* @param <I> Vertex id
* @param <V> Vertex data
@@ -51,7 +51,7 @@ import java.util.Iterator;
public abstract class Vertex<I extends WritableComparable,
V extends Writable, E extends Writable, M extends Writable>
extends DefaultImmutableClassesGiraphConfigurable<I, V, E, M>
- implements WorkerAggregatorUsage, Writable {
+ implements WorkerAggregatorUsage {
/** Vertex id. */
private I id;
/** Vertex value. */
@@ -507,25 +507,6 @@ public abstract class Vertex<I extends WritableComparable,
}
@Override
- public void readFields(DataInput in) throws IOException {
- id = getConf().createVertexId();
- id.readFields(in);
- value = getConf().createVertexValue();
- value.readFields(in);
- edges = getConf().createVertexEdges();
- edges.readFields(in);
- halt = in.readBoolean();
- }
-
- @Override
- public void write(DataOutput out) throws IOException {
- id.write(out);
- value.write(out);
- edges.write(out);
- out.writeBoolean(halt);
- }
-
- @Override
public String toString() {
return "Vertex(id=" + getId() + ",value=" + getValue() +
",#edges=" + getNumEdges() + ")";
http://git-wip-us.apache.org/repos/asf/giraph/blob/6cf79dbe/giraph-core/src/main/java/org/apache/giraph/graph/VertexMutations.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/graph/VertexMutations.java b/giraph-core/src/main/java/org/apache/giraph/graph/VertexMutations.java
index ea50f25..75c0aef 100644
--- a/giraph-core/src/main/java/org/apache/giraph/graph/VertexMutations.java
+++ b/giraph-core/src/main/java/org/apache/giraph/graph/VertexMutations.java
@@ -87,8 +87,8 @@ public class VertexMutations<I extends WritableComparable,
int addedVertexListSize = input.readInt();
for (int i = 0; i < addedVertexListSize; ++i) {
- Vertex<I, V, E, M> vertex = conf.createVertex();
- vertex.readFields(input);
+ Vertex<I, V, E, M> vertex =
+ WritableUtils.readVertexFromDataInput(input, getConf());
addedVertexList.add(vertex);
}
removedVertexCount = input.readInt();
@@ -110,7 +110,7 @@ public class VertexMutations<I extends WritableComparable,
public void write(DataOutput output) throws IOException {
output.writeInt(addedVertexList.size());
for (Vertex<I, V, E, M> vertex : addedVertexList) {
- vertex.write(output);
+ WritableUtils.writeVertexToDataOutput(output, vertex, getConf());
}
output.writeInt(removedVertexCount);
output.writeInt(addedEdgeList.size());
http://git-wip-us.apache.org/repos/asf/giraph/blob/6cf79dbe/giraph-core/src/main/java/org/apache/giraph/partition/ByteArrayPartition.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/partition/ByteArrayPartition.java b/giraph-core/src/main/java/org/apache/giraph/partition/ByteArrayPartition.java
index dd8c974..d2e7599 100644
--- a/giraph-core/src/main/java/org/apache/giraph/partition/ByteArrayPartition.java
+++ b/giraph-core/src/main/java/org/apache/giraph/partition/ByteArrayPartition.java
@@ -79,21 +79,22 @@ public class ByteArrayPartition<I extends WritableComparable,
if (vertexData == null) {
return null;
}
- WritableUtils.readFieldsFromByteArrayWithSize(
- vertexData, representativeVertex, useUnsafeSerialization);
+ WritableUtils.reinitializeVertexFromByteArray(
+ vertexData, representativeVertex, useUnsafeSerialization, getConf());
return representativeVertex;
}
@Override
public Vertex<I, V, E, M> putVertex(Vertex<I, V, E, M> vertex) {
byte[] vertexData =
- WritableUtils.writeToByteArrayWithSize(vertex, useUnsafeSerialization);
+ WritableUtils.writeVertexToByteArray(
+ vertex, useUnsafeSerialization, getConf());
byte[] oldVertexBytes = vertexMap.put(vertex.getId(), vertexData);
if (oldVertexBytes == null) {
return null;
} else {
- WritableUtils.readFieldsFromByteArrayWithSize(
- oldVertexBytes, representativeVertex, useUnsafeSerialization);
+ WritableUtils.reinitializeVertexFromByteArray(oldVertexBytes,
+ representativeVertex, useUnsafeSerialization, getConf());
return representativeVertex;
}
}
@@ -104,8 +105,8 @@ public class ByteArrayPartition<I extends WritableComparable,
if (vertexBytes == null) {
return null;
}
- WritableUtils.readFieldsFromByteArrayWithSize(vertexBytes,
- representativeVertex, useUnsafeSerialization);
+ WritableUtils.reinitializeVertexFromByteArray(vertexBytes,
+ representativeVertex, useUnsafeSerialization, getConf());
return representativeVertex;
}
@@ -134,8 +135,8 @@ public class ByteArrayPartition<I extends WritableComparable,
public long getEdgeCount() {
long edges = 0;
for (byte[] vertexBytes : vertexMap.values()) {
- WritableUtils.readFieldsFromByteArrayWithSize(vertexBytes,
- representativeVertex, useUnsafeSerialization);
+ WritableUtils.reinitializeVertexFromByteArray(vertexBytes,
+ representativeVertex, useUnsafeSerialization, getConf());
edges += representativeVertex.getNumEdges();
}
return edges;
@@ -147,12 +148,12 @@ public class ByteArrayPartition<I extends WritableComparable,
byte[] oldVertexData = vertexMap.get(vertex.getId());
if (oldVertexData != null) {
vertexMap.put(vertex.getId(),
- WritableUtils.writeToByteArrayWithSize(
- vertex, oldVertexData, useUnsafeSerialization));
+ WritableUtils.writeVertexToByteArray(
+ vertex, oldVertexData, useUnsafeSerialization, getConf()));
} else {
vertexMap.put(vertex.getId(),
- WritableUtils.writeToByteArrayWithSize(
- vertex, useUnsafeSerialization));
+ WritableUtils.writeVertexToByteArray(
+ vertex, useUnsafeSerialization, getConf()));
}
}
@@ -223,9 +224,9 @@ public class ByteArrayPartition<I extends WritableComparable,
@Override
public Vertex<I, V, E, M> next() {
- WritableUtils.readFieldsFromByteArrayWithSize(
+ WritableUtils.reinitializeVertexFromByteArray(
vertexDataIterator.next(), representativeVertex,
- useUnsafeSerialization);
+ useUnsafeSerialization, getConf());
return representativeVertex;
}
http://git-wip-us.apache.org/repos/asf/giraph/blob/6cf79dbe/giraph-core/src/main/java/org/apache/giraph/partition/DiskBackedPartitionStore.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/partition/DiskBackedPartitionStore.java b/giraph-core/src/main/java/org/apache/giraph/partition/DiskBackedPartitionStore.java
index 3525302..11e0a90 100644
--- a/giraph-core/src/main/java/org/apache/giraph/partition/DiskBackedPartitionStore.java
+++ b/giraph-core/src/main/java/org/apache/giraph/partition/DiskBackedPartitionStore.java
@@ -20,6 +20,7 @@ package org.apache.giraph.partition;
import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
import org.apache.giraph.graph.Vertex;
+import org.apache.giraph.utils.WritableUtils;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapreduce.Mapper;
@@ -370,8 +371,8 @@ public class DiskBackedPartitionStore<I extends WritableComparable,
DataInputStream inputStream = new DataInputStream(
new BufferedInputStream(new FileInputStream(file)));
for (int i = 0; i < numVertices; ++i) {
- Vertex<I, V, E, M> vertex = conf.createVertex();
- vertex.readFields(inputStream);
+ Vertex<I, V , E, M> vertex =
+ WritableUtils.readVertexFromDataInput(inputStream, conf);
partition.putVertex(vertex);
}
inputStream.close();
@@ -397,7 +398,7 @@ public class DiskBackedPartitionStore<I extends WritableComparable,
DataOutputStream outputStream = new DataOutputStream(
new BufferedOutputStream(new FileOutputStream(file)));
for (Vertex<I, V, E, M> vertex : partition) {
- vertex.write(outputStream);
+ WritableUtils.writeVertexToDataOutput(outputStream, vertex, conf);
}
outputStream.close();
}
@@ -418,7 +419,7 @@ public class DiskBackedPartitionStore<I extends WritableComparable,
DataOutputStream outputStream = new DataOutputStream(
new BufferedOutputStream(new FileOutputStream(file, true)));
for (Vertex<I, V, E, M> vertex : partition) {
- vertex.write(outputStream);
+ WritableUtils.writeVertexToDataOutput(outputStream, vertex, conf);
}
outputStream.close();
}
http://git-wip-us.apache.org/repos/asf/giraph/blob/6cf79dbe/giraph-core/src/main/java/org/apache/giraph/partition/SimplePartition.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/partition/SimplePartition.java b/giraph-core/src/main/java/org/apache/giraph/partition/SimplePartition.java
index 23e0f05..d6a46bd 100644
--- a/giraph-core/src/main/java/org/apache/giraph/partition/SimplePartition.java
+++ b/giraph-core/src/main/java/org/apache/giraph/partition/SimplePartition.java
@@ -19,6 +19,7 @@
package org.apache.giraph.partition;
import org.apache.giraph.graph.Vertex;
+import org.apache.giraph.utils.WritableUtils;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.util.Progressable;
@@ -121,9 +122,9 @@ public class SimplePartition<I extends WritableComparable,
}
int vertices = input.readInt();
for (int i = 0; i < vertices; ++i) {
- Vertex<I, V, E, M> vertex = getConf().createVertex();
progress();
- vertex.readFields(input);
+ Vertex<I, V, E, M> vertex =
+ WritableUtils.readVertexFromDataInput(input, getConf());
if (vertexMap.put(vertex.getId(), vertex) != null) {
throw new IllegalStateException(
"readFields: " + this +
@@ -136,9 +137,9 @@ public class SimplePartition<I extends WritableComparable,
public void write(DataOutput output) throws IOException {
super.write(output);
output.writeInt(vertexMap.size());
- for (Vertex vertex : vertexMap.values()) {
+ for (Vertex<I, V, E, M> vertex : vertexMap.values()) {
progress();
- vertex.write(output);
+ WritableUtils.writeVertexToDataOutput(output, vertex, getConf());
}
}
http://git-wip-us.apache.org/repos/asf/giraph/blob/6cf79dbe/giraph-core/src/main/java/org/apache/giraph/utils/WritableUtils.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/utils/WritableUtils.java b/giraph-core/src/main/java/org/apache/giraph/utils/WritableUtils.java
index 6e7b87a..e3d79f7 100644
--- a/giraph-core/src/main/java/org/apache/giraph/utils/WritableUtils.java
+++ b/giraph-core/src/main/java/org/apache/giraph/utils/WritableUtils.java
@@ -18,7 +18,10 @@
package org.apache.giraph.utils;
+import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
import org.apache.giraph.edge.Edge;
+import org.apache.giraph.edge.VertexEdges;
+import org.apache.giraph.graph.Vertex;
import org.apache.giraph.zk.ZooKeeperExt;
import org.apache.giraph.zk.ZooKeeperExt.PathStat;
import org.apache.hadoop.conf.Configuration;
@@ -328,6 +331,103 @@ public class WritableUtils {
}
/**
+ * Write vertex data to byte array with the first 4 bytes as the size of the
+ * entire buffer (including the size).
+ *
+ * @param vertex Vertex to write from.
+ * @param buffer Use this buffer instead
+ * @param unsafe Use unsafe serialization?
+ * @param conf Configuration
+ * @param <I> Vertex id
+ * @param <V> Vertex value
+ * @param <E> Edge value
+ * @param <M> Message value
+ * @return Byte array with serialized object.
+ */
+ public static <I extends WritableComparable, V extends Writable,
+ E extends Writable, M extends Writable> byte[] writeVertexToByteArray(
+ Vertex<I, V, E, M> vertex,
+ byte[] buffer,
+ boolean unsafe,
+ ImmutableClassesGiraphConfiguration<I, V, E, M> conf) {
+ ExtendedDataOutput extendedDataOutput;
+ if (unsafe) {
+ extendedDataOutput = new UnsafeByteArrayOutputStream(buffer);
+ } else {
+ extendedDataOutput = new ExtendedByteArrayDataOutput(buffer);
+ }
+ try {
+ extendedDataOutput.writeInt(-1);
+ writeVertexToDataOutput(extendedDataOutput, vertex, conf);
+ extendedDataOutput.writeInt(0, extendedDataOutput.getPos());
+ } catch (IOException e) {
+ throw new IllegalStateException("writeVertexToByteArray: " +
+ "IOException", e);
+ }
+
+ return extendedDataOutput.getByteArray();
+ }
+
+ /**
+ * Write vertex data to byte array with the first 4 bytes as the size of the
+ * entire buffer (including the size).
+ *
+ * @param vertex Vertex to write from.
+ * @param unsafe Use unsafe serialization?
+ * @param conf Configuration
+ * @param <I> Vertex id
+ * @param <V> Vertex value
+ * @param <E> Edge value
+ * @param <M> Message value
+ * @return Byte array with serialized object.
+ */
+ public static <I extends WritableComparable, V extends Writable,
+ E extends Writable, M extends Writable> byte[] writeVertexToByteArray(
+ Vertex<I, V, E, M> vertex,
+ boolean unsafe,
+ ImmutableClassesGiraphConfiguration<I, V, E, M> conf) {
+ return writeVertexToByteArray(vertex, null, unsafe, conf);
+ }
+
+ /**
+ * Read vertex data from byteArray to a Writeable object, skipping the size.
+ * Serialization method is choosable. Assumes the vertex has already been
+ * initialized and contains values for Id, value, and edges.
+ *
+ * @param byteArray Byte array to find the fields in.
+ * @param vertex Vertex to fill in the fields.
+ * @param unsafe Use unsafe deserialization
+ * @param <I> Vertex id
+ * @param <V> Vertex value
+ * @param <E> Edge value
+ * @param <M> Message value
+ * @param conf Configuration
+ * @return The vertex
+ */
+ public static <I extends WritableComparable, V extends Writable,
+ E extends Writable, M extends Writable> Vertex<I, V, E, M>
+ reinitializeVertexFromByteArray(
+ byte[] byteArray,
+ Vertex<I, V, E, M> vertex,
+ boolean unsafe,
+ ImmutableClassesGiraphConfiguration<I, V, E, M> conf) {
+ ExtendedDataInput extendedDataInput;
+ if (unsafe) {
+ extendedDataInput = new UnsafeByteArrayInputStream(byteArray);
+ } else {
+ extendedDataInput = new ExtendedByteArrayDataInput(byteArray);
+ }
+ try {
+ extendedDataInput.readInt();
+ reinitializeVertexFromDataInput(extendedDataInput, vertex, conf);
+ } catch (IOException e) {
+ throw new IllegalStateException(
+ "readFieldsFromByteArrayWithSize: IOException", e);
+ }
+ return vertex;
+ }
+
+ /**
* Write an edge to an output stream.
*
* @param out Data output
@@ -356,4 +456,86 @@ public class WritableUtils {
edge.getTargetVertexId().readFields(in);
edge.getValue().readFields(in);
}
+
+ /**
+ * Reads data from input stream to inizialize Vertex. Assumes the vertex has
+ * already been initialized and contains values for Id, value, and edges.
+ *
+ * @param input The input stream
+ * @param vertex The vertex to initialize
+ * @param conf Configuration
+ * @param <I> Vertex id
+ * @param <V> Vertex value
+ * @param <E> Edge value
+ * @param <M> Message value
+ * @throws IOException
+ */
+ @SuppressWarnings("unchecked")
+ public static <I extends WritableComparable, V extends Writable,
+ E extends Writable, M extends Writable> void reinitializeVertexFromDataInput(
+ DataInput input,
+ Vertex<I, V, E, M> vertex,
+ ImmutableClassesGiraphConfiguration<I, V, E, M> conf)
+ throws IOException {
+ vertex.getId().readFields(input);
+ vertex.getValue().readFields(input);
+ ((VertexEdges<I, E>) vertex.getEdges()).readFields(input);
+ if (input.readBoolean()) {
+ vertex.voteToHalt();
+ } else {
+ vertex.wakeUp();
+ }
+ }
+
+ /**
+ * Reads data from input stream to inizialize Vertex.
+ *
+ * @param input The input stream
+ * @param conf Configuration
+ * @param <I> Vertex id
+ * @param <V> Vertex value
+ * @param <E> Edge value
+ * @param <M> Message value
+ * @return The vertex
+ * @throws IOException
+ */
+ public static <I extends WritableComparable, V extends Writable,
+ E extends Writable, M extends Writable> Vertex<I, V, E, M>
+ readVertexFromDataInput(
+ DataInput input,
+ ImmutableClassesGiraphConfiguration<I, V, E, M> conf)
+ throws IOException {
+ Vertex<I, V, E, M> vertex = conf.createVertex();
+ I id = conf.createVertexId();
+ V value = conf.createVertexValue();
+ VertexEdges<I, E> edges = conf.createVertexEdges();
+ vertex.initialize(id, value, edges);
+ reinitializeVertexFromDataInput(input, vertex, conf);
+ return vertex;
+ }
+
+ /**
+ * Writes Vertex data to output stream.
+ *
+ * @param output the output stream
+ * @param vertex The vertex to serialize
+ * @param conf Configuration
+ * @param <I> Vertex id
+ * @param <V> Vertex value
+ * @param <E> Edge value
+ * @param <M> Message value
+ * @throws IOException
+ */
+ @SuppressWarnings("unchecked")
+ public static <I extends WritableComparable, V extends Writable,
+ E extends Writable, M extends Writable> void writeVertexToDataOutput(
+ DataOutput output,
+ Vertex<I, V, E, M> vertex,
+ ImmutableClassesGiraphConfiguration<I, V, E, M> conf)
+ throws IOException {
+ vertex.getId().write(output);
+ vertex.getValue().write(output);
+ ((VertexEdges<I, E>) vertex.getEdges()).write(output);
+ output.writeBoolean(vertex.isHalted());
+ }
}
http://git-wip-us.apache.org/repos/asf/giraph/blob/6cf79dbe/giraph-core/src/test/java/org/apache/giraph/graph/TestVertexAndEdges.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/test/java/org/apache/giraph/graph/TestVertexAndEdges.java b/giraph-core/src/test/java/org/apache/giraph/graph/TestVertexAndEdges.java
index fb5b685..8a048fd 100644
--- a/giraph-core/src/test/java/org/apache/giraph/graph/TestVertexAndEdges.java
+++ b/giraph-core/src/test/java/org/apache/giraph/graph/TestVertexAndEdges.java
@@ -370,7 +370,8 @@ public class TestVertexAndEdges {
byte[] byteArray = null;
for (int i = 0; i < REPS; ++i) {
serializeNanosStart = SystemTime.get().getNanoseconds();
- byteArray = WritableUtils.writeToByteArray(vertex);
+ byteArray = WritableUtils.writeVertexToByteArray(
+ vertex, false, vertex.getConf());
serializeNanos += Times.getNanosecondsSince(SystemTime.get(),
serializeNanosStart);
}
@@ -381,13 +382,14 @@ public class TestVertexAndEdges {
" bytes / sec for " + edgesClass.getName());
Vertex<LongWritable, FloatWritable, DoubleWritable, LongWritable>
- readVertex = instantiateVertex(edgesClass);
-
+ readVertex = buildVertex(edgesClass);
+
long deserializeNanosStart;
long deserializeNanos = 0;
for (int i = 0; i < REPS; ++i) {
deserializeNanosStart = SystemTime.get().getNanoseconds();
- WritableUtils.readFieldsFromByteArray(byteArray, readVertex);
+ WritableUtils.reinitializeVertexFromByteArray(byteArray, readVertex, false,
+ readVertex.getConf());
deserializeNanos += Times.getNanosecondsSince(SystemTime.get(),
deserializeNanosStart);
}
@@ -416,7 +418,7 @@ public class TestVertexAndEdges {
serializeNanosStart = SystemTime.get().getNanoseconds();
outputStream =
new DynamicChannelBufferOutputStream(32);
- vertex.write(outputStream);
+ WritableUtils.writeVertexToDataOutput(outputStream, vertex, vertex.getConf());
serializeNanos += Times.getNanosecondsSince(SystemTime.get(),
serializeNanosStart);
}
@@ -429,7 +431,7 @@ public class TestVertexAndEdges {
" bytes / sec for " + edgesClass.getName());
Vertex<LongWritable, FloatWritable, DoubleWritable, LongWritable>
- readVertex = instantiateVertex(edgesClass);
+ readVertex = buildVertex(edgesClass);
long deserializeNanosStart;
long deserializeNanos = 0;
@@ -438,7 +440,8 @@ public class TestVertexAndEdges {
DynamicChannelBufferInputStream inputStream = new
DynamicChannelBufferInputStream(
outputStream.getDynamicChannelBuffer());
- readVertex.readFields(inputStream);
+ WritableUtils.reinitializeVertexFromDataInput(
+ inputStream, readVertex, readVertex.getConf());
deserializeNanos += Times.getNanosecondsSince(SystemTime.get(),
deserializeNanosStart);
outputStream.getDynamicChannelBuffer().readerIndex(0);
@@ -470,7 +473,7 @@ public class TestVertexAndEdges {
serializeNanosStart = SystemTime.get().getNanoseconds();
outputStream =
new UnsafeByteArrayOutputStream(32);
- vertex.write(outputStream);
+ WritableUtils.writeVertexToDataOutput(outputStream, vertex, vertex.getConf());
serializeNanos += Times.getNanosecondsSince(SystemTime.get(),
serializeNanosStart);
}
@@ -485,7 +488,7 @@ public class TestVertexAndEdges {
" bytes / sec for " + edgesClass.getName());
Vertex<LongWritable, FloatWritable, DoubleWritable, LongWritable>
- readVertex = instantiateVertex(edgesClass);
+ readVertex = buildVertex(edgesClass);
long deserializeNanosStart;
long deserializeNanos = 0;
@@ -494,7 +497,8 @@ public class TestVertexAndEdges {
UnsafeByteArrayInputStream inputStream = new
UnsafeByteArrayInputStream(
outputStream.getByteArray(), 0, outputStream.getPos());
- readVertex.readFields(inputStream);
+ WritableUtils.reinitializeVertexFromDataInput(
+ inputStream, readVertex, readVertex.getConf());
deserializeNanos += Times.getNanosecondsSince(SystemTime.get(),
deserializeNanosStart);
}
[2/2] git commit: updated refs/heads/trunk to 67e0f11
Posted by cl...@apache.org.
GIRAPH-613
Project: http://git-wip-us.apache.org/repos/asf/giraph/repo
Commit: http://git-wip-us.apache.org/repos/asf/giraph/commit/67e0f11d
Tree: http://git-wip-us.apache.org/repos/asf/giraph/tree/67e0f11d
Diff: http://git-wip-us.apache.org/repos/asf/giraph/diff/67e0f11d
Branch: refs/heads/trunk
Commit: 67e0f11d2bd98e0614b26d4ea6dd707e6f65a105
Parents: 6cf79db
Author: Claudio Martella <cl...@apache.org>
Authored: Thu Apr 11 19:10:52 2013 +0200
Committer: Claudio Martella <cl...@apache.org>
Committed: Thu Apr 11 19:10:52 2013 +0200
----------------------------------------------------------------------
.../main/java/org/apache/giraph/graph/Vertex.java | 2 +-
.../giraph/partition/ByteArrayPartition.java | 4 ++++
2 files changed, 5 insertions(+), 1 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/giraph/blob/67e0f11d/giraph-core/src/main/java/org/apache/giraph/graph/Vertex.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/graph/Vertex.java b/giraph-core/src/main/java/org/apache/giraph/graph/Vertex.java
index a1b1a87..ade25fe 100644
--- a/giraph-core/src/main/java/org/apache/giraph/graph/Vertex.java
+++ b/giraph-core/src/main/java/org/apache/giraph/graph/Vertex.java
@@ -40,7 +40,7 @@ import java.util.Iterator;
/**
* Basic abstract class for writing a BSP application for computation.
- * Giraph will checkpoint Vertex value and edges, hence all user data should
+ * Giraph will store Vertex value and edges, hence all user data should
* be stored as part of the vertex value.
*
* @param <I> Vertex id
http://git-wip-us.apache.org/repos/asf/giraph/blob/67e0f11d/giraph-core/src/main/java/org/apache/giraph/partition/ByteArrayPartition.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/partition/ByteArrayPartition.java b/giraph-core/src/main/java/org/apache/giraph/partition/ByteArrayPartition.java
index d2e7599..b7ec924 100644
--- a/giraph-core/src/main/java/org/apache/giraph/partition/ByteArrayPartition.java
+++ b/giraph-core/src/main/java/org/apache/giraph/partition/ByteArrayPartition.java
@@ -70,6 +70,10 @@ public class ByteArrayPartition<I extends WritableComparable,
vertexMap = new MapMaker().concurrencyLevel(
getConf().getNettyServerExecutionConcurrency()).makeMap();
representativeVertex = getConf().createVertex();
+ representativeVertex.initialize(
+ getConf().createVertexId(),
+ getConf().createVertexValue(),
+ getConf().createVertexEdges());
useUnsafeSerialization = getConf().useUnsafeSerialization();
}