You are viewing a plain-text version of this content. The link to the canonical version was not preserved in this plain-text copy.
Posted to commits@giraph.apache.org by ma...@apache.org on 2013/09/03 20:13:56 UTC
git commit: updated refs/heads/trunk to 96968fd
Updated Branches:
refs/heads/trunk 0959a87e6 -> 96968fdca
GIRAPH-752: Better support for supernodes (majakabiljo)
Project: http://git-wip-us.apache.org/repos/asf/giraph/repo
Commit: http://git-wip-us.apache.org/repos/asf/giraph/commit/96968fdc
Tree: http://git-wip-us.apache.org/repos/asf/giraph/tree/96968fdc
Diff: http://git-wip-us.apache.org/repos/asf/giraph/diff/96968fdc
Branch: refs/heads/trunk
Commit: 96968fdcadb1f87017c6e7853c4a63ba065c6611
Parents: 0959a87
Author: Maja Kabiljo <ma...@maja-mbp.local>
Authored: Tue Sep 3 11:11:40 2013 -0700
Committer: Maja Kabiljo <ma...@maja-mbp.local>
Committed: Tue Sep 3 11:11:40 2013 -0700
----------------------------------------------------------------------
.../ByteArrayMessagesPerVertexStore.java | 81 +++---
.../giraph/comm/messages/MessagesIterable.java | 15 +-
.../PartitionDiskBackedMessageStore.java | 49 ++--
.../out_of_core/SequentialFileMessageStore.java | 11 +-
.../primitives/IntByteArrayMessageStore.java | 70 +++---
.../primitives/LongByteArrayMessageStore.java | 70 +++---
.../org/apache/giraph/conf/GiraphConstants.java | 11 +
.../ImmutableClassesGiraphConfiguration.java | 22 ++
.../apache/giraph/utils/ByteArrayIterable.java | 36 +--
.../apache/giraph/utils/ByteArrayIterator.java | 15 +-
.../giraph/utils/ByteArrayVertexIdMessages.java | 3 +-
.../java/org/apache/giraph/utils/Factory.java | 33 +++
.../utils/RepresentativeByteArrayIterable.java | 22 +-
.../utils/RepresentativeByteArrayIterator.java | 14 +-
.../apache/giraph/utils/io/BigDataInput.java | 196 +++++++++++++++
.../giraph/utils/io/BigDataInputOutput.java | 64 +++++
.../apache/giraph/utils/io/BigDataOutput.java | 250 +++++++++++++++++++
.../apache/giraph/utils/io/DataInputOutput.java | 51 ++++
.../utils/io/ExtendedDataInputOutput.java | 70 ++++++
.../apache/giraph/utils/io/package-info.java | 22 ++
20 files changed, 888 insertions(+), 217 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/giraph/blob/96968fdc/giraph-core/src/main/java/org/apache/giraph/comm/messages/ByteArrayMessagesPerVertexStore.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/messages/ByteArrayMessagesPerVertexStore.java b/giraph-core/src/main/java/org/apache/giraph/comm/messages/ByteArrayMessagesPerVertexStore.java
index 6518da6..d9ef449 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/messages/ByteArrayMessagesPerVertexStore.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/messages/ByteArrayMessagesPerVertexStore.java
@@ -22,10 +22,10 @@ import org.apache.giraph.bsp.CentralizedServiceWorker;
import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
import org.apache.giraph.factories.MessageValueFactory;
import org.apache.giraph.utils.ByteArrayVertexIdMessages;
-import org.apache.giraph.utils.ExtendedDataOutput;
+import org.apache.giraph.utils.ExtendedDataInput;
import org.apache.giraph.utils.RepresentativeByteArrayIterator;
import org.apache.giraph.utils.VertexIdIterator;
-import org.apache.giraph.utils.WritableUtils;
+import org.apache.giraph.utils.io.DataInputOutput;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
@@ -44,7 +44,7 @@ import java.util.concurrent.ConcurrentMap;
* @param <M> Message data
*/
public class ByteArrayMessagesPerVertexStore<I extends WritableComparable,
- M extends Writable> extends SimpleMessageStore<I, M, ExtendedDataOutput> {
+ M extends Writable> extends SimpleMessageStore<I, M, DataInputOutput> {
/**
* Constructor
*
@@ -68,30 +68,27 @@ public class ByteArrayMessagesPerVertexStore<I extends WritableComparable,
* @param iterator Special iterator that can release ownerhips of vertex ids
* @return Extended data output for this vertex id (created if necessary)
*/
- private ExtendedDataOutput getExtendedDataOutput(
- ConcurrentMap<I, ExtendedDataOutput> partitionMap,
+ private DataInputOutput getDataInputOutput(
+ ConcurrentMap<I, DataInputOutput> partitionMap,
VertexIdIterator<I> iterator) {
- ExtendedDataOutput extendedDataOutput =
+ DataInputOutput dataInputOutput =
partitionMap.get(iterator.getCurrentVertexId());
- if (extendedDataOutput == null) {
- ExtendedDataOutput newExtendedDataOutput =
- config.createExtendedDataOutput();
- extendedDataOutput =
- partitionMap.putIfAbsent(
- iterator.releaseCurrentVertexId(),
- newExtendedDataOutput);
- if (extendedDataOutput == null) {
- extendedDataOutput = newExtendedDataOutput;
+ if (dataInputOutput == null) {
+ DataInputOutput newDataOutput = config.createMessagesInputOutput();
+ dataInputOutput = partitionMap.putIfAbsent(
+ iterator.releaseCurrentVertexId(), newDataOutput);
+ if (dataInputOutput == null) {
+ dataInputOutput = newDataOutput;
}
}
- return extendedDataOutput;
+ return dataInputOutput;
}
@Override
public void addPartitionMessages(
int partitionId,
ByteArrayVertexIdMessages<I, M> messages) throws IOException {
- ConcurrentMap<I, ExtendedDataOutput> partitionMap =
+ ConcurrentMap<I, DataInputOutput> partitionMap =
getOrCreatePartitionMap(partitionId);
ByteArrayVertexIdMessages<I, M>.VertexIdMessageBytesIterator
vertexIdMessageBytesIterator =
@@ -104,12 +101,12 @@ public class ByteArrayMessagesPerVertexStore<I extends WritableComparable,
if (vertexIdMessageBytesIterator != null) {
while (vertexIdMessageBytesIterator.hasNext()) {
vertexIdMessageBytesIterator.next();
- ExtendedDataOutput extendedDataOutput =
- getExtendedDataOutput(partitionMap, vertexIdMessageBytesIterator);
+ DataInputOutput dataInputOutput =
+ getDataInputOutput(partitionMap, vertexIdMessageBytesIterator);
- synchronized (extendedDataOutput) {
+ synchronized (dataInputOutput) {
vertexIdMessageBytesIterator.writeCurrentMessageBytes(
- extendedDataOutput);
+ dataInputOutput.getDataOutput());
}
}
} else {
@@ -117,12 +114,12 @@ public class ByteArrayMessagesPerVertexStore<I extends WritableComparable,
vertexIdMessageIterator = messages.getVertexIdMessageIterator();
while (vertexIdMessageIterator.hasNext()) {
vertexIdMessageIterator.next();
- ExtendedDataOutput extendedDataOutput =
- getExtendedDataOutput(partitionMap, vertexIdMessageIterator);
+ DataInputOutput dataInputOutput =
+ getDataInputOutput(partitionMap, vertexIdMessageIterator);
- synchronized (extendedDataOutput) {
+ synchronized (dataInputOutput) {
vertexIdMessageIterator.getCurrentMessage().write(
- extendedDataOutput);
+ dataInputOutput.getDataOutput());
}
}
}
@@ -130,9 +127,8 @@ public class ByteArrayMessagesPerVertexStore<I extends WritableComparable,
@Override
protected Iterable<M> getMessagesAsIterable(
- ExtendedDataOutput extendedDataOutput) {
- return new MessagesIterable<M>(config, messageValueFactory,
- extendedDataOutput.getByteArray(), 0, extendedDataOutput.getPos());
+ DataInputOutput dataInputOutput) {
+ return new MessagesIterable<M>(dataInputOutput, messageValueFactory);
}
/**
@@ -143,15 +139,10 @@ public class ByteArrayMessagesPerVertexStore<I extends WritableComparable,
/**
* Constructor
*
- * @param configuration Configuration
- * @param buf buffer to read from
- * @param off Offset into the buffer to start from
- * @param length Length of the buffer
+ * @param dataInput DataInput containing the messages
*/
- public RepresentativeMessageIterator(
- ImmutableClassesGiraphConfiguration configuration,
- byte[] buf, int off, int length) {
- super(configuration, buf, off, length);
+ public RepresentativeMessageIterator(ExtendedDataInput dataInput) {
+ super(dataInput);
}
@Override
@@ -162,27 +153,27 @@ public class ByteArrayMessagesPerVertexStore<I extends WritableComparable,
@Override
protected int getNumberOfMessagesIn(
- ConcurrentMap<I, ExtendedDataOutput> partitionMap) {
+ ConcurrentMap<I, DataInputOutput> partitionMap) {
int numberOfMessages = 0;
- for (ExtendedDataOutput extendedDataOutput : partitionMap.values()) {
+ for (DataInputOutput dataInputOutput : partitionMap.values()) {
numberOfMessages += Iterators.size(
- new RepresentativeMessageIterator(config,
- extendedDataOutput.getByteArray(), 0,
- extendedDataOutput.getPos()));
+ new RepresentativeMessageIterator(dataInputOutput.createDataInput()));
}
return numberOfMessages;
}
@Override
- protected void writeMessages(ExtendedDataOutput extendedDataOutput,
+ protected void writeMessages(DataInputOutput dataInputOutput,
DataOutput out) throws IOException {
- WritableUtils.writeExtendedDataOutput(extendedDataOutput, out);
+ dataInputOutput.write(out);
}
@Override
- protected ExtendedDataOutput readFieldsForMessages(DataInput in) throws
+ protected DataInputOutput readFieldsForMessages(DataInput in) throws
IOException {
- return WritableUtils.readExtendedDataOutput(in, config);
+ DataInputOutput dataInputOutput = config.createMessagesInputOutput();
+ dataInputOutput.readFields(in);
+ return dataInputOutput;
}
/**
http://git-wip-us.apache.org/repos/asf/giraph/blob/96968fdc/giraph-core/src/main/java/org/apache/giraph/comm/messages/MessagesIterable.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/messages/MessagesIterable.java b/giraph-core/src/main/java/org/apache/giraph/comm/messages/MessagesIterable.java
index a466a8d..3b22ab3 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/messages/MessagesIterable.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/messages/MessagesIterable.java
@@ -18,8 +18,9 @@
package org.apache.giraph.comm.messages;
-import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
import org.apache.giraph.factories.MessageValueFactory;
+import org.apache.giraph.utils.ExtendedDataInput;
+import org.apache.giraph.utils.Factory;
import org.apache.giraph.utils.RepresentativeByteArrayIterable;
import org.apache.hadoop.io.Writable;
@@ -36,17 +37,13 @@ public class MessagesIterable<M extends Writable>
/**
* Constructor
*
- * @param conf Configuration
+ * @param dataInputFactory Factory for data inputs
* @param messageValueFactory factory for creating message values
- * @param buf Buffer
- * @param off Offset to start in the buffer
- * @param length Length of the buffer
*/
public MessagesIterable(
- ImmutableClassesGiraphConfiguration conf,
- MessageValueFactory<M> messageValueFactory,
- byte[] buf, int off, int length) {
- super(conf, buf, off, length);
+ Factory<? extends ExtendedDataInput> dataInputFactory,
+ MessageValueFactory<M> messageValueFactory) {
+ super(dataInputFactory);
this.messageValueFactory = messageValueFactory;
}
http://git-wip-us.apache.org/repos/asf/giraph/blob/96968fdc/giraph-core/src/main/java/org/apache/giraph/comm/messages/out_of_core/PartitionDiskBackedMessageStore.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/messages/out_of_core/PartitionDiskBackedMessageStore.java b/giraph-core/src/main/java/org/apache/giraph/comm/messages/out_of_core/PartitionDiskBackedMessageStore.java
index 7b3e548..7d46d30 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/messages/out_of_core/PartitionDiskBackedMessageStore.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/messages/out_of_core/PartitionDiskBackedMessageStore.java
@@ -38,8 +38,7 @@ import org.apache.giraph.comm.messages.MessageStoreFactory;
import org.apache.giraph.comm.messages.MessagesIterable;
import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
import org.apache.giraph.factories.MessageValueFactory;
-import org.apache.giraph.utils.ExtendedDataOutput;
-import org.apache.giraph.utils.WritableUtils;
+import org.apache.giraph.utils.io.DataInputOutput;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
@@ -58,7 +57,7 @@ public class PartitionDiskBackedMessageStore<I extends WritableComparable,
* In-memory message map (must be sorted to insure that the ids are
* ordered)
*/
- private volatile ConcurrentNavigableMap<I, ExtendedDataOutput>
+ private volatile ConcurrentNavigableMap<I, DataInputOutput>
inMemoryMessages;
/** Hadoop configuration */
private final ImmutableClassesGiraphConfiguration<I, ?, ?> config;
@@ -86,7 +85,7 @@ public class PartitionDiskBackedMessageStore<I extends WritableComparable,
ImmutableClassesGiraphConfiguration<I, ?, ?> config,
MessageStoreFactory<I, M, SequentialFileMessageStore<I, M>>
fileStoreFactory) {
- inMemoryMessages = new ConcurrentSkipListMap<I, ExtendedDataOutput>();
+ inMemoryMessages = new ConcurrentSkipListMap<I, DataInputOutput>();
this.messageValueFactory = messageValueFactory;
this.config = config;
numberOfMessagesInMemory = new AtomicInteger(0);
@@ -112,21 +111,21 @@ public class PartitionDiskBackedMessageStore<I extends WritableComparable,
destinationVertices.add(vertexId);
rwLock.readLock().lock();
try {
- ExtendedDataOutput extendedDataOutput = inMemoryMessages.get(vertexId);
- if (extendedDataOutput == null) {
- ExtendedDataOutput newExtendedDataOutput =
- config.createExtendedDataOutput();
- extendedDataOutput =
- inMemoryMessages.putIfAbsent(vertexId, newExtendedDataOutput);
- if (extendedDataOutput == null) {
+ DataInputOutput dataInputOutput = inMemoryMessages.get(vertexId);
+ if (dataInputOutput == null) {
+ DataInputOutput newDataInputOutput =
+ config.createMessagesInputOutput();
+ dataInputOutput =
+ inMemoryMessages.putIfAbsent(vertexId, newDataInputOutput);
+ if (dataInputOutput == null) {
ownsVertexId = true;
- extendedDataOutput = newExtendedDataOutput;
+ dataInputOutput = newDataInputOutput;
}
}
- synchronized (extendedDataOutput) {
+ synchronized (dataInputOutput) {
for (M message : messages) {
- message.write(extendedDataOutput);
+ message.write(dataInputOutput.getDataOutput());
numberOfMessagesInMemory.getAndIncrement();
}
}
@@ -144,13 +143,12 @@ public class PartitionDiskBackedMessageStore<I extends WritableComparable,
* @return Iterable of messages for a vertex id
*/
public Iterable<M> getVertexMessages(I vertexId) throws IOException {
- ExtendedDataOutput extendedDataOutput = inMemoryMessages.get(vertexId);
- if (extendedDataOutput == null) {
- extendedDataOutput = config.createExtendedDataOutput();
+ DataInputOutput dataInputOutput = inMemoryMessages.get(vertexId);
+ if (dataInputOutput == null) {
+ dataInputOutput = config.createMessagesInputOutput();
}
Iterable<M> combinedIterable = new MessagesIterable<M>(
- config, messageValueFactory,
- extendedDataOutput.getByteArray(), 0, extendedDataOutput.getPos());
+ dataInputOutput, messageValueFactory);
for (SequentialFileMessageStore<I, M> fileStore : fileStores) {
combinedIterable = Iterables.concat(combinedIterable,
@@ -217,11 +215,11 @@ public class PartitionDiskBackedMessageStore<I extends WritableComparable,
* @throws IOException
*/
public void flush() throws IOException {
- ConcurrentNavigableMap<I, ExtendedDataOutput> messagesToFlush = null;
+ ConcurrentNavigableMap<I, DataInputOutput> messagesToFlush = null;
rwLock.writeLock().lock();
try {
messagesToFlush = inMemoryMessages;
- inMemoryMessages = new ConcurrentSkipListMap<I, ExtendedDataOutput>();
+ inMemoryMessages = new ConcurrentSkipListMap<I, DataInputOutput>();
numberOfMessagesInMemory.set(0);
} finally {
rwLock.writeLock().unlock();
@@ -248,9 +246,9 @@ public class PartitionDiskBackedMessageStore<I extends WritableComparable,
// write in-memory messages map
out.writeInt(inMemoryMessages.size());
- for (Entry<I, ExtendedDataOutput> entry : inMemoryMessages.entrySet()) {
+ for (Entry<I, DataInputOutput> entry : inMemoryMessages.entrySet()) {
entry.getKey().write(out);
- WritableUtils.writeExtendedDataOutput(entry.getValue(), out);
+ entry.getValue().write(out);
}
// write file stores
@@ -278,8 +276,9 @@ public class PartitionDiskBackedMessageStore<I extends WritableComparable,
for (int m = 0; m < mapSize; m++) {
I vertexId = config.createVertexId();
vertexId.readFields(in);
- inMemoryMessages.put(vertexId,
- WritableUtils.readExtendedDataOutput(in, config));
+ DataInputOutput dataInputOutput = config.createMessagesInputOutput();
+ dataInputOutput.readFields(in);
+ inMemoryMessages.put(vertexId, dataInputOutput);
}
// read file stores
http://git-wip-us.apache.org/repos/asf/giraph/blob/96968fdc/giraph-core/src/main/java/org/apache/giraph/comm/messages/out_of_core/SequentialFileMessageStore.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/messages/out_of_core/SequentialFileMessageStore.java b/giraph-core/src/main/java/org/apache/giraph/comm/messages/out_of_core/SequentialFileMessageStore.java
index 64031c3..51c05da 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/messages/out_of_core/SequentialFileMessageStore.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/messages/out_of_core/SequentialFileMessageStore.java
@@ -24,7 +24,7 @@ import org.apache.giraph.conf.GiraphConstants;
import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
import org.apache.giraph.factories.MessageValueFactory;
import org.apache.giraph.utils.EmptyIterable;
-import org.apache.giraph.utils.ExtendedDataOutput;
+import org.apache.giraph.utils.io.DataInputOutput;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.log4j.Logger;
@@ -109,7 +109,7 @@ public class SequentialFileMessageStore<I extends WritableComparable,
* @param messageMap Add the messages from this map to this store
* @throws java.io.IOException
*/
- public void addMessages(NavigableMap<I, ExtendedDataOutput> messageMap)
+ public void addMessages(NavigableMap<I, DataInputOutput> messageMap)
throws IOException {
// Writes messages to its file
if (file.exists()) {
@@ -136,13 +136,12 @@ public class SequentialFileMessageStore<I extends WritableComparable,
out.writeInt(destinationVertexIdCount);
// Dump the vertices and their messages in a sorted order
- for (Map.Entry<I, ExtendedDataOutput> entry : messageMap.entrySet()) {
+ for (Map.Entry<I, DataInputOutput> entry : messageMap.entrySet()) {
I destinationVertexId = entry.getKey();
destinationVertexId.write(out);
- ExtendedDataOutput extendedDataOutput = entry.getValue();
+ DataInputOutput dataInputOutput = entry.getValue();
Iterable<M> messages = new MessagesIterable<M>(
- config, messageValueFactory, extendedDataOutput.getByteArray(), 0,
- extendedDataOutput.getPos());
+ dataInputOutput, messageValueFactory);
int messageCount = Iterables.size(messages);
out.writeInt(messageCount);
if (LOG.isDebugEnabled()) {
http://git-wip-us.apache.org/repos/asf/giraph/blob/96968fdc/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/IntByteArrayMessageStore.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/IntByteArrayMessageStore.java b/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/IntByteArrayMessageStore.java
index 597e7af..cdab2e0 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/IntByteArrayMessageStore.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/IntByteArrayMessageStore.java
@@ -26,8 +26,7 @@ import org.apache.giraph.factories.MessageValueFactory;
import org.apache.giraph.partition.Partition;
import org.apache.giraph.utils.ByteArrayVertexIdMessages;
import org.apache.giraph.utils.EmptyIterable;
-import org.apache.giraph.utils.ExtendedDataOutput;
-import org.apache.giraph.utils.WritableUtils;
+import org.apache.giraph.utils.io.DataInputOutput;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Writable;
@@ -57,7 +56,7 @@ public class IntByteArrayMessageStore<M extends Writable>
protected final MessageValueFactory<M> messageValueFactory;
/** Map from partition id to map from vertex id to message */
private final
- Int2ObjectOpenHashMap<Int2ObjectOpenHashMap<ExtendedDataOutput>> map;
+ Int2ObjectOpenHashMap<Int2ObjectOpenHashMap<DataInputOutput>> map;
/** Service worker */
private final CentralizedServiceWorker<IntWritable, ?, ?> service;
/** Giraph configuration */
@@ -79,12 +78,12 @@ public class IntByteArrayMessageStore<M extends Writable>
this.config = config;
map =
- new Int2ObjectOpenHashMap<Int2ObjectOpenHashMap<ExtendedDataOutput>>();
+ new Int2ObjectOpenHashMap<Int2ObjectOpenHashMap<DataInputOutput>>();
for (int partitionId : service.getPartitionStore().getPartitionIds()) {
Partition<IntWritable, ?, ?> partition =
service.getPartitionStore().getPartition(partitionId);
- Int2ObjectOpenHashMap<ExtendedDataOutput> partitionMap =
- new Int2ObjectOpenHashMap<ExtendedDataOutput>(
+ Int2ObjectOpenHashMap<DataInputOutput> partitionMap =
+ new Int2ObjectOpenHashMap<DataInputOutput>(
(int) partition.getVertexCount());
map.put(partitionId, partitionMap);
}
@@ -96,34 +95,34 @@ public class IntByteArrayMessageStore<M extends Writable>
* @param vertexId Id of the vertex
* @return Map which holds messages for partition which vertex belongs to.
*/
- private Int2ObjectOpenHashMap<ExtendedDataOutput> getPartitionMap(
+ private Int2ObjectOpenHashMap<DataInputOutput> getPartitionMap(
IntWritable vertexId) {
return map.get(service.getPartitionId(vertexId));
}
/**
- * Get the extended data output for a vertex id, creating if necessary.
+ * Get the DataInputOutput for a vertex id, creating if necessary.
*
* @param partitionMap Partition map to look in
* @param vertexId Id of the vertex
- * @return Extended data output for this vertex id (created if necessary)
+ * @return DataInputOutput for this vertex id (created if necessary)
*/
- private ExtendedDataOutput getExtendedDataOutput(
- Int2ObjectOpenHashMap<ExtendedDataOutput> partitionMap,
+ private DataInputOutput getDataInputOutput(
+ Int2ObjectOpenHashMap<DataInputOutput> partitionMap,
int vertexId) {
- ExtendedDataOutput extendedDataOutput = partitionMap.get(vertexId);
- if (extendedDataOutput == null) {
- extendedDataOutput = config.createExtendedDataOutput();
- partitionMap.put(vertexId, extendedDataOutput);
+ DataInputOutput dataInputOutput = partitionMap.get(vertexId);
+ if (dataInputOutput == null) {
+ dataInputOutput = config.createMessagesInputOutput();
+ partitionMap.put(vertexId, dataInputOutput);
}
- return extendedDataOutput;
+ return dataInputOutput;
}
@Override
public void addPartitionMessages(int partitionId,
ByteArrayVertexIdMessages<IntWritable, M> messages) throws
IOException {
- Int2ObjectOpenHashMap<ExtendedDataOutput> partitionMap =
+ Int2ObjectOpenHashMap<DataInputOutput> partitionMap =
map.get(partitionId);
synchronized (partitionMap) {
ByteArrayVertexIdMessages<IntWritable, M>.VertexIdMessageBytesIterator
@@ -137,18 +136,19 @@ public class IntByteArrayMessageStore<M extends Writable>
if (vertexIdMessageBytesIterator != null) {
while (vertexIdMessageBytesIterator.hasNext()) {
vertexIdMessageBytesIterator.next();
+ DataInputOutput dataInputOutput = getDataInputOutput(partitionMap,
+ vertexIdMessageBytesIterator.getCurrentVertexId().get());
vertexIdMessageBytesIterator.writeCurrentMessageBytes(
- getExtendedDataOutput(partitionMap,
- vertexIdMessageBytesIterator.getCurrentVertexId().get()));
+ dataInputOutput.getDataOutput());
}
} else {
ByteArrayVertexIdMessages<IntWritable, M>.VertexIdMessageIterator
iterator = messages.getVertexIdMessageIterator();
while (iterator.hasNext()) {
iterator.next();
- iterator.getCurrentMessage().write(
- getExtendedDataOutput(partitionMap,
- iterator.getCurrentVertexId().get()));
+ DataInputOutput dataInputOutput = getDataInputOutput(partitionMap,
+ iterator.getCurrentVertexId().get());
+ iterator.getCurrentMessage().write(dataInputOutput.getDataOutput());
}
}
}
@@ -167,13 +167,12 @@ public class IntByteArrayMessageStore<M extends Writable>
@Override
public Iterable<M> getVertexMessages(
IntWritable vertexId) throws IOException {
- ExtendedDataOutput extendedDataOutput =
+ DataInputOutput dataInputOutput =
getPartitionMap(vertexId).get(vertexId.get());
- if (extendedDataOutput == null) {
+ if (dataInputOutput == null) {
return EmptyIterable.get();
} else {
- return new MessagesIterable<M>(config, messageValueFactory,
- extendedDataOutput.getByteArray(), 0, extendedDataOutput.getPos());
+ return new MessagesIterable<M>(dataInputOutput, messageValueFactory);
}
}
@@ -190,7 +189,7 @@ public class IntByteArrayMessageStore<M extends Writable>
@Override
public Iterable<IntWritable> getPartitionDestinationVertices(
int partitionId) {
- Int2ObjectOpenHashMap<ExtendedDataOutput> partitionMap =
+ Int2ObjectOpenHashMap<DataInputOutput> partitionMap =
map.get(partitionId);
List<IntWritable> vertices =
Lists.newArrayListWithCapacity(partitionMap.size());
@@ -204,15 +203,15 @@ public class IntByteArrayMessageStore<M extends Writable>
@Override
public void writePartition(DataOutput out,
int partitionId) throws IOException {
- Int2ObjectOpenHashMap<ExtendedDataOutput> partitionMap =
+ Int2ObjectOpenHashMap<DataInputOutput> partitionMap =
map.get(partitionId);
out.writeInt(partitionMap.size());
- ObjectIterator<Int2ObjectMap.Entry<ExtendedDataOutput>> iterator =
+ ObjectIterator<Int2ObjectMap.Entry<DataInputOutput>> iterator =
partitionMap.int2ObjectEntrySet().fastIterator();
while (iterator.hasNext()) {
- Int2ObjectMap.Entry<ExtendedDataOutput> entry = iterator.next();
+ Int2ObjectMap.Entry<DataInputOutput> entry = iterator.next();
out.writeInt(entry.getIntKey());
- WritableUtils.writeExtendedDataOutput(entry.getValue(), out);
+ entry.getValue().write(out);
}
}
@@ -220,12 +219,13 @@ public class IntByteArrayMessageStore<M extends Writable>
public void readFieldsForPartition(DataInput in,
int partitionId) throws IOException {
int size = in.readInt();
- Int2ObjectOpenHashMap<ExtendedDataOutput> partitionMap =
- new Int2ObjectOpenHashMap<ExtendedDataOutput>(size);
+ Int2ObjectOpenHashMap<DataInputOutput> partitionMap =
+ new Int2ObjectOpenHashMap<DataInputOutput>(size);
while (size-- > 0) {
int vertexId = in.readInt();
- partitionMap.put(vertexId,
- WritableUtils.readExtendedDataOutput(in, config));
+ DataInputOutput dataInputOutput = config.createMessagesInputOutput();
+ dataInputOutput.readFields(in);
+ partitionMap.put(vertexId, dataInputOutput);
}
synchronized (map) {
map.put(partitionId, partitionMap);
http://git-wip-us.apache.org/repos/asf/giraph/blob/96968fdc/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/LongByteArrayMessageStore.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/LongByteArrayMessageStore.java b/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/LongByteArrayMessageStore.java
index 3fe6356..3272ced 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/LongByteArrayMessageStore.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/LongByteArrayMessageStore.java
@@ -26,8 +26,7 @@ import org.apache.giraph.factories.MessageValueFactory;
import org.apache.giraph.partition.Partition;
import org.apache.giraph.utils.ByteArrayVertexIdMessages;
import org.apache.giraph.utils.EmptyIterable;
-import org.apache.giraph.utils.ExtendedDataOutput;
-import org.apache.giraph.utils.WritableUtils;
+import org.apache.giraph.utils.io.DataInputOutput;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Writable;
@@ -58,7 +57,7 @@ public class LongByteArrayMessageStore<M extends Writable>
protected final MessageValueFactory<M> messageValueFactory;
/** Map from partition id to map from vertex id to message */
private final
- Int2ObjectOpenHashMap<Long2ObjectOpenHashMap<ExtendedDataOutput>> map;
+ Int2ObjectOpenHashMap<Long2ObjectOpenHashMap<DataInputOutput>> map;
/** Service worker */
private final CentralizedServiceWorker<LongWritable, ?, ?> service;
/** Giraph configuration */
@@ -80,12 +79,12 @@ public class LongByteArrayMessageStore<M extends Writable>
this.config = config;
map =
- new Int2ObjectOpenHashMap<Long2ObjectOpenHashMap<ExtendedDataOutput>>();
+ new Int2ObjectOpenHashMap<Long2ObjectOpenHashMap<DataInputOutput>>();
for (int partitionId : service.getPartitionStore().getPartitionIds()) {
Partition<LongWritable, ?, ?> partition =
service.getPartitionStore().getPartition(partitionId);
- Long2ObjectOpenHashMap<ExtendedDataOutput> partitionMap =
- new Long2ObjectOpenHashMap<ExtendedDataOutput>(
+ Long2ObjectOpenHashMap<DataInputOutput> partitionMap =
+ new Long2ObjectOpenHashMap<DataInputOutput>(
(int) partition.getVertexCount());
map.put(partitionId, partitionMap);
}
@@ -97,34 +96,34 @@ public class LongByteArrayMessageStore<M extends Writable>
* @param vertexId Id of the vertex
* @return Map which holds messages for partition which vertex belongs to.
*/
- private Long2ObjectOpenHashMap<ExtendedDataOutput> getPartitionMap(
+ private Long2ObjectOpenHashMap<DataInputOutput> getPartitionMap(
LongWritable vertexId) {
return map.get(service.getPartitionId(vertexId));
}
/**
- * Get the extended data output for a vertex id, creating if necessary.
+ * Get the DataInputOutput for a vertex id, creating if necessary.
*
* @param partitionMap Partition map to look in
* @param vertexId Id of the vertex
- * @return Extended data output for this vertex id (created if necessary)
+ * @return DataInputOutput for this vertex id (created if necessary)
*/
- private ExtendedDataOutput getExtendedDataOutput(
- Long2ObjectOpenHashMap<ExtendedDataOutput> partitionMap,
+ private DataInputOutput getDataInputOutput(
+ Long2ObjectOpenHashMap<DataInputOutput> partitionMap,
long vertexId) {
- ExtendedDataOutput extendedDataOutput = partitionMap.get(vertexId);
- if (extendedDataOutput == null) {
- extendedDataOutput = config.createExtendedDataOutput();
- partitionMap.put(vertexId, extendedDataOutput);
+ DataInputOutput dataInputOutput = partitionMap.get(vertexId);
+ if (dataInputOutput == null) {
+ dataInputOutput = config.createMessagesInputOutput();
+ partitionMap.put(vertexId, dataInputOutput);
}
- return extendedDataOutput;
+ return dataInputOutput;
}
@Override
public void addPartitionMessages(int partitionId,
ByteArrayVertexIdMessages<LongWritable, M> messages) throws
IOException {
- Long2ObjectOpenHashMap<ExtendedDataOutput> partitionMap =
+ Long2ObjectOpenHashMap<DataInputOutput> partitionMap =
map.get(partitionId);
synchronized (partitionMap) {
ByteArrayVertexIdMessages<LongWritable, M>.VertexIdMessageBytesIterator
@@ -138,18 +137,19 @@ public class LongByteArrayMessageStore<M extends Writable>
if (vertexIdMessageBytesIterator != null) {
while (vertexIdMessageBytesIterator.hasNext()) {
vertexIdMessageBytesIterator.next();
+ DataInputOutput dataInputOutput = getDataInputOutput(partitionMap,
+ vertexIdMessageBytesIterator.getCurrentVertexId().get());
vertexIdMessageBytesIterator.writeCurrentMessageBytes(
- getExtendedDataOutput(partitionMap,
- vertexIdMessageBytesIterator.getCurrentVertexId().get()));
+ dataInputOutput.getDataOutput());
}
} else {
ByteArrayVertexIdMessages<LongWritable, M>.VertexIdMessageIterator
iterator = messages.getVertexIdMessageIterator();
while (iterator.hasNext()) {
iterator.next();
- iterator.getCurrentMessage().write(
- getExtendedDataOutput(partitionMap,
- iterator.getCurrentVertexId().get()));
+ DataInputOutput dataInputOutput = getDataInputOutput(partitionMap,
+ iterator.getCurrentVertexId().get());
+ iterator.getCurrentMessage().write(dataInputOutput.getDataOutput());
}
}
}
@@ -168,13 +168,12 @@ public class LongByteArrayMessageStore<M extends Writable>
@Override
public Iterable<M> getVertexMessages(
LongWritable vertexId) throws IOException {
- ExtendedDataOutput extendedDataOutput =
+ DataInputOutput dataInputOutput =
getPartitionMap(vertexId).get(vertexId.get());
- if (extendedDataOutput == null) {
+ if (dataInputOutput == null) {
+ // Nothing is stored for this vertex, so there are no messages to return
return EmptyIterable.get();
} else {
+ // The stored DataInputOutput itself is handed to the iterable; no copy of
+ // the message bytes is made here
- return new MessagesIterable<M>(config, messageValueFactory,
- extendedDataOutput.getByteArray(), 0, extendedDataOutput.getPos());
+ return new MessagesIterable<M>(dataInputOutput, messageValueFactory);
}
}
@@ -191,7 +190,7 @@ public class LongByteArrayMessageStore<M extends Writable>
@Override
public Iterable<LongWritable> getPartitionDestinationVertices(
int partitionId) {
- Long2ObjectOpenHashMap<ExtendedDataOutput> partitionMap =
+ Long2ObjectOpenHashMap<DataInputOutput> partitionMap =
map.get(partitionId);
List<LongWritable> vertices =
Lists.newArrayListWithCapacity(partitionMap.size());
@@ -205,15 +204,15 @@ public class LongByteArrayMessageStore<M extends Writable>
@Override
public void writePartition(DataOutput out,
int partitionId) throws IOException {
- Long2ObjectOpenHashMap<ExtendedDataOutput> partitionMap =
+ Long2ObjectOpenHashMap<DataInputOutput> partitionMap =
map.get(partitionId);
+ // Layout: number of vertices, then for each vertex its long id followed by
+ // its serialized DataInputOutput (read back by readFieldsForPartition)
out.writeInt(partitionMap.size());
- ObjectIterator<Long2ObjectMap.Entry<ExtendedDataOutput>> iterator =
+ ObjectIterator<Long2ObjectMap.Entry<DataInputOutput>> iterator =
partitionMap.long2ObjectEntrySet().fastIterator();
while (iterator.hasNext()) {
- Long2ObjectMap.Entry<ExtendedDataOutput> entry = iterator.next();
+ Long2ObjectMap.Entry<DataInputOutput> entry = iterator.next();
out.writeLong(entry.getLongKey());
- WritableUtils.writeExtendedDataOutput(entry.getValue(), out);
+ entry.getValue().write(out);
}
}
@@ -221,12 +220,13 @@ public class LongByteArrayMessageStore<M extends Writable>
public void readFieldsForPartition(DataInput in,
int partitionId) throws IOException {
int size = in.readInt();
- Long2ObjectOpenHashMap<ExtendedDataOutput> partitionMap =
- new Long2ObjectOpenHashMap<ExtendedDataOutput>(size);
+ Long2ObjectOpenHashMap<DataInputOutput> partitionMap =
+ new Long2ObjectOpenHashMap<DataInputOutput>(size);
while (size-- > 0) {
long vertexId = in.readLong();
- partitionMap.put(vertexId,
- WritableUtils.readExtendedDataOutput(in, config));
+ DataInputOutput dataInputOutput = config.createMessagesInputOutput();
+ dataInputOutput.readFields(in);
+ partitionMap.put(vertexId, dataInputOutput);
}
synchronized (map) {
map.put(partitionId, partitionMap);
http://git-wip-us.apache.org/repos/asf/giraph/blob/96968fdc/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java b/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java
index 604729a..7c9b19a 100644
--- a/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java
+++ b/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java
@@ -863,6 +863,17 @@ public interface GiraphConstants {
"Use unsafe serialization?");
/**
+ * Use BigDataIO for messages? If there are super-vertices in the
+ * graph which receive a lot of messages (so that the total serialized size
+ * of the messages for one vertex would go beyond the maximum size of a byte
+ * array), setting this option to true removes that limit: the messages of a
+ * vertex are then kept in a list of byte arrays instead of a single one, and
+ * the memory available for a single vertex is limited only by the maximum
+ * heap size available. Defaults to false.
+ */
+ BooleanConfOption USE_BIG_DATA_IO_FOR_MESSAGES =
+ new BooleanConfOption("giraph.useBigDataIOForMessages", false,
+ "Use BigDataIO for messages?");
+
+ /**
* Maximum number of attempts a master/worker will retry before killing
* the job. This directly maps to the number of map task attempts in
* Hadoop.
http://git-wip-us.apache.org/repos/asf/giraph/blob/96968fdc/giraph-core/src/main/java/org/apache/giraph/conf/ImmutableClassesGiraphConfiguration.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/conf/ImmutableClassesGiraphConfiguration.java b/giraph-core/src/main/java/org/apache/giraph/conf/ImmutableClassesGiraphConfiguration.java
index 2506c21..435dfa5 100644
--- a/giraph-core/src/main/java/org/apache/giraph/conf/ImmutableClassesGiraphConfiguration.java
+++ b/giraph-core/src/main/java/org/apache/giraph/conf/ImmutableClassesGiraphConfiguration.java
@@ -62,6 +62,9 @@ import org.apache.giraph.utils.ExtendedDataOutput;
import org.apache.giraph.utils.ReflectionUtils;
import org.apache.giraph.utils.UnsafeByteArrayInputStream;
import org.apache.giraph.utils.UnsafeByteArrayOutputStream;
+import org.apache.giraph.utils.io.BigDataInputOutput;
+import org.apache.giraph.utils.io.DataInputOutput;
+import org.apache.giraph.utils.io.ExtendedDataInputOutput;
import org.apache.giraph.worker.WorkerContext;
import org.apache.giraph.worker.WorkerObserver;
import org.apache.hadoop.conf.Configuration;
@@ -100,6 +103,11 @@ public class ImmutableClassesGiraphConfiguration<I extends WritableComparable,
* extended data input/output classes
*/
private final boolean useUnsafeSerialization;
+ /**
+ * Use BigDataIO for messages? Read once from USE_BIG_DATA_IO_FOR_MESSAGES in
+ * the constructor and cached, so that createMessagesInputOutput() does not
+ * need to query the configuration every time a message buffer is created.
+ */
+ private final boolean useBigDataIOForMessages;
/**
* Constructor. Takes the configuration and then gets the classes out of
@@ -111,6 +119,7 @@ public class ImmutableClassesGiraphConfiguration<I extends WritableComparable,
super(conf);
classes = new GiraphClasses<I, V, E>(conf);
useUnsafeSerialization = USE_UNSAFE_SERIALIZATION.get(this);
+ useBigDataIOForMessages = USE_BIG_DATA_IO_FOR_MESSAGES.get(this);
valueLanguages = PerGraphTypeEnum.readFromConf(
GiraphConstants.GRAPH_TYPE_LANGUAGES, conf);
valueNeedsWrappers = PerGraphTypeBoolean.readFromConf(
@@ -897,6 +906,19 @@ public class ImmutableClassesGiraphConfiguration<I extends WritableComparable,
}
/**
+ * Create a new DataInputOutput in which the serialized messages of a vertex
+ * can be accumulated. Which implementation is returned depends on
+ * {@link GiraphConstants#USE_BIG_DATA_IO_FOR_MESSAGES}: an
+ * {@link ExtendedDataInputOutput} when it is disabled, or a
+ * {@link BigDataInputOutput} (not bounded by a single byte array) when it is
+ * enabled.
+ *
+ * @return Newly created DataInputOutput for messages
+ */
+ public DataInputOutput createMessagesInputOutput() {
+ if (!useBigDataIOForMessages) {
+ return new ExtendedDataInputOutput(this);
+ }
+ return new BigDataInputOutput(this);
+ }
+
+ /**
* Create an extended data output (can be subclassed)
*
* @return ExtendedDataOutput object
http://git-wip-us.apache.org/repos/asf/giraph/blob/96968fdc/giraph-core/src/main/java/org/apache/giraph/utils/ByteArrayIterable.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/utils/ByteArrayIterable.java b/giraph-core/src/main/java/org/apache/giraph/utils/ByteArrayIterable.java
index cf2c187..d14172e 100644
--- a/giraph-core/src/main/java/org/apache/giraph/utils/ByteArrayIterable.java
+++ b/giraph-core/src/main/java/org/apache/giraph/utils/ByteArrayIterable.java
@@ -18,7 +18,6 @@
package org.apache.giraph.utils;
import java.util.Iterator;
-import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
import org.apache.hadoop.io.Writable;
/**
@@ -30,29 +29,17 @@ import org.apache.hadoop.io.Writable;
*/
public abstract class ByteArrayIterable<T extends Writable> implements
Iterable<T> {
- /** Configuration */
- protected final ImmutableClassesGiraphConfiguration configuration;
- /** Data input */
- protected final byte[] buf;
- /** Offset to start in buf */
- protected final int off;
- /** Length of buf */
- protected final int length;
+ /** Factory asked for a new data input every time iterator() is called */
+ protected final Factory<? extends ExtendedDataInput> dataInputFactory;
/**
* Constructor
*
- * @param configuration Configuration
- * @param buf Buffer
- * @param off Offset to start in the buffer
- * @param length Length of the buffer
+ * @param dataInputFactory Factory for the data inputs backing the iterators
*/
- public ByteArrayIterable(ImmutableClassesGiraphConfiguration configuration,
- byte[] buf, int off, int length) {
- this.configuration = configuration;
- this.buf = buf;
- this.off = off;
- this.length = length;
+ public ByteArrayIterable(
+ Factory<? extends ExtendedDataInput> dataInputFactory) {
+ this.dataInputFactory = dataInputFactory;
}
/**
@@ -69,12 +56,11 @@ public abstract class ByteArrayIterable<T extends Writable> implements
/**
* Constructor.
*
- * @param buf Buffer to read from
- * @param off Offset to read from in the buffer
- * @param length Maximum length of the buffer
+ * @param dataInputFactory Factory called once, here, to create the data
+ * input backing this iterator
*/
- private ByteArrayIterableIterator(byte[] buf, int off, int length) {
- super(configuration, buf, off, length);
+ private ByteArrayIterableIterator(
+ Factory<? extends ExtendedDataInput> dataInputFactory) {
+ super(dataInputFactory.create());
}
@Override
@@ -85,6 +71,6 @@ public abstract class ByteArrayIterable<T extends Writable> implements
@Override
public Iterator<T> iterator() {
- return new ByteArrayIterableIterator(buf, off, length);
+ // Every iterator asks the factory for its own data input
+ return new ByteArrayIterableIterator(dataInputFactory);
}
}
http://git-wip-us.apache.org/repos/asf/giraph/blob/96968fdc/giraph-core/src/main/java/org/apache/giraph/utils/ByteArrayIterator.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/utils/ByteArrayIterator.java b/giraph-core/src/main/java/org/apache/giraph/utils/ByteArrayIterator.java
index 76ed789..28b2dc8 100644
--- a/giraph-core/src/main/java/org/apache/giraph/utils/ByteArrayIterator.java
+++ b/giraph-core/src/main/java/org/apache/giraph/utils/ByteArrayIterator.java
@@ -19,7 +19,6 @@ package org.apache.giraph.utils;
import java.io.IOException;
import java.util.Iterator;
-import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
import org.apache.hadoop.io.Writable;
/**
@@ -35,18 +34,12 @@ public abstract class ByteArrayIterator<T extends Writable> implements
protected final ExtendedDataInput extendedDataInput;
/**
- * Constructor
+ * Wrap an ExtendedDataInput in a ByteArrayIterator. The input is stored
+ * as-is (not copied), so reading from it elsewhere affects this iterator.
*
- * @param configuration Configuration
- * @param buf Buffer
- * @param off Offset to start in the buffer
- * @param length Length of the buffer
+ * @param extendedDataInput Data input backing this iterator
*/
- public ByteArrayIterator(
- ImmutableClassesGiraphConfiguration configuration,
- byte[] buf, int off, int length) {
- extendedDataInput =
- configuration.createExtendedDataInput(buf, off, length);
+ public ByteArrayIterator(ExtendedDataInput extendedDataInput) {
+ this.extendedDataInput = extendedDataInput;
}
@Override
http://git-wip-us.apache.org/repos/asf/giraph/blob/96968fdc/giraph-core/src/main/java/org/apache/giraph/utils/ByteArrayVertexIdMessages.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/utils/ByteArrayVertexIdMessages.java b/giraph-core/src/main/java/org/apache/giraph/utils/ByteArrayVertexIdMessages.java
index 56cc01c..7e2b73b 100644
--- a/giraph-core/src/main/java/org/apache/giraph/utils/ByteArrayVertexIdMessages.java
+++ b/giraph-core/src/main/java/org/apache/giraph/utils/ByteArrayVertexIdMessages.java
@@ -186,8 +186,7 @@ public class ByteArrayVertexIdMessages<I extends WritableComparable,
*
* @param dataOutput Where the current message will be written to
*/
- public void writeCurrentMessageBytes(
- ExtendedDataOutput dataOutput) {
+ public void writeCurrentMessageBytes(DataOutput dataOutput) {
try {
dataOutput.write(getByteArray(), messageOffset, messageBytes);
} catch (IOException e) {
http://git-wip-us.apache.org/repos/asf/giraph/blob/96968fdc/giraph-core/src/main/java/org/apache/giraph/utils/Factory.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/utils/Factory.java b/giraph-core/src/main/java/org/apache/giraph/utils/Factory.java
new file mode 100644
index 0000000..c6cf050
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/utils/Factory.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.utils;
+
+/**
+ * Factory for any kind of objects: creates objects of type T on demand.
+ * Whether each call returns a new instance is up to the implementation.
+ *
+ * @param <T> Type of the created objects
+ */
+public interface Factory<T> {
+ /**
+ * Create an object
+ *
+ * @return Created object
+ */
+ T create();
+}
http://git-wip-us.apache.org/repos/asf/giraph/blob/96968fdc/giraph-core/src/main/java/org/apache/giraph/utils/RepresentativeByteArrayIterable.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/utils/RepresentativeByteArrayIterable.java b/giraph-core/src/main/java/org/apache/giraph/utils/RepresentativeByteArrayIterable.java
index e3992ed..2c24e89 100644
--- a/giraph-core/src/main/java/org/apache/giraph/utils/RepresentativeByteArrayIterable.java
+++ b/giraph-core/src/main/java/org/apache/giraph/utils/RepresentativeByteArrayIterable.java
@@ -18,7 +18,6 @@
package org.apache.giraph.utils;
import java.util.Iterator;
-import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
import org.apache.hadoop.io.Writable;
/**
@@ -33,15 +32,11 @@ public abstract class RepresentativeByteArrayIterable<T extends Writable>
/**
* Constructor
*
- * @param configuration Configuration
- * @param buf Buffer
- * @param off Offset to start in the buffer
- * @param length Length of the buffer
+ * @param dataInputFactory Factory for the data inputs backing the
+ * iterators; passed on to the superclass constructor
*/
public RepresentativeByteArrayIterable(
- ImmutableClassesGiraphConfiguration configuration,
- byte[] buf, int off, int length) {
- super(configuration, buf, off, length);
+ Factory<? extends ExtendedDataInput> dataInputFactory) {
+ super(dataInputFactory);
}
/**
@@ -52,13 +47,11 @@ public abstract class RepresentativeByteArrayIterable<T extends Writable>
/**
* Constructor.
*
- * @param buf Buffer to read from
- * @param off Offset to read from in the buffer
- * @param length Maximum length of the buffer
+ * @param extendedDataInput Data input backing this iterator
*/
private RepresentativeByteArrayIterableIterator(
- byte[] buf, int off, int length) {
- super(configuration, buf, off, length);
+ ExtendedDataInput extendedDataInput) {
+ super(extendedDataInput);
}
@Override
@@ -69,6 +62,7 @@ public abstract class RepresentativeByteArrayIterable<T extends Writable>
@Override
public Iterator<T> iterator() {
- return new RepresentativeByteArrayIterableIterator(buf, off, length);
+ // Ask the factory for a separate data input for every iterator
+ return
+ new RepresentativeByteArrayIterableIterator(dataInputFactory.create());
}
}
http://git-wip-us.apache.org/repos/asf/giraph/blob/96968fdc/giraph-core/src/main/java/org/apache/giraph/utils/RepresentativeByteArrayIterator.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/utils/RepresentativeByteArrayIterator.java b/giraph-core/src/main/java/org/apache/giraph/utils/RepresentativeByteArrayIterator.java
index b6151c5..d36c94f 100644
--- a/giraph-core/src/main/java/org/apache/giraph/utils/RepresentativeByteArrayIterator.java
+++ b/giraph-core/src/main/java/org/apache/giraph/utils/RepresentativeByteArrayIterator.java
@@ -18,7 +18,6 @@
package org.apache.giraph.utils;
import java.io.IOException;
-import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
import org.apache.hadoop.io.Writable;
/**
@@ -33,17 +32,12 @@ public abstract class RepresentativeByteArrayIterator<T extends
private final T representativeWritable = createWritable();
/**
- * Constructor
+ * Wrap ExtendedDataInput in a RepresentativeByteArrayIterator
*
- * @param configuration Configuration
- * @param buf buffer to read from
- * @param off Offset into the buffer to start from
- * @param length Length of the buffer
+ * @param extendedDataInput Data input backing this iterator; passed on to
+ * the superclass constructor
*/
- public RepresentativeByteArrayIterator(
- ImmutableClassesGiraphConfiguration configuration,
- byte[] buf, int off, int length) {
- super(configuration, buf, off, length);
+ public RepresentativeByteArrayIterator(ExtendedDataInput extendedDataInput) {
+ super(extendedDataInput);
}
@Override
http://git-wip-us.apache.org/repos/asf/giraph/blob/96968fdc/giraph-core/src/main/java/org/apache/giraph/utils/io/BigDataInput.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/utils/io/BigDataInput.java b/giraph-core/src/main/java/org/apache/giraph/utils/io/BigDataInput.java
new file mode 100644
index 0000000..f73819a
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/utils/io/BigDataInput.java
@@ -0,0 +1,196 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.utils.io;
+
+import org.apache.giraph.utils.ExtendedByteArrayDataInput;
+import org.apache.giraph.utils.ExtendedDataInput;
+import org.apache.giraph.utils.ExtendedDataOutput;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Implementations of {@link ExtendedDataInput} are limited because they can
+ * only handle up to 1GB of data. This {@link ExtendedDataInput} overcomes
+ * that limitation, with almost no additional cost when data is not huge.
+ *
+ * Data is read from the chunks of a {@link BigDataOutput}, one after another.
+ * {@link BigDataOutput} puts every write call entirely into one chunk, so
+ * reads which mirror the writes never straddle a chunk boundary.
+ * {@link #readFully(byte[], int, int)} and {@link #skipBytes(int)} can
+ * nevertheless continue across chunk boundaries.
+ *
+ * Goes in pair with {@link BigDataOutput}
+ */
+public class BigDataInput implements ExtendedDataInput {
+ /** Empty data input, used once all data inputs have been consumed */
+ private static final ExtendedDataInput EMPTY_INPUT =
+ new ExtendedByteArrayDataInput(new byte[0]);
+
+ /** Input which we are currently reading from */
+ private ExtendedDataInput currentInput;
+ /** List of all data inputs which contain data */
+ private final List<ExtendedDataInput> dataInputs;
+ /**
+ * Which position within dataInputs are we currently reading from; equal to
+ * dataInputs.size() (never larger) once every data input was consumed
+ */
+ private int currentPositionInInputs;
+
+ /**
+ * Constructor
+ *
+ * @param bigDataOutput {@link BigDataOutput} which we want to read data from;
+ * only data written to it before this call is visible to this input
+ */
+ public BigDataInput(BigDataOutput bigDataOutput) {
+ dataInputs = new ArrayList<ExtendedDataInput>(
+ bigDataOutput.getNumberOfDataOutputs());
+ for (ExtendedDataOutput dataOutput : bigDataOutput.getDataOutputs()) {
+ dataInputs.add(bigDataOutput.getConf().createExtendedDataInput(
+ dataOutput.getByteArray(), 0, dataOutput.getPos()));
+ }
+ currentPositionInInputs = -1;
+ moveToNextDataInput();
+ }
+
+ /** Start reading the following data input */
+ private void moveToNextDataInput() {
+ currentPositionInInputs++;
+ if (currentPositionInInputs < dataInputs.size()) {
+ currentInput = dataInputs.get(currentPositionInInputs);
+ } else {
+ // Keep the index at dataInputs.size() so getPos()/available() stay valid
+ currentPositionInInputs = dataInputs.size();
+ currentInput = EMPTY_INPUT;
+ }
+ }
+
+ /**
+ * Check if we read everything from the current data input, and if so move
+ * on to the next data input which still has data (if there is one).
+ */
+ private void checkIfShouldMoveToNextDataInput() {
+ // Loop rather than move once, so that empty data inputs are skipped; the
+ // bound stops us from spinning on EMPTY_INPUT at the end of the data
+ while (currentInput.available() == 0 &&
+ currentPositionInInputs < dataInputs.size()) {
+ moveToNextDataInput();
+ }
+ }
+
+ @Override
+ public void readFully(byte[] b) throws IOException {
+ readFully(b, 0, b.length);
+ }
+
+ /**
+ * Read exactly len bytes into b, continuing into the following data inputs
+ * when the current one does not hold enough bytes.
+ *
+ * @param b Buffer to read into
+ * @param off Offset in b at which to start storing bytes
+ * @param len Number of bytes to read
+ * @throws IOException EOFException if fewer than len bytes are left
+ */
+ @Override
+ public void readFully(byte[] b, int off, int len) throws IOException {
+ int bytesRead = 0;
+ while (bytesRead < len) {
+ checkIfShouldMoveToNextDataInput();
+ int bytesToRead = Math.min(len - bytesRead, currentInput.available());
+ if (bytesToRead <= 0) {
+ throw new java.io.EOFException("readFully: Only " + bytesRead +
+ " of the requested " + len + " bytes were available");
+ }
+ currentInput.readFully(b, off + bytesRead, bytesToRead);
+ bytesRead += bytesToRead;
+ }
+ }
+
+ @Override
+ public boolean readBoolean() throws IOException {
+ checkIfShouldMoveToNextDataInput();
+ return currentInput.readBoolean();
+ }
+
+ @Override
+ public byte readByte() throws IOException {
+ checkIfShouldMoveToNextDataInput();
+ return currentInput.readByte();
+ }
+
+ @Override
+ public int readUnsignedByte() throws IOException {
+ checkIfShouldMoveToNextDataInput();
+ return currentInput.readUnsignedByte();
+ }
+
+ @Override
+ public short readShort() throws IOException {
+ checkIfShouldMoveToNextDataInput();
+ return currentInput.readShort();
+ }
+
+ @Override
+ public int readUnsignedShort() throws IOException {
+ checkIfShouldMoveToNextDataInput();
+ return currentInput.readUnsignedShort();
+ }
+
+ @Override
+ public char readChar() throws IOException {
+ checkIfShouldMoveToNextDataInput();
+ return currentInput.readChar();
+ }
+
+ @Override
+ public int readInt() throws IOException {
+ checkIfShouldMoveToNextDataInput();
+ return currentInput.readInt();
+ }
+
+ @Override
+ public long readLong() throws IOException {
+ checkIfShouldMoveToNextDataInput();
+ return currentInput.readLong();
+ }
+
+ @Override
+ public float readFloat() throws IOException {
+ checkIfShouldMoveToNextDataInput();
+ return currentInput.readFloat();
+ }
+
+ @Override
+ public double readDouble() throws IOException {
+ checkIfShouldMoveToNextDataInput();
+ return currentInput.readDouble();
+ }
+
+ @Override
+ public String readLine() throws IOException {
+ checkIfShouldMoveToNextDataInput();
+ return currentInput.readLine();
+ }
+
+ @Override
+ public String readUTF() throws IOException {
+ checkIfShouldMoveToNextDataInput();
+ return currentInput.readUTF();
+ }
+
+ /**
+ * Skip up to n bytes, crossing data input boundaries when needed. As the
+ * {@link java.io.DataInput#skipBytes(int)} contract allows, fewer bytes are
+ * skipped (without an exception) when the end of the data is reached.
+ *
+ * @param n Number of bytes to skip
+ * @return Number of bytes actually skipped
+ */
+ @Override
+ public int skipBytes(int n) throws IOException {
+ int bytesSkipped = 0;
+ while (bytesSkipped < n) {
+ checkIfShouldMoveToNextDataInput();
+ int bytesToSkip = Math.min(n - bytesSkipped, currentInput.available());
+ if (bytesToSkip <= 0) {
+ // No data left anywhere
+ break;
+ }
+ int skippedNow = currentInput.skipBytes(bytesToSkip);
+ if (skippedNow <= 0) {
+ // Underlying input made no progress; stop instead of spinning
+ break;
+ }
+ bytesSkipped += skippedNow;
+ }
+ return bytesSkipped;
+ }
+
+ /**
+ * Position within the whole stream: the sum of the positions of all data
+ * inputs started so far. Summed as a long and saturated at
+ * Integer.MAX_VALUE, since the total can exceed the int range.
+ *
+ * @return Position within the whole stream
+ */
+ @Override
+ public int getPos() {
+ long pos = 0;
+ int lastStarted = Math.min(currentPositionInInputs, dataInputs.size() - 1);
+ for (int i = 0; i <= lastStarted; i++) {
+ pos += dataInputs.get(i).getPos();
+ }
+ return (int) Math.min(pos, Integer.MAX_VALUE);
+ }
+
+ /**
+ * Number of bytes left in this and all following data inputs. Summed as a
+ * long and saturated at Integer.MAX_VALUE; an int sum could overflow to a
+ * negative value and make callers believe no data is left.
+ *
+ * @return Number of bytes left, at most Integer.MAX_VALUE
+ */
+ @Override
+ public int available() {
+ long available = 0;
+ for (int i = currentPositionInInputs; i < dataInputs.size(); i++) {
+ available += dataInputs.get(i).available();
+ }
+ return (int) Math.min(available, Integer.MAX_VALUE);
+ }
+}
http://git-wip-us.apache.org/repos/asf/giraph/blob/96968fdc/giraph-core/src/main/java/org/apache/giraph/utils/io/BigDataInputOutput.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/utils/io/BigDataInputOutput.java b/giraph-core/src/main/java/org/apache/giraph/utils/io/BigDataInputOutput.java
new file mode 100644
index 0000000..7d94f97
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/utils/io/BigDataInputOutput.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.utils.io;
+
+import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
+import org.apache.giraph.utils.ExtendedDataInput;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+/**
+ * {@link DataInputOutput} backed by a {@link BigDataOutput}, whose readers
+ * are {@link BigDataInput}s, so that the amount of stored data is not bounded
+ * by the maximum size of a single array.
+ */
+public class BigDataInputOutput extends DataInputOutput {
+ /** Chunked output which all data is written to */
+ private final BigDataOutput bigOutput;
+
+ /**
+ * Constructor
+ *
+ * @param conf Configuration, used to create the underlying streams
+ */
+ public BigDataInputOutput(ImmutableClassesGiraphConfiguration conf) {
+ this.bigOutput = new BigDataOutput(conf);
+ }
+
+ @Override
+ public void write(DataOutput out) throws IOException {
+ bigOutput.write(out);
+ }
+
+ @Override
+ public void readFields(DataInput in) throws IOException {
+ bigOutput.readFields(in);
+ }
+
+ @Override
+ public DataOutput getDataOutput() {
+ return bigOutput;
+ }
+
+ /**
+ * Create a reader over the data written so far; data written after this
+ * call is not visible to the returned input.
+ *
+ * @return New {@link BigDataInput} over the underlying output
+ */
+ @Override
+ public ExtendedDataInput createDataInput() {
+ return new BigDataInput(bigOutput);
+ }
+}
http://git-wip-us.apache.org/repos/asf/giraph/blob/96968fdc/giraph-core/src/main/java/org/apache/giraph/utils/io/BigDataOutput.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/utils/io/BigDataOutput.java b/giraph-core/src/main/java/org/apache/giraph/utils/io/BigDataOutput.java
new file mode 100644
index 0000000..c0fff60
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/utils/io/BigDataOutput.java
@@ -0,0 +1,250 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.utils.io;
+
+import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
+import org.apache.giraph.utils.ExtendedDataOutput;
+import org.apache.hadoop.io.Writable;
+
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Lists;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Implementations of {@link ExtendedDataOutput} are limited because they can
+ * only handle up to 1GB of data. This {@link DataOutput} overcomes that
+ * limitation, with almost no additional cost when data is not huge.
+ *
+ * Data is kept in a list of {@link ExtendedDataOutput} chunks. Each write
+ * call goes entirely into a single chunk: a new chunk is only started before
+ * a call, never in the middle of one. Because of this, reads which mirror the
+ * writes never have to cross a chunk boundary.
+ *
+ * Goes in pair with {@link BigDataInput}
+ */
+public class BigDataOutput implements DataOutput, Writable {
+ /** Default initial size of the stream */
+ private static final int DEFAULT_INITIAL_SIZE = 16;
+ /**
+ * Size of a stream after which we switch to a new one. The check is done
+ * before each write call, so a single large write still goes into the
+ * current stream and can take it beyond this size.
+ */
+ private static final int MAX_SIZE = 1 << 25;
+ /**
+ * Create a new stream when we have fewer than this number of bytes left in
+ * the stream. Should be larger than the largest serialized primitive.
+ */
+ private static final int SIZE_DELTA = 100;
+
+ /** Data output which we are currently writing to */
+ private ExtendedDataOutput currentDataOutput;
+ /** List of filled outputs, will be null until we get a lot of data */
+ private List<ExtendedDataOutput> dataOutputs;
+ /** Configuration */
+ private final ImmutableClassesGiraphConfiguration conf;
+
+ /**
+ * Constructor
+ *
+ * @param conf Configuration
+ */
+ public BigDataOutput(ImmutableClassesGiraphConfiguration conf) {
+ this(DEFAULT_INITIAL_SIZE, conf);
+ }
+
+ /**
+ * Constructor
+ *
+ * @param initialSize Initial size of data output
+ * @param conf Configuration
+ */
+ public BigDataOutput(int initialSize,
+ ImmutableClassesGiraphConfiguration conf) {
+ this.conf = conf;
+ dataOutputs = null;
+ currentDataOutput = conf.createExtendedDataOutput(initialSize);
+ }
+
+ /**
+ * Get DataOutput which data should be written to. If current DataOutput is
+ * full it will create a new one.
+ *
+ * @return DataOutput which data should be written to
+ */
+ private ExtendedDataOutput getDataOutputToWriteTo() {
+ // Written as "pos < MAX_SIZE - SIZE_DELTA" rather than
+ // "pos + SIZE_DELTA < MAX_SIZE" so that a huge pos cannot overflow the sum
+ if (currentDataOutput.getPos() < MAX_SIZE - SIZE_DELTA) {
+ return currentDataOutput;
+ }
+ if (dataOutputs == null) {
+ dataOutputs = new ArrayList<ExtendedDataOutput>(1);
+ }
+ dataOutputs.add(currentDataOutput);
+ currentDataOutput = conf.createExtendedDataOutput(MAX_SIZE);
+ return currentDataOutput;
+ }
+
+ /**
+ * Get number of DataOutputs which contain written data.
+ *
+ * @return Number of DataOutputs which contain written data
+ */
+ public int getNumberOfDataOutputs() {
+ return (dataOutputs == null) ? 1 : dataOutputs.size() + 1;
+ }
+
+ /**
+ * Get DataOutputs which contain written data, in the order they were
+ * filled; the one currently written to comes last.
+ *
+ * @return DataOutputs which contain written data
+ */
+ public Iterable<ExtendedDataOutput> getDataOutputs() {
+ ArrayList<ExtendedDataOutput> currentList =
+ Lists.newArrayList(currentDataOutput);
+ if (dataOutputs == null) {
+ return currentList;
+ } else {
+ return Iterables.concat(dataOutputs, currentList);
+ }
+ }
+
+ /**
+ * Get the configuration used to create the underlying streams
+ *
+ * @return Configuration
+ */
+ public ImmutableClassesGiraphConfiguration getConf() {
+ return conf;
+ }
+
+ @Override
+ public void write(int b) throws IOException {
+ getDataOutputToWriteTo().write(b);
+ }
+
+ @Override
+ public void write(byte[] b) throws IOException {
+ getDataOutputToWriteTo().write(b);
+ }
+
+ @Override
+ public void write(byte[] b, int off, int len) throws IOException {
+ getDataOutputToWriteTo().write(b, off, len);
+ }
+
+ @Override
+ public void writeBoolean(boolean v) throws IOException {
+ getDataOutputToWriteTo().writeBoolean(v);
+ }
+
+ @Override
+ public void writeByte(int v) throws IOException {
+ getDataOutputToWriteTo().writeByte(v);
+ }
+
+ @Override
+ public void writeShort(int v) throws IOException {
+ getDataOutputToWriteTo().writeShort(v);
+ }
+
+ @Override
+ public void writeChar(int v) throws IOException {
+ getDataOutputToWriteTo().writeChar(v);
+ }
+
+ @Override
+ public void writeInt(int v) throws IOException {
+ getDataOutputToWriteTo().writeInt(v);
+ }
+
+ @Override
+ public void writeLong(long v) throws IOException {
+ getDataOutputToWriteTo().writeLong(v);
+ }
+
+ @Override
+ public void writeFloat(float v) throws IOException {
+ getDataOutputToWriteTo().writeFloat(v);
+ }
+
+ @Override
+ public void writeDouble(double v) throws IOException {
+ getDataOutputToWriteTo().writeDouble(v);
+ }
+
+ @Override
+ public void writeBytes(String s) throws IOException {
+ getDataOutputToWriteTo().writeBytes(s);
+ }
+
+ @Override
+ public void writeChars(String s) throws IOException {
+ getDataOutputToWriteTo().writeChars(s);
+ }
+
+ @Override
+ public void writeUTF(String s) throws IOException {
+ getDataOutputToWriteTo().writeUTF(s);
+ }
+
+ /**
+ * Write one of data outputs to another data output: its length followed by
+ * its bytes.
+ *
+ * @param dataOutput Data output to write
+ * @param out Data output to write to
+ */
+ private void writeExtendedDataOutput(ExtendedDataOutput dataOutput,
+ DataOutput out) throws IOException {
+ out.writeInt(dataOutput.getPos());
+ out.write(dataOutput.getByteArray(), 0, dataOutput.getPos());
+ }
+
+ /**
+ * Read data output from data input (format of writeExtendedDataOutput)
+ *
+ * @param in Data input to read from
+ * @return Data output read
+ * @throws IOException if reading fails or the stored length is negative
+ */
+ private ExtendedDataOutput readExtendedDataOutput(
+ DataInput in) throws IOException {
+ int length = in.readInt();
+ if (length < 0) {
+ // Corrupt input; fail with an IOException instead of an unchecked
+ // NegativeArraySizeException
+ throw new IOException(
+ "readExtendedDataOutput: Invalid length " + length);
+ }
+ byte[] data = new byte[length];
+ in.readFully(data);
+ return conf.createExtendedDataOutput(data, data.length);
+ }
+
+ @Override
+ public void write(DataOutput out) throws IOException {
+ // Layout: number of filled outputs, each filled output, current output
+ if (dataOutputs == null) {
+ out.writeInt(0);
+ } else {
+ out.writeInt(dataOutputs.size());
+ for (ExtendedDataOutput stream : dataOutputs) {
+ writeExtendedDataOutput(stream, out);
+ }
+ }
+ writeExtendedDataOutput(currentDataOutput, out);
+ }
+
+ @Override
+ public void readFields(DataInput in) throws IOException {
+ int size = in.readInt();
+ if (size < 0) {
+ throw new IOException(
+ "readFields: Invalid number of data outputs " + size);
+ }
+ if (size == 0) {
+ dataOutputs = null;
+ } else {
+ dataOutputs = new ArrayList<ExtendedDataOutput>(size);
+ while (size-- > 0) {
+ dataOutputs.add(readExtendedDataOutput(in));
+ }
+ }
+ currentDataOutput = readExtendedDataOutput(in);
+ }
+}
http://git-wip-us.apache.org/repos/asf/giraph/blob/96968fdc/giraph-core/src/main/java/org/apache/giraph/utils/io/DataInputOutput.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/utils/io/DataInputOutput.java b/giraph-core/src/main/java/org/apache/giraph/utils/io/DataInputOutput.java
new file mode 100644
index 0000000..ded78f8
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/utils/io/DataInputOutput.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.utils.io;
+
+import org.apache.giraph.utils.ExtendedDataInput;
+import org.apache.giraph.utils.Factory;
+import org.apache.hadoop.io.Writable;
+
+import java.io.DataOutput;
+
+/**
+ * Provides both DataOutput which we can write to and DataInputs which are
+ * going to read data which was written to DataOutput.
+ */
+public abstract class DataInputOutput implements
+ Writable, Factory<ExtendedDataInput> {
+ /**
+ * Get DataOutput to write to
+ *
+ * @return DataOutput which we can write to
+ */
+ public abstract DataOutput getDataOutput();
+
+ /**
+ * Create DataInput which reads data from underlying DataOutput
+ *
+ * @return DataInput
+ */
+ public abstract ExtendedDataInput createDataInput();
+
+ @Override
+ public ExtendedDataInput create() {
+ return createDataInput();
+ }
+}
http://git-wip-us.apache.org/repos/asf/giraph/blob/96968fdc/giraph-core/src/main/java/org/apache/giraph/utils/io/ExtendedDataInputOutput.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/utils/io/ExtendedDataInputOutput.java b/giraph-core/src/main/java/org/apache/giraph/utils/io/ExtendedDataInputOutput.java
new file mode 100644
index 0000000..af45426
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/utils/io/ExtendedDataInputOutput.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.utils.io;
+
+import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
+import org.apache.giraph.utils.ExtendedDataInput;
+import org.apache.giraph.utils.ExtendedDataOutput;
+import org.apache.giraph.utils.WritableUtils;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+/**
+ * Wraps an {@link ExtendedDataOutput}, and creates {@link ExtendedDataInput}s
+ * over its contents, so that data can be written and later read back from
+ * the same place.
+ */
+public class ExtendedDataInputOutput extends DataInputOutput {
+  /** Configuration used to create the data outputs and inputs */
+  private final ImmutableClassesGiraphConfiguration conf;
+  /** DataOutput which we write to; replaced by {@link #readFields} */
+  private ExtendedDataOutput dataOutput;
+
+  /**
+   * Constructor
+   *
+   * @param conf Configuration
+   */
+  public ExtendedDataInputOutput(ImmutableClassesGiraphConfiguration conf) {
+    this.conf = conf;
+    dataOutput = conf.createExtendedDataOutput();
+  }
+
+  @Override
+  public DataOutput getDataOutput() {
+    return dataOutput;
+  }
+
+  /**
+   * Create a DataInput over the bytes written so far, i.e. indices
+   * [0, getPos()) of the underlying output's byte array.
+   *
+   * @return DataInput reading the written data
+   */
+  @Override
+  public ExtendedDataInput createDataInput() {
+    return conf.createExtendedDataInput(dataOutput.getByteArray(), 0,
+        dataOutput.getPos());
+  }
+
+  @Override
+  public void write(DataOutput out) throws IOException {
+    WritableUtils.writeExtendedDataOutput(dataOutput, out);
+  }
+
+  /**
+   * Replace the current contents with the data read from {@code in}.
+   * The deserialized output must be stored in {@code dataOutput};
+   * otherwise the data written by {@link #write(DataOutput)} is lost.
+   *
+   * @param in Data input to read from
+   */
+  @Override
+  public void readFields(DataInput in) throws IOException {
+    dataOutput = WritableUtils.readExtendedDataOutput(in, conf);
+  }
+}
http://git-wip-us.apache.org/repos/asf/giraph/blob/96968fdc/giraph-core/src/main/java/org/apache/giraph/utils/io/package-info.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/utils/io/package-info.java b/giraph-core/src/main/java/org/apache/giraph/utils/io/package-info.java
new file mode 100644
index 0000000..cec6b3c
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/utils/io/package-info.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/**
+ * Utility classes that build on the java.io package, such as wrappers that
+ * pair a DataOutput with DataInputs reading the same data back.
+ */
+package org.apache.giraph.utils.io;