You are viewing a plain-text version of this content; the canonical (HTML) version is available from the original mailing-list archive page.
Posted to commits@giraph.apache.org by ma...@apache.org on 2013/09/03 20:13:56 UTC

git commit: updated refs/heads/trunk to 96968fd

Updated Branches:
  refs/heads/trunk 0959a87e6 -> 96968fdca


GIRAPH-752: Better support for supernodes (majakabiljo)


Project: http://git-wip-us.apache.org/repos/asf/giraph/repo
Commit: http://git-wip-us.apache.org/repos/asf/giraph/commit/96968fdc
Tree: http://git-wip-us.apache.org/repos/asf/giraph/tree/96968fdc
Diff: http://git-wip-us.apache.org/repos/asf/giraph/diff/96968fdc

Branch: refs/heads/trunk
Commit: 96968fdcadb1f87017c6e7853c4a63ba065c6611
Parents: 0959a87
Author: Maja Kabiljo <ma...@maja-mbp.local>
Authored: Tue Sep 3 11:11:40 2013 -0700
Committer: Maja Kabiljo <ma...@maja-mbp.local>
Committed: Tue Sep 3 11:11:40 2013 -0700

----------------------------------------------------------------------
 .../ByteArrayMessagesPerVertexStore.java        |  81 +++---
 .../giraph/comm/messages/MessagesIterable.java  |  15 +-
 .../PartitionDiskBackedMessageStore.java        |  49 ++--
 .../out_of_core/SequentialFileMessageStore.java |  11 +-
 .../primitives/IntByteArrayMessageStore.java    |  70 +++---
 .../primitives/LongByteArrayMessageStore.java   |  70 +++---
 .../org/apache/giraph/conf/GiraphConstants.java |  11 +
 .../ImmutableClassesGiraphConfiguration.java    |  22 ++
 .../apache/giraph/utils/ByteArrayIterable.java  |  36 +--
 .../apache/giraph/utils/ByteArrayIterator.java  |  15 +-
 .../giraph/utils/ByteArrayVertexIdMessages.java |   3 +-
 .../java/org/apache/giraph/utils/Factory.java   |  33 +++
 .../utils/RepresentativeByteArrayIterable.java  |  22 +-
 .../utils/RepresentativeByteArrayIterator.java  |  14 +-
 .../apache/giraph/utils/io/BigDataInput.java    | 196 +++++++++++++++
 .../giraph/utils/io/BigDataInputOutput.java     |  64 +++++
 .../apache/giraph/utils/io/BigDataOutput.java   | 250 +++++++++++++++++++
 .../apache/giraph/utils/io/DataInputOutput.java |  51 ++++
 .../utils/io/ExtendedDataInputOutput.java       |  70 ++++++
 .../apache/giraph/utils/io/package-info.java    |  22 ++
 20 files changed, 888 insertions(+), 217 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/giraph/blob/96968fdc/giraph-core/src/main/java/org/apache/giraph/comm/messages/ByteArrayMessagesPerVertexStore.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/messages/ByteArrayMessagesPerVertexStore.java b/giraph-core/src/main/java/org/apache/giraph/comm/messages/ByteArrayMessagesPerVertexStore.java
index 6518da6..d9ef449 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/messages/ByteArrayMessagesPerVertexStore.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/messages/ByteArrayMessagesPerVertexStore.java
@@ -22,10 +22,10 @@ import org.apache.giraph.bsp.CentralizedServiceWorker;
 import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
 import org.apache.giraph.factories.MessageValueFactory;
 import org.apache.giraph.utils.ByteArrayVertexIdMessages;
-import org.apache.giraph.utils.ExtendedDataOutput;
+import org.apache.giraph.utils.ExtendedDataInput;
 import org.apache.giraph.utils.RepresentativeByteArrayIterator;
 import org.apache.giraph.utils.VertexIdIterator;
-import org.apache.giraph.utils.WritableUtils;
+import org.apache.giraph.utils.io.DataInputOutput;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
 
@@ -44,7 +44,7 @@ import java.util.concurrent.ConcurrentMap;
  * @param <M> Message data
  */
 public class ByteArrayMessagesPerVertexStore<I extends WritableComparable,
-    M extends Writable> extends SimpleMessageStore<I, M, ExtendedDataOutput> {
+    M extends Writable> extends SimpleMessageStore<I, M, DataInputOutput> {
   /**
    * Constructor
    *
@@ -68,30 +68,27 @@ public class ByteArrayMessagesPerVertexStore<I extends WritableComparable,
    * @param iterator Special iterator that can release ownerhips of vertex ids
    * @return Extended data output for this vertex id (created if necessary)
    */
-  private ExtendedDataOutput getExtendedDataOutput(
-      ConcurrentMap<I, ExtendedDataOutput> partitionMap,
+  private DataInputOutput getDataInputOutput(
+      ConcurrentMap<I, DataInputOutput> partitionMap,
       VertexIdIterator<I> iterator) {
-    ExtendedDataOutput extendedDataOutput =
+    DataInputOutput dataInputOutput =
         partitionMap.get(iterator.getCurrentVertexId());
-    if (extendedDataOutput == null) {
-      ExtendedDataOutput newExtendedDataOutput =
-          config.createExtendedDataOutput();
-      extendedDataOutput =
-          partitionMap.putIfAbsent(
-              iterator.releaseCurrentVertexId(),
-              newExtendedDataOutput);
-      if (extendedDataOutput == null) {
-        extendedDataOutput = newExtendedDataOutput;
+    if (dataInputOutput == null) {
+      DataInputOutput newDataOutput = config.createMessagesInputOutput();
+      dataInputOutput = partitionMap.putIfAbsent(
+          iterator.releaseCurrentVertexId(), newDataOutput);
+      if (dataInputOutput == null) {
+        dataInputOutput = newDataOutput;
       }
     }
-    return extendedDataOutput;
+    return dataInputOutput;
   }
 
   @Override
   public void addPartitionMessages(
       int partitionId,
       ByteArrayVertexIdMessages<I, M> messages) throws IOException {
-    ConcurrentMap<I, ExtendedDataOutput> partitionMap =
+    ConcurrentMap<I, DataInputOutput> partitionMap =
         getOrCreatePartitionMap(partitionId);
     ByteArrayVertexIdMessages<I, M>.VertexIdMessageBytesIterator
         vertexIdMessageBytesIterator =
@@ -104,12 +101,12 @@ public class ByteArrayMessagesPerVertexStore<I extends WritableComparable,
     if (vertexIdMessageBytesIterator != null) {
       while (vertexIdMessageBytesIterator.hasNext()) {
         vertexIdMessageBytesIterator.next();
-        ExtendedDataOutput extendedDataOutput =
-            getExtendedDataOutput(partitionMap, vertexIdMessageBytesIterator);
+        DataInputOutput dataInputOutput =
+            getDataInputOutput(partitionMap, vertexIdMessageBytesIterator);
 
-        synchronized (extendedDataOutput) {
+        synchronized (dataInputOutput) {
           vertexIdMessageBytesIterator.writeCurrentMessageBytes(
-              extendedDataOutput);
+              dataInputOutput.getDataOutput());
         }
       }
     } else {
@@ -117,12 +114,12 @@ public class ByteArrayMessagesPerVertexStore<I extends WritableComparable,
           vertexIdMessageIterator = messages.getVertexIdMessageIterator();
       while (vertexIdMessageIterator.hasNext()) {
         vertexIdMessageIterator.next();
-        ExtendedDataOutput extendedDataOutput =
-            getExtendedDataOutput(partitionMap, vertexIdMessageIterator);
+        DataInputOutput dataInputOutput =
+            getDataInputOutput(partitionMap, vertexIdMessageIterator);
 
-        synchronized (extendedDataOutput) {
+        synchronized (dataInputOutput) {
           vertexIdMessageIterator.getCurrentMessage().write(
-              extendedDataOutput);
+              dataInputOutput.getDataOutput());
         }
       }
     }
@@ -130,9 +127,8 @@ public class ByteArrayMessagesPerVertexStore<I extends WritableComparable,
 
   @Override
   protected Iterable<M> getMessagesAsIterable(
-      ExtendedDataOutput extendedDataOutput) {
-    return new MessagesIterable<M>(config, messageValueFactory,
-        extendedDataOutput.getByteArray(), 0, extendedDataOutput.getPos());
+      DataInputOutput dataInputOutput) {
+    return new MessagesIterable<M>(dataInputOutput, messageValueFactory);
   }
 
   /**
@@ -143,15 +139,10 @@ public class ByteArrayMessagesPerVertexStore<I extends WritableComparable,
     /**
      * Constructor
      *
-     * @param configuration Configuration
-     * @param buf buffer to read from
-     * @param off Offset into the buffer to start from
-     * @param length Length of the buffer
+     * @param dataInput DataInput containing the messages
      */
-    public RepresentativeMessageIterator(
-        ImmutableClassesGiraphConfiguration configuration,
-        byte[] buf, int off, int length) {
-      super(configuration, buf, off, length);
+    public RepresentativeMessageIterator(ExtendedDataInput dataInput) {
+      super(dataInput);
     }
 
     @Override
@@ -162,27 +153,27 @@ public class ByteArrayMessagesPerVertexStore<I extends WritableComparable,
 
   @Override
   protected int getNumberOfMessagesIn(
-      ConcurrentMap<I, ExtendedDataOutput> partitionMap) {
+      ConcurrentMap<I, DataInputOutput> partitionMap) {
     int numberOfMessages = 0;
-    for (ExtendedDataOutput extendedDataOutput : partitionMap.values()) {
+    for (DataInputOutput dataInputOutput : partitionMap.values()) {
       numberOfMessages += Iterators.size(
-          new RepresentativeMessageIterator(config,
-              extendedDataOutput.getByteArray(), 0,
-              extendedDataOutput.getPos()));
+          new RepresentativeMessageIterator(dataInputOutput.createDataInput()));
     }
     return numberOfMessages;
   }
 
   @Override
-  protected void writeMessages(ExtendedDataOutput extendedDataOutput,
+  protected void writeMessages(DataInputOutput dataInputOutput,
       DataOutput out) throws IOException {
-    WritableUtils.writeExtendedDataOutput(extendedDataOutput, out);
+    dataInputOutput.write(out);
   }
 
   @Override
-  protected ExtendedDataOutput readFieldsForMessages(DataInput in) throws
+  protected DataInputOutput readFieldsForMessages(DataInput in) throws
       IOException {
-    return WritableUtils.readExtendedDataOutput(in, config);
+    DataInputOutput dataInputOutput = config.createMessagesInputOutput();
+    dataInputOutput.readFields(in);
+    return dataInputOutput;
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/giraph/blob/96968fdc/giraph-core/src/main/java/org/apache/giraph/comm/messages/MessagesIterable.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/messages/MessagesIterable.java b/giraph-core/src/main/java/org/apache/giraph/comm/messages/MessagesIterable.java
index a466a8d..3b22ab3 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/messages/MessagesIterable.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/messages/MessagesIterable.java
@@ -18,8 +18,9 @@
 
 package org.apache.giraph.comm.messages;
 
-import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
 import org.apache.giraph.factories.MessageValueFactory;
+import org.apache.giraph.utils.ExtendedDataInput;
+import org.apache.giraph.utils.Factory;
 import org.apache.giraph.utils.RepresentativeByteArrayIterable;
 import org.apache.hadoop.io.Writable;
 
@@ -36,17 +37,13 @@ public class MessagesIterable<M extends Writable>
   /**
    * Constructor
    *
-   * @param conf Configuration
+   * @param dataInputFactory Factory for data inputs
    * @param messageValueFactory factory for creating message values
-   * @param buf Buffer
-   * @param off Offset to start in the buffer
-   * @param length Length of the buffer
    */
   public MessagesIterable(
-      ImmutableClassesGiraphConfiguration conf,
-      MessageValueFactory<M> messageValueFactory,
-      byte[] buf, int off, int length) {
-    super(conf, buf, off, length);
+      Factory<? extends ExtendedDataInput> dataInputFactory,
+      MessageValueFactory<M> messageValueFactory) {
+    super(dataInputFactory);
     this.messageValueFactory = messageValueFactory;
   }
 

http://git-wip-us.apache.org/repos/asf/giraph/blob/96968fdc/giraph-core/src/main/java/org/apache/giraph/comm/messages/out_of_core/PartitionDiskBackedMessageStore.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/messages/out_of_core/PartitionDiskBackedMessageStore.java b/giraph-core/src/main/java/org/apache/giraph/comm/messages/out_of_core/PartitionDiskBackedMessageStore.java
index 7b3e548..7d46d30 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/messages/out_of_core/PartitionDiskBackedMessageStore.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/messages/out_of_core/PartitionDiskBackedMessageStore.java
@@ -38,8 +38,7 @@ import org.apache.giraph.comm.messages.MessageStoreFactory;
 import org.apache.giraph.comm.messages.MessagesIterable;
 import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
 import org.apache.giraph.factories.MessageValueFactory;
-import org.apache.giraph.utils.ExtendedDataOutput;
-import org.apache.giraph.utils.WritableUtils;
+import org.apache.giraph.utils.io.DataInputOutput;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
 
@@ -58,7 +57,7 @@ public class PartitionDiskBackedMessageStore<I extends WritableComparable,
    * In-memory message map (must be sorted to insure that the ids are
    * ordered)
    */
-  private volatile ConcurrentNavigableMap<I, ExtendedDataOutput>
+  private volatile ConcurrentNavigableMap<I, DataInputOutput>
   inMemoryMessages;
   /** Hadoop configuration */
   private final ImmutableClassesGiraphConfiguration<I, ?, ?> config;
@@ -86,7 +85,7 @@ public class PartitionDiskBackedMessageStore<I extends WritableComparable,
       ImmutableClassesGiraphConfiguration<I, ?, ?> config,
       MessageStoreFactory<I, M, SequentialFileMessageStore<I, M>>
           fileStoreFactory) {
-    inMemoryMessages = new ConcurrentSkipListMap<I, ExtendedDataOutput>();
+    inMemoryMessages = new ConcurrentSkipListMap<I, DataInputOutput>();
     this.messageValueFactory = messageValueFactory;
     this.config = config;
     numberOfMessagesInMemory = new AtomicInteger(0);
@@ -112,21 +111,21 @@ public class PartitionDiskBackedMessageStore<I extends WritableComparable,
     destinationVertices.add(vertexId);
     rwLock.readLock().lock();
     try {
-      ExtendedDataOutput extendedDataOutput = inMemoryMessages.get(vertexId);
-      if (extendedDataOutput == null) {
-        ExtendedDataOutput newExtendedDataOutput =
-            config.createExtendedDataOutput();
-        extendedDataOutput =
-            inMemoryMessages.putIfAbsent(vertexId, newExtendedDataOutput);
-        if (extendedDataOutput == null) {
+      DataInputOutput dataInputOutput = inMemoryMessages.get(vertexId);
+      if (dataInputOutput == null) {
+        DataInputOutput newDataInputOutput =
+            config.createMessagesInputOutput();
+        dataInputOutput =
+            inMemoryMessages.putIfAbsent(vertexId, newDataInputOutput);
+        if (dataInputOutput == null) {
           ownsVertexId = true;
-          extendedDataOutput = newExtendedDataOutput;
+          dataInputOutput = newDataInputOutput;
         }
       }
 
-      synchronized (extendedDataOutput) {
+      synchronized (dataInputOutput) {
         for (M message : messages) {
-          message.write(extendedDataOutput);
+          message.write(dataInputOutput.getDataOutput());
           numberOfMessagesInMemory.getAndIncrement();
         }
       }
@@ -144,13 +143,12 @@ public class PartitionDiskBackedMessageStore<I extends WritableComparable,
    * @return Iterable of messages for a vertex id
    */
   public Iterable<M> getVertexMessages(I vertexId) throws IOException {
-    ExtendedDataOutput extendedDataOutput = inMemoryMessages.get(vertexId);
-    if (extendedDataOutput == null) {
-      extendedDataOutput = config.createExtendedDataOutput();
+    DataInputOutput dataInputOutput = inMemoryMessages.get(vertexId);
+    if (dataInputOutput == null) {
+      dataInputOutput = config.createMessagesInputOutput();
     }
     Iterable<M> combinedIterable = new MessagesIterable<M>(
-        config, messageValueFactory,
-        extendedDataOutput.getByteArray(), 0, extendedDataOutput.getPos());
+        dataInputOutput, messageValueFactory);
 
     for (SequentialFileMessageStore<I, M> fileStore : fileStores) {
       combinedIterable = Iterables.concat(combinedIterable,
@@ -217,11 +215,11 @@ public class PartitionDiskBackedMessageStore<I extends WritableComparable,
    * @throws IOException
    */
   public void flush() throws IOException {
-    ConcurrentNavigableMap<I, ExtendedDataOutput> messagesToFlush = null;
+    ConcurrentNavigableMap<I, DataInputOutput> messagesToFlush = null;
     rwLock.writeLock().lock();
     try {
       messagesToFlush = inMemoryMessages;
-      inMemoryMessages = new ConcurrentSkipListMap<I, ExtendedDataOutput>();
+      inMemoryMessages = new ConcurrentSkipListMap<I, DataInputOutput>();
       numberOfMessagesInMemory.set(0);
     } finally {
       rwLock.writeLock().unlock();
@@ -248,9 +246,9 @@ public class PartitionDiskBackedMessageStore<I extends WritableComparable,
 
     // write in-memory messages map
     out.writeInt(inMemoryMessages.size());
-    for (Entry<I, ExtendedDataOutput> entry : inMemoryMessages.entrySet()) {
+    for (Entry<I, DataInputOutput> entry : inMemoryMessages.entrySet()) {
       entry.getKey().write(out);
-      WritableUtils.writeExtendedDataOutput(entry.getValue(), out);
+      entry.getValue().write(out);
     }
 
     // write file stores
@@ -278,8 +276,9 @@ public class PartitionDiskBackedMessageStore<I extends WritableComparable,
     for (int m = 0; m < mapSize; m++) {
       I vertexId = config.createVertexId();
       vertexId.readFields(in);
-      inMemoryMessages.put(vertexId,
-          WritableUtils.readExtendedDataOutput(in, config));
+      DataInputOutput dataInputOutput = config.createMessagesInputOutput();
+      dataInputOutput.readFields(in);
+      inMemoryMessages.put(vertexId, dataInputOutput);
     }
 
     // read file stores

http://git-wip-us.apache.org/repos/asf/giraph/blob/96968fdc/giraph-core/src/main/java/org/apache/giraph/comm/messages/out_of_core/SequentialFileMessageStore.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/messages/out_of_core/SequentialFileMessageStore.java b/giraph-core/src/main/java/org/apache/giraph/comm/messages/out_of_core/SequentialFileMessageStore.java
index 64031c3..51c05da 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/messages/out_of_core/SequentialFileMessageStore.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/messages/out_of_core/SequentialFileMessageStore.java
@@ -24,7 +24,7 @@ import org.apache.giraph.conf.GiraphConstants;
 import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
 import org.apache.giraph.factories.MessageValueFactory;
 import org.apache.giraph.utils.EmptyIterable;
-import org.apache.giraph.utils.ExtendedDataOutput;
+import org.apache.giraph.utils.io.DataInputOutput;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
 import org.apache.log4j.Logger;
@@ -109,7 +109,7 @@ public class SequentialFileMessageStore<I extends WritableComparable,
    * @param messageMap Add the messages from this map to this store
    * @throws java.io.IOException
    */
-  public void addMessages(NavigableMap<I, ExtendedDataOutput> messageMap)
+  public void addMessages(NavigableMap<I, DataInputOutput> messageMap)
     throws IOException {
     // Writes messages to its file
     if (file.exists()) {
@@ -136,13 +136,12 @@ public class SequentialFileMessageStore<I extends WritableComparable,
       out.writeInt(destinationVertexIdCount);
 
       // Dump the vertices and their messages in a sorted order
-      for (Map.Entry<I, ExtendedDataOutput> entry : messageMap.entrySet()) {
+      for (Map.Entry<I, DataInputOutput> entry : messageMap.entrySet()) {
         I destinationVertexId = entry.getKey();
         destinationVertexId.write(out);
-        ExtendedDataOutput extendedDataOutput = entry.getValue();
+        DataInputOutput dataInputOutput = entry.getValue();
         Iterable<M> messages = new MessagesIterable<M>(
-            config, messageValueFactory, extendedDataOutput.getByteArray(), 0,
-            extendedDataOutput.getPos());
+            dataInputOutput, messageValueFactory);
         int messageCount = Iterables.size(messages);
         out.writeInt(messageCount);
         if (LOG.isDebugEnabled()) {

http://git-wip-us.apache.org/repos/asf/giraph/blob/96968fdc/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/IntByteArrayMessageStore.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/IntByteArrayMessageStore.java b/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/IntByteArrayMessageStore.java
index 597e7af..cdab2e0 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/IntByteArrayMessageStore.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/IntByteArrayMessageStore.java
@@ -26,8 +26,7 @@ import org.apache.giraph.factories.MessageValueFactory;
 import org.apache.giraph.partition.Partition;
 import org.apache.giraph.utils.ByteArrayVertexIdMessages;
 import org.apache.giraph.utils.EmptyIterable;
-import org.apache.giraph.utils.ExtendedDataOutput;
-import org.apache.giraph.utils.WritableUtils;
+import org.apache.giraph.utils.io.DataInputOutput;
 import org.apache.hadoop.io.IntWritable;
 import org.apache.hadoop.io.Writable;
 
@@ -57,7 +56,7 @@ public class IntByteArrayMessageStore<M extends Writable>
   protected final MessageValueFactory<M> messageValueFactory;
   /** Map from partition id to map from vertex id to message */
   private final
-  Int2ObjectOpenHashMap<Int2ObjectOpenHashMap<ExtendedDataOutput>> map;
+  Int2ObjectOpenHashMap<Int2ObjectOpenHashMap<DataInputOutput>> map;
   /** Service worker */
   private final CentralizedServiceWorker<IntWritable, ?, ?> service;
   /** Giraph configuration */
@@ -79,12 +78,12 @@ public class IntByteArrayMessageStore<M extends Writable>
     this.config = config;
 
     map =
-        new Int2ObjectOpenHashMap<Int2ObjectOpenHashMap<ExtendedDataOutput>>();
+        new Int2ObjectOpenHashMap<Int2ObjectOpenHashMap<DataInputOutput>>();
     for (int partitionId : service.getPartitionStore().getPartitionIds()) {
       Partition<IntWritable, ?, ?> partition =
           service.getPartitionStore().getPartition(partitionId);
-      Int2ObjectOpenHashMap<ExtendedDataOutput> partitionMap =
-          new Int2ObjectOpenHashMap<ExtendedDataOutput>(
+      Int2ObjectOpenHashMap<DataInputOutput> partitionMap =
+          new Int2ObjectOpenHashMap<DataInputOutput>(
               (int) partition.getVertexCount());
       map.put(partitionId, partitionMap);
     }
@@ -96,34 +95,34 @@ public class IntByteArrayMessageStore<M extends Writable>
    * @param vertexId Id of the vertex
    * @return Map which holds messages for partition which vertex belongs to.
    */
-  private Int2ObjectOpenHashMap<ExtendedDataOutput> getPartitionMap(
+  private Int2ObjectOpenHashMap<DataInputOutput> getPartitionMap(
       IntWritable vertexId) {
     return map.get(service.getPartitionId(vertexId));
   }
 
   /**
-   * Get the extended data output for a vertex id, creating if necessary.
+   * Get the DataInputOutput for a vertex id, creating if necessary.
    *
    * @param partitionMap Partition map to look in
    * @param vertexId     Id of the vertex
-   * @return Extended data output for this vertex id (created if necessary)
+   * @return DataInputOutput for this vertex id (created if necessary)
    */
-  private ExtendedDataOutput getExtendedDataOutput(
-      Int2ObjectOpenHashMap<ExtendedDataOutput> partitionMap,
+  private DataInputOutput getDataInputOutput(
+      Int2ObjectOpenHashMap<DataInputOutput> partitionMap,
       int vertexId) {
-    ExtendedDataOutput extendedDataOutput = partitionMap.get(vertexId);
-    if (extendedDataOutput == null) {
-      extendedDataOutput = config.createExtendedDataOutput();
-      partitionMap.put(vertexId, extendedDataOutput);
+    DataInputOutput dataInputOutput = partitionMap.get(vertexId);
+    if (dataInputOutput == null) {
+      dataInputOutput = config.createMessagesInputOutput();
+      partitionMap.put(vertexId, dataInputOutput);
     }
-    return extendedDataOutput;
+    return dataInputOutput;
   }
 
   @Override
   public void addPartitionMessages(int partitionId,
       ByteArrayVertexIdMessages<IntWritable, M> messages) throws
       IOException {
-    Int2ObjectOpenHashMap<ExtendedDataOutput> partitionMap =
+    Int2ObjectOpenHashMap<DataInputOutput> partitionMap =
         map.get(partitionId);
     synchronized (partitionMap) {
       ByteArrayVertexIdMessages<IntWritable, M>.VertexIdMessageBytesIterator
@@ -137,18 +136,19 @@ public class IntByteArrayMessageStore<M extends Writable>
       if (vertexIdMessageBytesIterator != null) {
         while (vertexIdMessageBytesIterator.hasNext()) {
           vertexIdMessageBytesIterator.next();
+          DataInputOutput dataInputOutput = getDataInputOutput(partitionMap,
+              vertexIdMessageBytesIterator.getCurrentVertexId().get());
           vertexIdMessageBytesIterator.writeCurrentMessageBytes(
-              getExtendedDataOutput(partitionMap,
-                  vertexIdMessageBytesIterator.getCurrentVertexId().get()));
+              dataInputOutput.getDataOutput());
         }
       } else {
         ByteArrayVertexIdMessages<IntWritable, M>.VertexIdMessageIterator
             iterator = messages.getVertexIdMessageIterator();
         while (iterator.hasNext()) {
           iterator.next();
-          iterator.getCurrentMessage().write(
-              getExtendedDataOutput(partitionMap,
-                  iterator.getCurrentVertexId().get()));
+          DataInputOutput dataInputOutput =  getDataInputOutput(partitionMap,
+              iterator.getCurrentVertexId().get());
+          iterator.getCurrentMessage().write(dataInputOutput.getDataOutput());
         }
       }
     }
@@ -167,13 +167,12 @@ public class IntByteArrayMessageStore<M extends Writable>
   @Override
   public Iterable<M> getVertexMessages(
       IntWritable vertexId) throws IOException {
-    ExtendedDataOutput extendedDataOutput =
+    DataInputOutput dataInputOutput =
         getPartitionMap(vertexId).get(vertexId.get());
-    if (extendedDataOutput == null) {
+    if (dataInputOutput == null) {
       return EmptyIterable.get();
     } else {
-      return new MessagesIterable<M>(config, messageValueFactory,
-          extendedDataOutput.getByteArray(), 0, extendedDataOutput.getPos());
+      return new MessagesIterable<M>(dataInputOutput, messageValueFactory);
     }
   }
 
@@ -190,7 +189,7 @@ public class IntByteArrayMessageStore<M extends Writable>
   @Override
   public Iterable<IntWritable> getPartitionDestinationVertices(
       int partitionId) {
-    Int2ObjectOpenHashMap<ExtendedDataOutput> partitionMap =
+    Int2ObjectOpenHashMap<DataInputOutput> partitionMap =
         map.get(partitionId);
     List<IntWritable> vertices =
         Lists.newArrayListWithCapacity(partitionMap.size());
@@ -204,15 +203,15 @@ public class IntByteArrayMessageStore<M extends Writable>
   @Override
   public void writePartition(DataOutput out,
       int partitionId) throws IOException {
-    Int2ObjectOpenHashMap<ExtendedDataOutput> partitionMap =
+    Int2ObjectOpenHashMap<DataInputOutput> partitionMap =
         map.get(partitionId);
     out.writeInt(partitionMap.size());
-    ObjectIterator<Int2ObjectMap.Entry<ExtendedDataOutput>> iterator =
+    ObjectIterator<Int2ObjectMap.Entry<DataInputOutput>> iterator =
         partitionMap.int2ObjectEntrySet().fastIterator();
     while (iterator.hasNext()) {
-      Int2ObjectMap.Entry<ExtendedDataOutput> entry = iterator.next();
+      Int2ObjectMap.Entry<DataInputOutput> entry = iterator.next();
       out.writeInt(entry.getIntKey());
-      WritableUtils.writeExtendedDataOutput(entry.getValue(), out);
+      entry.getValue().write(out);
     }
   }
 
@@ -220,12 +219,13 @@ public class IntByteArrayMessageStore<M extends Writable>
   public void readFieldsForPartition(DataInput in,
       int partitionId) throws IOException {
     int size = in.readInt();
-    Int2ObjectOpenHashMap<ExtendedDataOutput> partitionMap =
-        new Int2ObjectOpenHashMap<ExtendedDataOutput>(size);
+    Int2ObjectOpenHashMap<DataInputOutput> partitionMap =
+        new Int2ObjectOpenHashMap<DataInputOutput>(size);
     while (size-- > 0) {
       int vertexId = in.readInt();
-      partitionMap.put(vertexId,
-          WritableUtils.readExtendedDataOutput(in, config));
+      DataInputOutput dataInputOutput = config.createMessagesInputOutput();
+      dataInputOutput.readFields(in);
+      partitionMap.put(vertexId, dataInputOutput);
     }
     synchronized (map) {
       map.put(partitionId, partitionMap);

http://git-wip-us.apache.org/repos/asf/giraph/blob/96968fdc/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/LongByteArrayMessageStore.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/LongByteArrayMessageStore.java b/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/LongByteArrayMessageStore.java
index 3fe6356..3272ced 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/LongByteArrayMessageStore.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/LongByteArrayMessageStore.java
@@ -26,8 +26,7 @@ import org.apache.giraph.factories.MessageValueFactory;
 import org.apache.giraph.partition.Partition;
 import org.apache.giraph.utils.ByteArrayVertexIdMessages;
 import org.apache.giraph.utils.EmptyIterable;
-import org.apache.giraph.utils.ExtendedDataOutput;
-import org.apache.giraph.utils.WritableUtils;
+import org.apache.giraph.utils.io.DataInputOutput;
 import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.Writable;
 
@@ -58,7 +57,7 @@ public class LongByteArrayMessageStore<M extends Writable>
   protected final MessageValueFactory<M> messageValueFactory;
   /** Map from partition id to map from vertex id to message */
   private final
-  Int2ObjectOpenHashMap<Long2ObjectOpenHashMap<ExtendedDataOutput>> map;
+  Int2ObjectOpenHashMap<Long2ObjectOpenHashMap<DataInputOutput>> map;
   /** Service worker */
   private final CentralizedServiceWorker<LongWritable, ?, ?> service;
   /** Giraph configuration */
@@ -80,12 +79,12 @@ public class LongByteArrayMessageStore<M extends Writable>
     this.config = config;
 
     map =
-        new Int2ObjectOpenHashMap<Long2ObjectOpenHashMap<ExtendedDataOutput>>();
+        new Int2ObjectOpenHashMap<Long2ObjectOpenHashMap<DataInputOutput>>();
     for (int partitionId : service.getPartitionStore().getPartitionIds()) {
       Partition<LongWritable, ?, ?> partition =
           service.getPartitionStore().getPartition(partitionId);
-      Long2ObjectOpenHashMap<ExtendedDataOutput> partitionMap =
-          new Long2ObjectOpenHashMap<ExtendedDataOutput>(
+      Long2ObjectOpenHashMap<DataInputOutput> partitionMap =
+          new Long2ObjectOpenHashMap<DataInputOutput>(
               (int) partition.getVertexCount());
       map.put(partitionId, partitionMap);
     }
@@ -97,34 +96,34 @@ public class LongByteArrayMessageStore<M extends Writable>
    * @param vertexId Id of the vertex
    * @return Map which holds messages for partition which vertex belongs to.
    */
-  private Long2ObjectOpenHashMap<ExtendedDataOutput> getPartitionMap(
+  private Long2ObjectOpenHashMap<DataInputOutput> getPartitionMap(
       LongWritable vertexId) {
     return map.get(service.getPartitionId(vertexId));
   }
 
   /**
-   * Get the extended data output for a vertex id, creating if necessary.
+   * Get the DataInputOutput for a vertex id, creating if necessary.
    *
    * @param partitionMap Partition map to look in
    * @param vertexId Id of the vertex
-   * @return Extended data output for this vertex id (created if necessary)
+   * @return DataInputOutput for this vertex id (created if necessary)
    */
-  private ExtendedDataOutput getExtendedDataOutput(
-      Long2ObjectOpenHashMap<ExtendedDataOutput> partitionMap,
+  private DataInputOutput getDataInputOutput(
+      Long2ObjectOpenHashMap<DataInputOutput> partitionMap,
       long vertexId) {
-    ExtendedDataOutput extendedDataOutput = partitionMap.get(vertexId);
-    if (extendedDataOutput == null) {
-      extendedDataOutput = config.createExtendedDataOutput();
-      partitionMap.put(vertexId, extendedDataOutput);
+    DataInputOutput dataInputOutput = partitionMap.get(vertexId);
+    if (dataInputOutput == null) {
+      dataInputOutput = config.createMessagesInputOutput();
+      partitionMap.put(vertexId, dataInputOutput);
     }
-    return extendedDataOutput;
+    return dataInputOutput;
   }
 
   @Override
   public void addPartitionMessages(int partitionId,
       ByteArrayVertexIdMessages<LongWritable, M> messages) throws
       IOException {
-    Long2ObjectOpenHashMap<ExtendedDataOutput> partitionMap =
+    Long2ObjectOpenHashMap<DataInputOutput> partitionMap =
         map.get(partitionId);
     synchronized (partitionMap) {
       ByteArrayVertexIdMessages<LongWritable, M>.VertexIdMessageBytesIterator
@@ -138,18 +137,19 @@ public class LongByteArrayMessageStore<M extends Writable>
       if (vertexIdMessageBytesIterator != null) {
         while (vertexIdMessageBytesIterator.hasNext()) {
           vertexIdMessageBytesIterator.next();
+          DataInputOutput dataInputOutput = getDataInputOutput(partitionMap,
+              vertexIdMessageBytesIterator.getCurrentVertexId().get());
           vertexIdMessageBytesIterator.writeCurrentMessageBytes(
-              getExtendedDataOutput(partitionMap,
-                  vertexIdMessageBytesIterator.getCurrentVertexId().get()));
+              dataInputOutput.getDataOutput());
         }
       } else {
         ByteArrayVertexIdMessages<LongWritable, M>.VertexIdMessageIterator
             iterator = messages.getVertexIdMessageIterator();
         while (iterator.hasNext()) {
           iterator.next();
-          iterator.getCurrentMessage().write(
-              getExtendedDataOutput(partitionMap,
-                  iterator.getCurrentVertexId().get()));
+          DataInputOutput dataInputOutput =  getDataInputOutput(partitionMap,
+              iterator.getCurrentVertexId().get());
+          iterator.getCurrentMessage().write(dataInputOutput.getDataOutput());
         }
       }
     }
@@ -168,13 +168,12 @@ public class LongByteArrayMessageStore<M extends Writable>
   @Override
   public Iterable<M> getVertexMessages(
       LongWritable vertexId) throws IOException {
-    ExtendedDataOutput extendedDataOutput =
+    DataInputOutput dataInputOutput =
         getPartitionMap(vertexId).get(vertexId.get());
-    if (extendedDataOutput == null) {
+    if (dataInputOutput == null) {
       return EmptyIterable.get();
     } else {
-      return new MessagesIterable<M>(config, messageValueFactory,
-          extendedDataOutput.getByteArray(), 0, extendedDataOutput.getPos());
+      return new MessagesIterable<M>(dataInputOutput, messageValueFactory);
     }
   }
 
@@ -191,7 +190,7 @@ public class LongByteArrayMessageStore<M extends Writable>
   @Override
   public Iterable<LongWritable> getPartitionDestinationVertices(
       int partitionId) {
-    Long2ObjectOpenHashMap<ExtendedDataOutput> partitionMap =
+    Long2ObjectOpenHashMap<DataInputOutput> partitionMap =
         map.get(partitionId);
     List<LongWritable> vertices =
         Lists.newArrayListWithCapacity(partitionMap.size());
@@ -205,15 +204,15 @@ public class LongByteArrayMessageStore<M extends Writable>
   @Override
   public void writePartition(DataOutput out,
       int partitionId) throws IOException {
-    Long2ObjectOpenHashMap<ExtendedDataOutput> partitionMap =
+    Long2ObjectOpenHashMap<DataInputOutput> partitionMap =
         map.get(partitionId);
     out.writeInt(partitionMap.size());
-    ObjectIterator<Long2ObjectMap.Entry<ExtendedDataOutput>> iterator =
+    ObjectIterator<Long2ObjectMap.Entry<DataInputOutput>> iterator =
         partitionMap.long2ObjectEntrySet().fastIterator();
     while (iterator.hasNext()) {
-      Long2ObjectMap.Entry<ExtendedDataOutput> entry = iterator.next();
+      Long2ObjectMap.Entry<DataInputOutput> entry = iterator.next();
       out.writeLong(entry.getLongKey());
-      WritableUtils.writeExtendedDataOutput(entry.getValue(), out);
+      entry.getValue().write(out);
     }
   }
 
@@ -221,12 +220,13 @@ public class LongByteArrayMessageStore<M extends Writable>
   public void readFieldsForPartition(DataInput in,
       int partitionId) throws IOException {
     int size = in.readInt();
-    Long2ObjectOpenHashMap<ExtendedDataOutput> partitionMap =
-        new Long2ObjectOpenHashMap<ExtendedDataOutput>(size);
+    Long2ObjectOpenHashMap<DataInputOutput> partitionMap =
+        new Long2ObjectOpenHashMap<DataInputOutput>(size);
     while (size-- > 0) {
       long vertexId = in.readLong();
-      partitionMap.put(vertexId,
-          WritableUtils.readExtendedDataOutput(in, config));
+      DataInputOutput dataInputOutput = config.createMessagesInputOutput();
+      dataInputOutput.readFields(in);
+      partitionMap.put(vertexId, dataInputOutput);
     }
     synchronized (map) {
       map.put(partitionId, partitionMap);

http://git-wip-us.apache.org/repos/asf/giraph/blob/96968fdc/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java b/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java
index 604729a..7c9b19a 100644
--- a/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java
+++ b/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java
@@ -863,6 +863,17 @@ public interface GiraphConstants {
           "Use unsafe serialization?");
 
   /**
+   * Use BigDataIO for messages? If there are super-vertices in the graph
+   * which receive a lot of messages (total serialized size of the messages
+   * for one vertex exceeds the maximum size of a byte array), setting this
+   * option to true removes that limit: messages for a single vertex are then
+   * bounded only by the available heap.
+   */
+  BooleanConfOption USE_BIG_DATA_IO_FOR_MESSAGES =
+      new BooleanConfOption("giraph.useBigDataIOForMessages", false,
+          "Use BigDataIO for messages?");
+
+  /**
    * Maximum number of attempts a master/worker will retry before killing
    * the job.  This directly maps to the number of map task attempts in
    * Hadoop.

http://git-wip-us.apache.org/repos/asf/giraph/blob/96968fdc/giraph-core/src/main/java/org/apache/giraph/conf/ImmutableClassesGiraphConfiguration.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/conf/ImmutableClassesGiraphConfiguration.java b/giraph-core/src/main/java/org/apache/giraph/conf/ImmutableClassesGiraphConfiguration.java
index 2506c21..435dfa5 100644
--- a/giraph-core/src/main/java/org/apache/giraph/conf/ImmutableClassesGiraphConfiguration.java
+++ b/giraph-core/src/main/java/org/apache/giraph/conf/ImmutableClassesGiraphConfiguration.java
@@ -62,6 +62,9 @@ import org.apache.giraph.utils.ExtendedDataOutput;
 import org.apache.giraph.utils.ReflectionUtils;
 import org.apache.giraph.utils.UnsafeByteArrayInputStream;
 import org.apache.giraph.utils.UnsafeByteArrayOutputStream;
+import org.apache.giraph.utils.io.BigDataInputOutput;
+import org.apache.giraph.utils.io.DataInputOutput;
+import org.apache.giraph.utils.io.ExtendedDataInputOutput;
 import org.apache.giraph.worker.WorkerContext;
 import org.apache.giraph.worker.WorkerObserver;
 import org.apache.hadoop.conf.Configuration;
@@ -100,6 +103,11 @@ public class ImmutableClassesGiraphConfiguration<I extends WritableComparable,
    * extended data input/output classes
    */
   private final boolean useUnsafeSerialization;
+  /**
+   * Use BigDataIO for messages? Cached for fast access when creating the
+   * data input/output used to store messages
+   */
+  private final boolean useBigDataIOForMessages;
 
   /**
    * Constructor.  Takes the configuration and then gets the classes out of
@@ -111,6 +119,7 @@ public class ImmutableClassesGiraphConfiguration<I extends WritableComparable,
     super(conf);
     classes = new GiraphClasses<I, V, E>(conf);
     useUnsafeSerialization = USE_UNSAFE_SERIALIZATION.get(this);
+    useBigDataIOForMessages = USE_BIG_DATA_IO_FOR_MESSAGES.get(this);
     valueLanguages = PerGraphTypeEnum.readFromConf(
         GiraphConstants.GRAPH_TYPE_LANGUAGES, conf);
     valueNeedsWrappers = PerGraphTypeBoolean.readFromConf(
@@ -897,6 +906,19 @@ public class ImmutableClassesGiraphConfiguration<I extends WritableComparable,
   }
 
   /**
+   * Create DataInputOutput to store messages, honoring the BigDataIO option
+   *
+   * @return BigDataInputOutput if enabled, else ExtendedDataInputOutput
+   */
+  public DataInputOutput createMessagesInputOutput() {
+    if (useBigDataIOForMessages) {
+      return new BigDataInputOutput(this);
+    } else {
+      return new ExtendedDataInputOutput(this);
+    }
+  }
+
+  /**
    * Create an extended data output (can be subclassed)
    *
    * @return ExtendedDataOutput object

http://git-wip-us.apache.org/repos/asf/giraph/blob/96968fdc/giraph-core/src/main/java/org/apache/giraph/utils/ByteArrayIterable.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/utils/ByteArrayIterable.java b/giraph-core/src/main/java/org/apache/giraph/utils/ByteArrayIterable.java
index cf2c187..d14172e 100644
--- a/giraph-core/src/main/java/org/apache/giraph/utils/ByteArrayIterable.java
+++ b/giraph-core/src/main/java/org/apache/giraph/utils/ByteArrayIterable.java
@@ -18,7 +18,6 @@
 package org.apache.giraph.utils;
 
 import java.util.Iterator;
-import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
 import org.apache.hadoop.io.Writable;
 
 /**
@@ -30,29 +29,17 @@ import org.apache.hadoop.io.Writable;
  */
 public abstract class ByteArrayIterable<T extends Writable> implements
     Iterable<T> {
-  /** Configuration */
-  protected final ImmutableClassesGiraphConfiguration configuration;
-  /** Data input */
-  protected final byte[] buf;
-  /** Offset to start in buf */
-  protected final int off;
-  /** Length of buf */
-  protected final int length;
+  /** Factory creating a fresh data input for every iterator */
+  protected final Factory<? extends ExtendedDataInput> dataInputFactory;
 
   /**
    * Constructor
    *
-   * @param configuration Configuration
-   * @param buf Buffer
-   * @param off Offset to start in the buffer
-   * @param length Length of the buffer
+   * @param dataInputFactory Factory creating a data input per iterator
    */
-  public ByteArrayIterable(ImmutableClassesGiraphConfiguration configuration,
-                           byte[] buf, int off, int length) {
-    this.configuration = configuration;
-    this.buf = buf;
-    this.off = off;
-    this.length = length;
+  public ByteArrayIterable(
+      Factory<? extends ExtendedDataInput> dataInputFactory) {
+    this.dataInputFactory = dataInputFactory;
   }
 
   /**
@@ -69,12 +56,11 @@ public abstract class ByteArrayIterable<T extends Writable> implements
     /**
      * Constructor.
      *
-     * @param buf Buffer to read from
-     * @param off Offset to read from in the buffer
-     * @param length Maximum length of the buffer
+     * @param extendedDataInput Data input to iterate over
      */
-    private ByteArrayIterableIterator(byte[] buf, int off, int length) {
-      super(configuration, buf, off, length);
+    private ByteArrayIterableIterator(
+        ExtendedDataInput extendedDataInput) {
+      super(extendedDataInput);
     }
 
     @Override
@@ -85,6 +71,6 @@ public abstract class ByteArrayIterable<T extends Writable> implements
 
   @Override
   public Iterator<T> iterator() {
-    return new ByteArrayIterableIterator(buf, off, length);
+    return new ByteArrayIterableIterator(dataInputFactory.create());
   }
 }

http://git-wip-us.apache.org/repos/asf/giraph/blob/96968fdc/giraph-core/src/main/java/org/apache/giraph/utils/ByteArrayIterator.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/utils/ByteArrayIterator.java b/giraph-core/src/main/java/org/apache/giraph/utils/ByteArrayIterator.java
index 76ed789..28b2dc8 100644
--- a/giraph-core/src/main/java/org/apache/giraph/utils/ByteArrayIterator.java
+++ b/giraph-core/src/main/java/org/apache/giraph/utils/ByteArrayIterator.java
@@ -19,7 +19,6 @@ package org.apache.giraph.utils;
 
 import java.io.IOException;
 import java.util.Iterator;
-import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
 import org.apache.hadoop.io.Writable;
 
 /**
@@ -35,18 +34,12 @@ public abstract class ByteArrayIterator<T extends Writable> implements
   protected final ExtendedDataInput extendedDataInput;
 
   /**
-   * Constructor
+   * Wrap ExtendedDataInput in ByteArrayIterator
    *
-   * @param configuration Configuration
-   * @param buf Buffer
-   * @param off Offset to start in the buffer
-   * @param length Length of the buffer
+   * @param extendedDataInput Data input to iterate over
    */
-  public ByteArrayIterator(
-      ImmutableClassesGiraphConfiguration configuration,
-      byte[] buf, int off, int length) {
-    extendedDataInput =
-        configuration.createExtendedDataInput(buf, off, length);
+  public ByteArrayIterator(ExtendedDataInput extendedDataInput) {
+    this.extendedDataInput = extendedDataInput;
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/giraph/blob/96968fdc/giraph-core/src/main/java/org/apache/giraph/utils/ByteArrayVertexIdMessages.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/utils/ByteArrayVertexIdMessages.java b/giraph-core/src/main/java/org/apache/giraph/utils/ByteArrayVertexIdMessages.java
index 56cc01c..7e2b73b 100644
--- a/giraph-core/src/main/java/org/apache/giraph/utils/ByteArrayVertexIdMessages.java
+++ b/giraph-core/src/main/java/org/apache/giraph/utils/ByteArrayVertexIdMessages.java
@@ -186,8 +186,7 @@ public class ByteArrayVertexIdMessages<I extends WritableComparable,
      *
      * @param dataOutput Where the current message will be written to
      */
-    public void writeCurrentMessageBytes(
-        ExtendedDataOutput dataOutput) {
+    public void writeCurrentMessageBytes(DataOutput dataOutput) {
       try {
         dataOutput.write(getByteArray(), messageOffset, messageBytes);
       } catch (IOException e) {

http://git-wip-us.apache.org/repos/asf/giraph/blob/96968fdc/giraph-core/src/main/java/org/apache/giraph/utils/Factory.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/utils/Factory.java b/giraph-core/src/main/java/org/apache/giraph/utils/Factory.java
new file mode 100644
index 0000000..c6cf050
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/utils/Factory.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.utils;
+
+/**
+ * Generic factory which creates objects of type T
+ *
+ * @param <T> Type of the objects created
+ */
+public interface Factory<T> {
+  /**
+   * Create a new object
+   *
+   * @return Newly created object
+   */
+  T create();
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/96968fdc/giraph-core/src/main/java/org/apache/giraph/utils/RepresentativeByteArrayIterable.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/utils/RepresentativeByteArrayIterable.java b/giraph-core/src/main/java/org/apache/giraph/utils/RepresentativeByteArrayIterable.java
index e3992ed..2c24e89 100644
--- a/giraph-core/src/main/java/org/apache/giraph/utils/RepresentativeByteArrayIterable.java
+++ b/giraph-core/src/main/java/org/apache/giraph/utils/RepresentativeByteArrayIterable.java
@@ -18,7 +18,6 @@
 package org.apache.giraph.utils;
 
 import java.util.Iterator;
-import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
 import org.apache.hadoop.io.Writable;
 
 /**
@@ -33,15 +32,11 @@ public abstract class RepresentativeByteArrayIterable<T extends Writable>
   /**
    * Constructor
    *
-   * @param configuration Configuration
-   * @param buf Buffer
-   * @param off Offset to start in the buffer
-   * @param length Length of the buffer
+   * @param dataInputFactory Factory creating a data input per iterator
    */
   public RepresentativeByteArrayIterable(
-      ImmutableClassesGiraphConfiguration configuration,
-      byte[] buf, int off, int length) {
-    super(configuration, buf, off, length);
+      Factory<? extends ExtendedDataInput> dataInputFactory) {
+    super(dataInputFactory);
   }
 
   /**
@@ -52,13 +47,11 @@ public abstract class RepresentativeByteArrayIterable<T extends Writable>
     /**
      * Constructor.
      *
-     * @param buf Buffer to read from
-     * @param off Offset to read from in the buffer
-     * @param length Maximum length of the buffer
+     * @param extendedDataInput Data input to iterate over
      */
     private RepresentativeByteArrayIterableIterator(
-        byte[] buf, int off, int length) {
-      super(configuration, buf, off, length);
+        ExtendedDataInput extendedDataInput) {
+      super(extendedDataInput);
     }
 
     @Override
@@ -69,6 +62,7 @@ public abstract class RepresentativeByteArrayIterable<T extends Writable>
 
   @Override
   public Iterator<T> iterator() {
-    return new RepresentativeByteArrayIterableIterator(buf, off, length);
+    return
+        new RepresentativeByteArrayIterableIterator(dataInputFactory.create());
   }
 }

http://git-wip-us.apache.org/repos/asf/giraph/blob/96968fdc/giraph-core/src/main/java/org/apache/giraph/utils/RepresentativeByteArrayIterator.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/utils/RepresentativeByteArrayIterator.java b/giraph-core/src/main/java/org/apache/giraph/utils/RepresentativeByteArrayIterator.java
index b6151c5..d36c94f 100644
--- a/giraph-core/src/main/java/org/apache/giraph/utils/RepresentativeByteArrayIterator.java
+++ b/giraph-core/src/main/java/org/apache/giraph/utils/RepresentativeByteArrayIterator.java
@@ -18,7 +18,6 @@
 package org.apache.giraph.utils;
 
 import java.io.IOException;
-import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
 import org.apache.hadoop.io.Writable;
 
 /**
@@ -33,17 +32,12 @@ public abstract class RepresentativeByteArrayIterator<T extends
   private final T representativeWritable = createWritable();
 
   /**
-   * Constructor
+   * Wrap ExtendedDataInput in RepresentativeByteArrayIterator
    *
-   * @param configuration Configuration
-   * @param buf buffer to read from
-   * @param off Offset into the buffer to start from
-   * @param length Length of the buffer
+   * @param extendedDataInput Data input to iterate over
    */
-  public RepresentativeByteArrayIterator(
-      ImmutableClassesGiraphConfiguration configuration,
-      byte[] buf, int off, int length) {
-    super(configuration, buf, off, length);
+  public RepresentativeByteArrayIterator(ExtendedDataInput extendedDataInput) {
+    super(extendedDataInput);
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/giraph/blob/96968fdc/giraph-core/src/main/java/org/apache/giraph/utils/io/BigDataInput.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/utils/io/BigDataInput.java b/giraph-core/src/main/java/org/apache/giraph/utils/io/BigDataInput.java
new file mode 100644
index 0000000..f73819a
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/utils/io/BigDataInput.java
@@ -0,0 +1,237 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.utils.io;
+
+import org.apache.giraph.utils.ExtendedByteArrayDataInput;
+import org.apache.giraph.utils.ExtendedDataInput;
+import org.apache.giraph.utils.ExtendedDataOutput;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Byte-array backed implementations of {@link ExtendedDataInput} cannot
+ * handle more data than fits into a single array. This
+ * {@link ExtendedDataInput} reads sequentially through all the byte arrays
+ * of a {@link BigDataOutput}, which overcomes that limitation with almost no
+ * additional cost when data is not huge.
+ *
+ * Goes in pair with {@link BigDataOutput}. readFully and skipBytes may span
+ * any number of underlying inputs; every other read is delegated as a whole
+ * to the current underlying input, so it is assumed not to cross into the
+ * next one (NOTE(review): presumably ensured by how {@link BigDataOutput}
+ * splits its streams - confirm for readUTF and readLine).
+ */
+public class BigDataInput implements ExtendedDataInput {
+  /** Empty data input, used once all the data inputs are exhausted */
+  private static final ExtendedDataInput EMPTY_INPUT =
+      new ExtendedByteArrayDataInput(new byte[0]);
+
+  /** Input which we are currently reading from */
+  private ExtendedDataInput currentInput;
+  /** List of all data inputs which contain data */
+  private final List<ExtendedDataInput> dataInputs;
+  /**
+   * Index within dataInputs of the input we are currently reading from;
+   * reaches (but never exceeds) dataInputs.size() past the last input
+   */
+  private int currentPositionInInputs;
+
+  /**
+   * Constructor. Wraps the bytes of every output of the given
+   * {@link BigDataOutput} in its own data input, starting at offset 0.
+   *
+   * @param bigDataOutput {@link BigDataOutput} which we want to read data from
+   */
+  public BigDataInput(BigDataOutput bigDataOutput) {
+    dataInputs = new ArrayList<ExtendedDataInput>(
+        bigDataOutput.getNumberOfDataOutputs());
+    for (ExtendedDataOutput dataOutput : bigDataOutput.getDataOutputs()) {
+      dataInputs.add(bigDataOutput.getConf().createExtendedDataInput(
+          dataOutput.getByteArray(), 0, dataOutput.getPos()));
+    }
+    currentPositionInInputs = -1;
+    moveToNextDataInput();
+  }
+
+  /** Start reading the following data input (EMPTY_INPUT past the end) */
+  private void moveToNextDataInput() {
+    currentPositionInInputs++;
+    if (currentPositionInInputs < dataInputs.size()) {
+      currentInput = dataInputs.get(currentPositionInInputs);
+    } else {
+      currentInput = EMPTY_INPUT;
+    }
+  }
+
+  /**
+   * Move past every data input which has nothing left to read, so that
+   * currentInput is either an input with bytes available or EMPTY_INPUT.
+   * Stopping at EMPTY_INPUT keeps currentPositionInInputs from growing
+   * past dataInputs.size() when reads are attempted after the end.
+   */
+  private void checkIfShouldMoveToNextDataInput() {
+    while (currentInput != EMPTY_INPUT && currentInput.available() == 0) {
+      moveToNextDataInput();
+    }
+  }
+
+  @Override
+  public void readFully(byte[] b) throws IOException {
+    readFully(b, 0, b.length);
+  }
+
+  @Override
+  public void readFully(byte[] b, int off, int len) throws IOException {
+    // The requested bytes may span several data inputs, so copy them in
+    // chunks no longer than what the current input has available
+    int bytesRead = 0;
+    while (bytesRead < len) {
+      checkIfShouldMoveToNextDataInput();
+      int toRead = Math.min(len - bytesRead, currentInput.available());
+      if (toRead == 0) {
+        // All data inputs are exhausted: let EMPTY_INPUT fail the read
+        currentInput.readFully(b, off + bytesRead, len - bytesRead);
+        return;
+      }
+      currentInput.readFully(b, off + bytesRead, toRead);
+      bytesRead += toRead;
+    }
+  }
+
+  @Override
+  public boolean readBoolean() throws IOException {
+    checkIfShouldMoveToNextDataInput();
+    return currentInput.readBoolean();
+  }
+
+  @Override
+  public byte readByte() throws IOException {
+    checkIfShouldMoveToNextDataInput();
+    return currentInput.readByte();
+  }
+
+  @Override
+  public int readUnsignedByte() throws IOException {
+    checkIfShouldMoveToNextDataInput();
+    return currentInput.readUnsignedByte();
+  }
+
+  @Override
+  public short readShort() throws IOException {
+    checkIfShouldMoveToNextDataInput();
+    return currentInput.readShort();
+  }
+
+  @Override
+  public int readUnsignedShort() throws IOException {
+    checkIfShouldMoveToNextDataInput();
+    return currentInput.readUnsignedShort();
+  }
+
+  @Override
+  public char readChar() throws IOException {
+    checkIfShouldMoveToNextDataInput();
+    return currentInput.readChar();
+  }
+
+  @Override
+  public int readInt() throws IOException {
+    checkIfShouldMoveToNextDataInput();
+    return currentInput.readInt();
+  }
+
+  @Override
+  public long readLong() throws IOException {
+    checkIfShouldMoveToNextDataInput();
+    return currentInput.readLong();
+  }
+
+  @Override
+  public float readFloat() throws IOException {
+    checkIfShouldMoveToNextDataInput();
+    return currentInput.readFloat();
+  }
+
+  @Override
+  public double readDouble() throws IOException {
+    checkIfShouldMoveToNextDataInput();
+    return currentInput.readDouble();
+  }
+
+  @Override
+  public String readLine() throws IOException {
+    checkIfShouldMoveToNextDataInput();
+    return currentInput.readLine();
+  }
+
+  @Override
+  public String readUTF() throws IOException {
+    checkIfShouldMoveToNextDataInput();
+    return currentInput.readUTF();
+  }
+
+  @Override
+  public int skipBytes(int n) throws IOException {
+    // Like DataInput#skipBytes: skip as many bytes as possible, never past
+    // the end of the data, and return the number actually skipped. Stop
+    // when all data inputs are exhausted or an input makes no progress, so
+    // that the loop cannot spin forever.
+    int bytesSkipped = 0;
+    while (bytesSkipped < n) {
+      checkIfShouldMoveToNextDataInput();
+      int toSkip = Math.min(n - bytesSkipped, currentInput.available());
+      if (toSkip == 0) {
+        break;
+      }
+      int skipped = currentInput.skipBytes(toSkip);
+      if (skipped <= 0) {
+        break;
+      }
+      bytesSkipped += skipped;
+    }
+    return bytesSkipped;
+  }
+
+  @Override
+  public int getPos() {
+    // Every data input starts at offset 0 and the ones after the current
+    // input have not been read from, so the position is the sum of the
+    // positions up to the current input. Bounding the index by the list
+    // size keeps it in range once all the inputs are exhausted.
+    int last = Math.min(currentPositionInInputs, dataInputs.size() - 1);
+    int pos = 0;
+    for (int i = 0; i <= last; i++) {
+      pos += dataInputs.get(i).getPos();
+    }
+    return pos;
+  }
+
+  @Override
+  public int available() {
+    // Accumulate in a long and saturate, so that more than Integer.MAX_VALUE
+    // remaining bytes is never reported as a negative number
+    long available = 0;
+    for (int i = currentPositionInInputs; i < dataInputs.size(); i++) {
+      available += dataInputs.get(i).available();
+    }
+    return (int) Math.min(available, Integer.MAX_VALUE);
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/96968fdc/giraph-core/src/main/java/org/apache/giraph/utils/io/BigDataInputOutput.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/utils/io/BigDataInputOutput.java b/giraph-core/src/main/java/org/apache/giraph/utils/io/BigDataInputOutput.java
new file mode 100644
index 0000000..7d94f97
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/utils/io/BigDataInputOutput.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.utils.io;
+
+import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
+import org.apache.giraph.utils.ExtendedDataInput;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+/**
+ * {@link DataInputOutput} which uses {@link BigDataInput} and {@link
+ * BigDataOutput} to support streams larger than the maximum array size.
+ */
+public class BigDataInputOutput extends DataInputOutput {
+  /** Output all data is written to; every input created reads it back */
+  private final BigDataOutput dataOutput;
+
+  /**
+   * Constructor
+   *
+   * @param conf Configuration used to create the underlying BigDataOutput
+   */
+  public BigDataInputOutput(ImmutableClassesGiraphConfiguration conf) {
+    dataOutput = new BigDataOutput(conf);
+  }
+
+  @Override
+  public DataOutput getDataOutput() {
+    return dataOutput;
+  }
+
+  @Override
+  public ExtendedDataInput createDataInput() {
+    return new BigDataInput(dataOutput);
+  }
+
+  @Override
+  public void write(DataOutput out) throws IOException {
+    dataOutput.write(out);
+  }
+
+  @Override
+  public void readFields(DataInput in) throws IOException {
+    dataOutput.readFields(in);
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/96968fdc/giraph-core/src/main/java/org/apache/giraph/utils/io/BigDataOutput.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/utils/io/BigDataOutput.java b/giraph-core/src/main/java/org/apache/giraph/utils/io/BigDataOutput.java
new file mode 100644
index 0000000..c0fff60
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/utils/io/BigDataOutput.java
@@ -0,0 +1,250 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.utils.io;
+
+import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
+import org.apache.giraph.utils.ExtendedDataOutput;
+import org.apache.hadoop.io.Writable;
+
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Lists;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Implementations of {@link ExtendedDataOutput} are limited because they can
+ * only handle up to 1GB of data. This {@link DataOutput} overcomes that
+ * limitation, with almost no additional cost when data is not huge.
+ *
+ * Goes in pair with {@link BigDataInput}
+ */
+public class BigDataOutput implements DataOutput, Writable {
+  /** Default initial size of the stream */
+  private static final int DEFAULT_INITIAL_SIZE = 16;
+  /** Max allowed size of the stream */
+  private static final int MAX_SIZE = 1 << 25;
+  /**
+   * Create a new stream when we have less then this number of bytes left in
+   * the stream. Should be larger than the largest serialized primitive.
+   */
+  private static final int SIZE_DELTA = 100;
+
+  /** Data output which we are currently writing to */
+  private ExtendedDataOutput currentDataOutput;
+  /** List of filled outputs, will be null until we get a lot of data */
+  private List<ExtendedDataOutput> dataOutputs;
+  /** Configuration */
+  private final ImmutableClassesGiraphConfiguration conf;
+
+  /**
+   * Constructor
+   *
+   * @param conf Configuration
+   */
+  public BigDataOutput(ImmutableClassesGiraphConfiguration conf) {
+    this(DEFAULT_INITIAL_SIZE, conf);
+  }
+
+  /**
+   * Constructor
+   *
+   * @param initialSize Initial size of data output
+   * @param conf        Configuration
+   */
+  public BigDataOutput(int initialSize,
+      ImmutableClassesGiraphConfiguration conf) {
+    this.conf = conf;
+    dataOutputs = null;
+    currentDataOutput = conf.createExtendedDataOutput(initialSize);
+  }
+
+  /**
+   * Get DataOutput which data should be written to. If current DataOutput is
+   * full it will create a new one.
+   *
+   * @return DataOutput which data should be written to
+   */
+  private ExtendedDataOutput getDataOutputToWriteTo() {
+    if (currentDataOutput.getPos() + SIZE_DELTA < MAX_SIZE) {
+      return currentDataOutput;
+    } else {
+      if (dataOutputs == null) {
+        dataOutputs = new ArrayList<ExtendedDataOutput>(1);
+      }
+      dataOutputs.add(currentDataOutput);
+      currentDataOutput = conf.createExtendedDataOutput(MAX_SIZE);
+      return currentDataOutput;
+    }
+  }
+
+  /**
+   * Get number of DataOutputs which contain written data.
+   *
+   * @return Number of DataOutputs which contain written data
+   */
+  public int getNumberOfDataOutputs() {
+    return (dataOutputs == null) ? 1 : dataOutputs.size() + 1;
+  }
+
+  /**
+   * Get DataOutputs which contain written data.
+   *
+   * @return DataOutputs which contain written data
+   */
+  public Iterable<ExtendedDataOutput> getDataOutputs() {
+    ArrayList<ExtendedDataOutput> currentList =
+        Lists.newArrayList(currentDataOutput);
+    if (dataOutputs == null) {
+      return currentList;
+    } else {
+      return Iterables.concat(dataOutputs, currentList);
+    }
+  }
+
+  public ImmutableClassesGiraphConfiguration getConf() {
+    return conf;
+  }
+
+  @Override
+  public void write(int b) throws IOException {
+    getDataOutputToWriteTo().write(b);
+  }
+
+  @Override
+  public void write(byte[] b) throws IOException {
+    getDataOutputToWriteTo().write(b);
+  }
+
+  @Override
+  public void write(byte[] b, int off, int len) throws IOException {
+    getDataOutputToWriteTo().write(b, off, len);
+  }
+
+  @Override
+  public void writeBoolean(boolean v) throws IOException {
+    getDataOutputToWriteTo().writeBoolean(v);
+  }
+
+  @Override
+  public void writeByte(int v) throws IOException {
+    getDataOutputToWriteTo().writeByte(v);
+  }
+
+  @Override
+  public void writeShort(int v) throws IOException {
+    getDataOutputToWriteTo().writeShort(v);
+  }
+
+  @Override
+  public void writeChar(int v) throws IOException {
+    getDataOutputToWriteTo().writeChar(v);
+  }
+
+  @Override
+  public void writeInt(int v) throws IOException {
+    getDataOutputToWriteTo().writeInt(v);
+  }
+
+  @Override
+  public void writeLong(long v) throws IOException {
+    getDataOutputToWriteTo().writeLong(v);
+  }
+
+  @Override
+  public void writeFloat(float v) throws IOException {
+    getDataOutputToWriteTo().writeFloat(v);
+  }
+
+  @Override
+  public void writeDouble(double v) throws IOException {
+    getDataOutputToWriteTo().writeDouble(v);
+  }
+
+  @Override
+  public void writeBytes(String s) throws IOException {
+    getDataOutputToWriteTo().writeBytes(s);
+  }
+
+  @Override
+  public void writeChars(String s) throws IOException {
+    getDataOutputToWriteTo().writeChars(s);
+  }
+
+  @Override
+  public void writeUTF(String s) throws IOException {
+    getDataOutputToWriteTo().writeUTF(s);
+  }
+
+  /**
+   * Write one of data outputs to another data output
+   *
+   * @param dataOutput Data output to write
+   * @param out        Data output to write to
+   */
+  private void writeExtendedDataOutput(ExtendedDataOutput dataOutput,
+      DataOutput out) throws IOException {
+    out.writeInt(dataOutput.getPos());
+    out.write(dataOutput.getByteArray(), 0, dataOutput.getPos());
+  }
+
+  /**
+   * Read data output from data input
+   *
+   * @param in Data input to read from
+   * @return Data output read
+   */
+  private ExtendedDataOutput readExtendedDataOutput(
+      DataInput in) throws IOException {
+    int length = in.readInt();
+    byte[] data = new byte[length];
+    in.readFully(data);
+    return conf.createExtendedDataOutput(data, data.length);
+  }
+
+  @Override
+  public void write(DataOutput out) throws IOException {
+    if (dataOutputs == null) {
+      out.writeInt(0);
+    } else {
+      out.writeInt(dataOutputs.size());
+      for (ExtendedDataOutput stream : dataOutputs) {
+        writeExtendedDataOutput(stream, out);
+      }
+    }
+    writeExtendedDataOutput(currentDataOutput, out);
+  }
+
+  @Override
+  public void readFields(DataInput in) throws IOException {
+    int size = in.readInt();
+    if (size == 0) {
+      dataOutputs = null;
+    } else {
+      dataOutputs = new ArrayList<ExtendedDataOutput>(size);
+      while (size-- > 0) {
+        dataOutputs.add(readExtendedDataOutput(in));
+      }
+    }
+    currentDataOutput = readExtendedDataOutput(in);
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/96968fdc/giraph-core/src/main/java/org/apache/giraph/utils/io/DataInputOutput.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/utils/io/DataInputOutput.java b/giraph-core/src/main/java/org/apache/giraph/utils/io/DataInputOutput.java
new file mode 100644
index 0000000..ded78f8
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/utils/io/DataInputOutput.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.utils.io;
+
+import org.apache.giraph.utils.ExtendedDataInput;
+import org.apache.giraph.utils.Factory;
+import org.apache.hadoop.io.Writable;
+
+import java.io.DataOutput;
+
+/**
+ * Provides both DataOutput which we can write to and DataInputs which are
+ * going to read data which was written to DataOutput.
+ */
+public abstract class DataInputOutput implements
+    Writable, Factory<ExtendedDataInput> {
+  /**
+   * Get DataOutput to write to
+   *
+   * @return DataOutput which we can write to
+   */
+  public abstract DataOutput getDataOutput();
+
+  /**
+   * Create DataInput which reads data from underlying DataOutput
+   *
+   * @return DataInput
+   */
+  public abstract ExtendedDataInput createDataInput();
+
+  @Override
+  public ExtendedDataInput create() {
+    return createDataInput();
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/96968fdc/giraph-core/src/main/java/org/apache/giraph/utils/io/ExtendedDataInputOutput.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/utils/io/ExtendedDataInputOutput.java b/giraph-core/src/main/java/org/apache/giraph/utils/io/ExtendedDataInputOutput.java
new file mode 100644
index 0000000..af45426
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/utils/io/ExtendedDataInputOutput.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.utils.io;
+
+import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
+import org.apache.giraph.utils.ExtendedDataInput;
+import org.apache.giraph.utils.ExtendedDataOutput;
+import org.apache.giraph.utils.WritableUtils;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+/**
+ * Wraps {@link ExtendedDataOutput} and {@link ExtendedDataInput} to be able
+ * to write data and later read data from the same place
+ */
+public class ExtendedDataInputOutput extends DataInputOutput {
+  /** Configuration */
+  private final ImmutableClassesGiraphConfiguration conf;
+  /** DataOutput which we write to; replaced when deserialized */
+  private ExtendedDataOutput dataOutput;
+
+  /**
+   * Constructor, creating a fresh DataOutput from the configuration
+   *
+   * @param conf Configuration used to create outputs and inputs
+   */
+  public ExtendedDataInputOutput(ImmutableClassesGiraphConfiguration conf) {
+    this.conf = conf;
+    dataOutput = conf.createExtendedDataOutput();
+  }
+
+  @Override
+  public DataOutput getDataOutput() {
+    return dataOutput;
+  }
+
+  @Override
+  public ExtendedDataInput createDataInput() {
+    return conf.createExtendedDataInput(dataOutput.getByteArray(), 0,
+        dataOutput.getPos());
+  }
+
+  @Override
+  public void write(DataOutput out) throws IOException {
+    WritableUtils.writeExtendedDataOutput(dataOutput, out);
+  }
+
+  @Override
+  public void readFields(DataInput in) throws IOException {
+    dataOutput = WritableUtils.readExtendedDataOutput(in, conf);
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/96968fdc/giraph-core/src/main/java/org/apache/giraph/utils/io/package-info.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/utils/io/package-info.java b/giraph-core/src/main/java/org/apache/giraph/utils/io/package-info.java
new file mode 100644
index 0000000..cec6b3c
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/utils/io/package-info.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/**
+ * Package of utility classes which add functionality to the ones from the
+ * java.io package.
+ */
+package org.apache.giraph.utils.io;