You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@giraph.apache.org by pa...@apache.org on 2014/06/08 17:25:31 UTC

[2/3] GIRAPH-907: refactor giraph code to support multiple implementations of vertexId data (pavanka)

http://git-wip-us.apache.org/repos/asf/giraph/blob/535a333b/giraph-core/src/main/java/org/apache/giraph/utils/ByteArrayVertexIdData.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/utils/ByteArrayVertexIdData.java b/giraph-core/src/main/java/org/apache/giraph/utils/ByteArrayVertexIdData.java
index 5c56038..f26a888 100644
--- a/giraph-core/src/main/java/org/apache/giraph/utils/ByteArrayVertexIdData.java
+++ b/giraph-core/src/main/java/org/apache/giraph/utils/ByteArrayVertexIdData.java
@@ -18,9 +18,6 @@
 
 package org.apache.giraph.utils;
 
-import org.apache.giraph.conf.ImmutableClassesGiraphConfigurable;
-import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
-import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
 
 import java.io.DataInput;
@@ -33,122 +30,8 @@ import java.io.IOException;
  * @param <I> Vertex id
  * @param <T> Data
  */
-public abstract class ByteArrayVertexIdData<I extends WritableComparable, T>
-  implements Writable, ImmutableClassesGiraphConfigurable {
-  /** Extended data output */
-  private ExtendedDataOutput extendedDataOutput;
-  /** Configuration */
-  private ImmutableClassesGiraphConfiguration<I, ?, ?> configuration;
-
-  /**
-   * Create a new data object.
-   *
-   * @return Newly-created data object.
-   */
-  public abstract T createData();
-
-  /**
-   * Write a data object to an {@link ExtendedDataOutput}.
-   *
-   * @param out {@link ExtendedDataOutput}
-   * @param data Data object to write
-   * @throws IOException
-   */
-  public abstract void writeData(ExtendedDataOutput out, T data)
-    throws IOException;
-
-  /**
-   * Read a data object's fields from an {@link ExtendedDataInput}.
-   *
-   * @param in {@link ExtendedDataInput}
-   * @param data Data object to fill in-place
-   * @throws IOException
-   */
-  public abstract void readData(ExtendedDataInput in, T data)
-    throws IOException;
-
-  /**
-   * Initialize the inner state. Must be called before {@code add()} is
-   * called.
-   */
-  public void initialize() {
-    extendedDataOutput = configuration.createExtendedDataOutput();
-  }
-
-  /**
-   * Initialize the inner state, with a known size. Must be called before
-   * {@code add()} is called.
-   *
-   * @param expectedSize Number of bytes to be expected
-   */
-  public void initialize(int expectedSize) {
-    extendedDataOutput = configuration.createExtendedDataOutput(expectedSize);
-  }
-
-  /**
-   * Add a vertex id and data pair to the collection.
-   *
-   * @param vertexId Vertex id
-   * @param data Data
-   */
-  public void add(I vertexId, T data) {
-    try {
-      vertexId.write(extendedDataOutput);
-      writeData(extendedDataOutput, data);
-    } catch (IOException e) {
-      throw new IllegalStateException("add: IOException", e);
-    }
-  }
-
-  /**
-   * Add a serialized vertex id and data.
-   *
-   * @param serializedId The bye array which holds the serialized id.
-   * @param idPos The end position of the serialized id in the byte array.
-   * @param data Data
-   */
-  public void add(byte[] serializedId, int idPos, T data) {
-    try {
-      extendedDataOutput.write(serializedId, 0, idPos);
-      writeData(extendedDataOutput, data);
-    } catch (IOException e) {
-      throw new IllegalStateException("add: IOException", e);
-    }
-  }
-
-  /**
-   * Get the number of bytes used.
-   *
-   * @return Bytes used
-   */
-  public int getSize() {
-    return extendedDataOutput.getPos();
-  }
-
-  /**
-   * Get the size of this object in serialized form.
-   *
-   * @return The size (in bytes) of the serialized object
-   */
-  public int getSerializedSize() {
-    return 1 + 4 + getSize();
-  }
-
-  /**
-   * Check if the list is empty.
-   *
-   * @return Whether the list is empty
-   */
-  public boolean isEmpty() {
-    return extendedDataOutput.getPos() == 0;
-  }
-
-  /**
-   * Clear the list.
-   */
-  public void clear() {
-    extendedDataOutput.reset();
-  }
+public abstract class ByteArrayVertexIdData<I extends WritableComparable,
+    T> extends AbstractVertexIdData<I, T> {
 
   /**
    * Get the underlying byte-array.
@@ -160,16 +43,6 @@ public abstract class ByteArrayVertexIdData<I extends WritableComparable, T>
   }
 
   @Override
-  public void setConf(ImmutableClassesGiraphConfiguration configuration) {
-    this.configuration = configuration;
-  }
-
-  @Override
-  public ImmutableClassesGiraphConfiguration<I, ?, ?> getConf() {
-    return configuration;
-  }
-
-  @Override
   public void write(DataOutput dataOutput) throws IOException {
     WritableUtils.writeExtendedDataOutput(extendedDataOutput, dataOutput);
   }
@@ -177,73 +50,6 @@ public abstract class ByteArrayVertexIdData<I extends WritableComparable, T>
   @Override
   public void readFields(DataInput dataInput) throws IOException {
     extendedDataOutput =
-        WritableUtils.readExtendedDataOutput(dataInput, configuration);
-  }
-
-  /**
-   * Get an iterator over the pairs.
-   *
-   * @return Iterator
-   */
-  public VertexIdDataIterator getVertexIdDataIterator() {
-    return new VertexIdDataIterator();
-  }
-
-  /**
-   * Special iterator that reuses vertex ids and data objects so that the
-   * lifetime of the object is only until next() is called.
-   *
-   * Vertex id ownership can be released if desired through
-   * releaseCurrentVertexId().  This optimization allows us to cut down
-   * on the number of objects instantiated and garbage collected.
-   *
-   * Not thread-safe.
-   */
-  public class VertexIdDataIterator extends VertexIdIterator<I> {
-    /** Current data. */
-    private T data;
-
-    /** Default constructor. */
-    public VertexIdDataIterator() {
-      super(extendedDataOutput, configuration);
-    }
-
-    @Override
-    public void next() {
-      if (vertexId == null) {
-        vertexId = configuration.createVertexId();
-      }
-      if (data == null) {
-        data = createData();
-      }
-      try {
-        vertexId.readFields(extendedDataInput);
-        readData(extendedDataInput, data);
-      } catch (IOException e) {
-        throw new IllegalStateException("next: IOException", e);
-      }
-    }
-
-    /**
-     * Get the current data.
-     *
-     * @return Current data
-     */
-    public T getCurrentData() {
-      return data;
-    }
-
-    /**
-     * Release the current data object.
-     *
-     * @return Released data object
-     */
-    public T releaseCurrentData() {
-      T releasedData = data;
-      data = null;
-      return releasedData;
-    }
+        WritableUtils.readExtendedDataOutput(dataInput, getConf());
   }
-
 }
-

http://git-wip-us.apache.org/repos/asf/giraph/blob/535a333b/giraph-core/src/main/java/org/apache/giraph/utils/ByteArrayVertexIdEdges.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/utils/ByteArrayVertexIdEdges.java b/giraph-core/src/main/java/org/apache/giraph/utils/ByteArrayVertexIdEdges.java
index 762802b..16370c7 100644
--- a/giraph-core/src/main/java/org/apache/giraph/utils/ByteArrayVertexIdEdges.java
+++ b/giraph-core/src/main/java/org/apache/giraph/utils/ByteArrayVertexIdEdges.java
@@ -33,7 +33,8 @@ import java.io.IOException;
  */
 @SuppressWarnings("unchecked")
 public class ByteArrayVertexIdEdges<I extends WritableComparable,
-    E extends Writable> extends ByteArrayVertexIdData<I, Edge<I, E>> {
+    E extends Writable> extends ByteArrayVertexIdData<I, Edge<I,
+    E>> implements VertexIdEdges<I, E> {
   /**
    * Cast the {@link ImmutableClassesGiraphConfiguration} so it can be used
    * to generate edge objects.
@@ -62,37 +63,9 @@ public class ByteArrayVertexIdEdges<I extends WritableComparable,
     WritableUtils.readEdge(in, edge);
   }
 
-  /**
-   * Get an iterator over the pairs.
-   *
-   * @return Iterator
-   */
-  public VertexIdEdgeIterator getVertexIdEdgeIterator() {
-    return new VertexIdEdgeIterator();
-  }
-
-  /**
-   * Special iterator that reuses vertex ids and edge objects so that the
-   * lifetime of the object is only until next() is called.
-   */
-  public class VertexIdEdgeIterator extends VertexIdDataIterator {
-    /**
-     * Get the current edge.
-     *
-     * @return Current edge
-     */
-    public Edge<I, E> getCurrentEdge() {
-      return getCurrentData();
-    }
-
-    /**
-     * Release the current edge.
-     *
-     * @return Released edge
-     */
-    public Edge<I, E> releaseCurrentEdge() {
-      return releaseCurrentData();
-    }
+  @Override
+  public ByteStructVertexIdEdgeIterator<I, E> getVertexIdEdgeIterator() {
+    return new ByteStructVertexIdEdgeIterator<>(this);
   }
 }
 

http://git-wip-us.apache.org/repos/asf/giraph/blob/535a333b/giraph-core/src/main/java/org/apache/giraph/utils/ByteArrayVertexIdMessages.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/utils/ByteArrayVertexIdMessages.java b/giraph-core/src/main/java/org/apache/giraph/utils/ByteArrayVertexIdMessages.java
index 0ac8fdf..b3eca3e 100644
--- a/giraph-core/src/main/java/org/apache/giraph/utils/ByteArrayVertexIdMessages.java
+++ b/giraph-core/src/main/java/org/apache/giraph/utils/ByteArrayVertexIdMessages.java
@@ -15,6 +15,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.giraph.utils;
 
 import org.apache.giraph.factories.MessageValueFactory;
@@ -33,7 +34,8 @@ import java.io.IOException;
  */
 @SuppressWarnings("unchecked")
 public class ByteArrayVertexIdMessages<I extends WritableComparable,
-    M extends Writable> extends ByteArrayVertexIdData<I, M> {
+  M extends Writable> extends ByteArrayVertexIdData<I, M>
+  implements VertexIdMessages<I, M> {
   /** Message value class */
   private MessageValueFactory<M> messageValueFactory;
   /** Add the message size to the stream? (Depends on the message store) */
@@ -52,7 +54,7 @@ public class ByteArrayVertexIdMessages<I extends WritableComparable,
   /**
    * Set whether message sizes should be encoded.  This should only be a
    * possibility when not combining.  When combining, all messages need to be
-   * deserializd right away, so this won't help.
+   * de-serialized right away, so this won't help.
    */
   private void setUseMessageSizeEncoding() {
     if (!getConf().useMessageCombiner()) {
@@ -89,45 +91,31 @@ public class ByteArrayVertexIdMessages<I extends WritableComparable,
     setUseMessageSizeEncoding();
   }
 
-  /**
-   * Get specialized iterator that will instiantiate the vertex id and
-   * message of this object.
-   *
-   * @return Special iterator that reuses vertex ids and messages unless
-   *         specified
-   */
-  public VertexIdMessageIterator getVertexIdMessageIterator() {
-    return new VertexIdMessageIterator();
-  }
-
-  /**
-   * Special iterator that reuses vertex ids and message objects so that the
-   * lifetime of the object is only until next() is called.
-   */
-  public class VertexIdMessageIterator extends VertexIdDataIterator {
-    /**
-     * Get the current message.
-     *
-     * @return Current message
-     */
-    public M getCurrentMessage() {
-      return getCurrentData();
-    }
+  @Override
+  public ByteStructVertexIdMessageIterator<I, M> getVertexIdMessageIterator() {
+    return new ByteStructVertexIdMessageIterator<>(this);
   }
 
-  /**
-   * Get specialized iterator that will instiantiate the vertex id and
-   * message of this object.  It will only produce message bytes, not actual
-   * messages and expects a different encoding.
-   *
-   * @return Special iterator that reuses vertex ids (unless released) and
-   *         copies message bytes
-   */
-  public VertexIdMessageBytesIterator getVertexIdMessageBytesIterator() {
+  @Override
+  public ByteStructVertexIdMessageBytesIterator<I, M>
+  getVertexIdMessageBytesIterator() {
     if (!useMessageSizeEncoding) {
       return null;
     }
-    return new VertexIdMessageBytesIterator();
+    return new ByteStructVertexIdMessageBytesIterator<I, M>(this) {
+      @Override
+      public void writeCurrentMessageBytes(DataOutput dataOutput) {
+        try {
+          dataOutput.write(extendedDataOutput.getByteArray(),
+            messageOffset, messageBytes);
+        } catch (NegativeArraySizeException e) {
+          VerboseByteStructMessageWrite.handleNegativeArraySize(vertexId);
+        } catch (IOException e) {
+          throw new IllegalStateException("writeCurrentMessageBytes: Got " +
+              "IOException", e);
+        }
+      }
+    };
   }
 
   @Override
@@ -141,66 +129,4 @@ public class ByteArrayVertexIdMessages<I extends WritableComparable,
     useMessageSizeEncoding = dataInput.readBoolean();
     super.readFields(dataInput);
   }
-
-  /**
-   * Special iterator that reuses vertex ids and messages bytes so that the
-   * lifetime of the object is only until next() is called.
-   *
-   * Vertex id ownership can be released if desired through
-   * releaseCurrentVertexId().  This optimization allows us to cut down
-   * on the number of objects instantiated and garbage collected.  Messages
-   * can only be copied to an ExtendedDataOutput object
-   *
-   * Not thread-safe.
-   */
-  public class VertexIdMessageBytesIterator extends VertexIdDataIterator {
-    /** Last message offset */
-    private int messageOffset = -1;
-    /** Number of bytes in the last message */
-    private int messageBytes = -1;
-
-    /**
-     * Moves to the next element in the iteration.
-     */
-    @Override
-    public void next() {
-      if (vertexId == null) {
-        vertexId = getConf().createVertexId();
-      }
-
-      try {
-        vertexId.readFields(extendedDataInput);
-        messageBytes = extendedDataInput.readInt();
-        messageOffset = extendedDataInput.getPos();
-        if (extendedDataInput.skipBytes(messageBytes) != messageBytes) {
-          throw new IllegalStateException("next: Failed to skip " +
-              messageBytes);
-        }
-      } catch (IOException e) {
-        throw new IllegalStateException("next: IOException", e);
-      }
-    }
-
-    /**
-     * Write the current message to an ExtendedDataOutput object
-     *
-     * @param dataOutput Where the current message will be written to
-     */
-    public void writeCurrentMessageBytes(DataOutput dataOutput) {
-      try {
-        dataOutput.write(getByteArray(), messageOffset, messageBytes);
-      } catch (NegativeArraySizeException e) {
-        throw new RuntimeException("The numbers of bytes sent to vertex " +
-            vertexId + " exceeded the max capacity of " +
-            "its ExtendedDataOutput. Please consider setting " +
-            "giraph.useBigDataIOForMessages=true. If there are super-vertices" +
-            " in the graph which receive a lot of messages (total serialized " +
-            "size of messages goes beyond the maximum size of a byte array), " +
-            "setting this option to true will remove that limit");
-      } catch (IOException e) {
-        throw new IllegalStateException("writeCurrentMessageBytes: Got " +
-            "IOException", e);
-      }
-    }
-  }
 }

http://git-wip-us.apache.org/repos/asf/giraph/blob/535a333b/giraph-core/src/main/java/org/apache/giraph/utils/ByteStructIterable.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/utils/ByteStructIterable.java b/giraph-core/src/main/java/org/apache/giraph/utils/ByteStructIterable.java
new file mode 100644
index 0000000..4e15c1b
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/utils/ByteStructIterable.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.utils;
+
+import java.util.Iterator;
+import org.apache.hadoop.io.Writable;
+
+/**
+ * This iterable is designed to deserialize a byte array on the fly to
+ * provide new copies of writable objects when desired.  It does not reuse
+ * objects, and instead creates a new one for every next().
+ *
+ * @param <T> Type that extends Writable that will be iterated
+ */
+public abstract class ByteStructIterable<T extends Writable> implements
+    Iterable<T> {
+  /** Factory for data input */
+  protected final Factory<? extends ExtendedDataInput> dataInputFactory;
+
+  /**
+   * Constructor
+   *
+   * @param dataInputFactory Factory for data inputs
+   */
+  public ByteStructIterable(
+      Factory<? extends ExtendedDataInput> dataInputFactory) {
+    this.dataInputFactory = dataInputFactory;
+  }
+
+  /**
+   * Must be able to create the writable object
+   *
+   * @return New writable
+   */
+  protected abstract T createWritable();
+
+  @Override
+  public Iterator<T> iterator() {
+    return new ByteStructIterator<T>(dataInputFactory.create()) {
+      @Override
+      protected T createWritable() {
+        return ByteStructIterable.this.createWritable();
+      }
+    };
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/535a333b/giraph-core/src/main/java/org/apache/giraph/utils/ByteStructIterator.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/utils/ByteStructIterator.java b/giraph-core/src/main/java/org/apache/giraph/utils/ByteStructIterator.java
new file mode 100644
index 0000000..322365c
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/utils/ByteStructIterator.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.utils;
+
+import java.io.IOException;
+import java.util.Iterator;
+import org.apache.hadoop.io.Writable;
+
+/**
+ * This iterator is designed to deserialize a byte array on the fly to
+ * provide new copies of writable objects when desired.  It does not reuse
+ * objects, and instead creates a new one for every next().
+ *
+ * @param <T> Type that extends Writable that will be iterated
+ */
+public abstract class ByteStructIterator<T extends Writable> implements
+    Iterator<T> {
+  /** Data input */
+  protected final ExtendedDataInput extendedDataInput;
+
+  /**
+   * Wrap ExtendedDataInput in ByteArrayIterator
+   *
+   * @param extendedDataInput ExtendedDataInput
+   */
+  public ByteStructIterator(ExtendedDataInput extendedDataInput) {
+    this.extendedDataInput = extendedDataInput;
+  }
+
+  @Override
+  public boolean hasNext() {
+    return extendedDataInput.available() > 0;
+  }
+
+  @Override
+  public T next() {
+    T writable = createWritable();
+    try {
+      writable.readFields(extendedDataInput);
+    } catch (IOException e) {
+      throw new IllegalStateException("next: readFields got IOException", e);
+    }
+    return writable;
+  }
+
+  @Override
+  public void remove() {
+    throw new UnsupportedOperationException("remove: Not supported");
+  }
+
+  /**
+   * Must be able to create the writable object
+   *
+   * @return New writable
+   */
+  protected abstract T createWritable();
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/535a333b/giraph-core/src/main/java/org/apache/giraph/utils/ByteStructVertexIdDataIterator.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/utils/ByteStructVertexIdDataIterator.java b/giraph-core/src/main/java/org/apache/giraph/utils/ByteStructVertexIdDataIterator.java
new file mode 100644
index 0000000..cefec0e
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/utils/ByteStructVertexIdDataIterator.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.utils;
+
+import org.apache.hadoop.io.WritableComparable;
+
+import java.io.IOException;
+import javax.annotation.concurrent.NotThreadSafe;
+
+/**
+ * Special iterator that reuses vertex ids and data objects so that the
+ * lifetime of the object is only until next() is called.
+ *
+ * Vertex id ownership can be released if desired through
+ * releaseCurrentVertexId().  This optimization allows us to cut down
+ * on the number of objects instantiated and garbage collected.
+ *
+ * @param <I> vertexId type parameter
+ * @param <T> vertexData type parameter
+ */
+@NotThreadSafe
+public class ByteStructVertexIdDataIterator<I extends WritableComparable, T>
+  extends ByteStructVertexIdIterator<I> implements VertexIdDataIterator<I, T> {
+  /** VertexIdData to iterate over */
+  protected AbstractVertexIdData<I, T> vertexIdData;
+  /** Current data. */
+  private T data;
+
+  /**
+   * Constructor
+   *
+   * @param vertexIdData vertexIdData
+   */
+  public ByteStructVertexIdDataIterator(
+    AbstractVertexIdData<I, T> vertexIdData) {
+    super(vertexIdData.extendedDataOutput, vertexIdData.getConf());
+    this.vertexIdData = vertexIdData;
+  }
+
+  @Override
+  public void next() {
+    if (vertexId == null) {
+      vertexId = vertexIdData.getConf().createVertexId();
+    }
+    if (data == null) {
+      data = vertexIdData.createData();
+    }
+    try {
+      vertexId.readFields(extendedDataInput);
+      vertexIdData.readData(extendedDataInput, data);
+    } catch (IOException e) {
+      throw new IllegalStateException("next: IOException", e);
+    }
+  }
+
+  @Override
+  public T getCurrentData() {
+    return data;
+  }
+
+  @Override
+  public T releaseCurrentData() {
+    T releasedData = data;
+    data = null;
+    return releasedData;
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/535a333b/giraph-core/src/main/java/org/apache/giraph/utils/ByteStructVertexIdEdgeIterator.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/utils/ByteStructVertexIdEdgeIterator.java b/giraph-core/src/main/java/org/apache/giraph/utils/ByteStructVertexIdEdgeIterator.java
new file mode 100644
index 0000000..7e06038
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/utils/ByteStructVertexIdEdgeIterator.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.utils;
+
+import org.apache.giraph.edge.Edge;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+
+/**
+ * Special iterator that reuses vertex ids and edge objects so that the
+ * lifetime of the object is only until next() is called.
+ *
+ * @param <I> vertexId type parameter
+ * @param <E> edge type parameter
+ */
+public class ByteStructVertexIdEdgeIterator<I extends WritableComparable,
+  E extends Writable> extends ByteStructVertexIdDataIterator<I, Edge<I, E>>
+  implements VertexIdEdgeIterator<I, E> {
+
+  /**
+   * Constructor
+   *
+   * @param vertexIdData vertexIdData
+   */
+  public ByteStructVertexIdEdgeIterator(
+      AbstractVertexIdData<I, Edge<I, E>> vertexIdData) {
+    super(vertexIdData);
+  }
+
+  @Override
+  public Edge<I, E> getCurrentEdge() {
+    return getCurrentData();
+  }
+
+
+  @Override
+  public Edge<I, E> releaseCurrentEdge() {
+    return releaseCurrentData();
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/535a333b/giraph-core/src/main/java/org/apache/giraph/utils/ByteStructVertexIdIterator.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/utils/ByteStructVertexIdIterator.java b/giraph-core/src/main/java/org/apache/giraph/utils/ByteStructVertexIdIterator.java
new file mode 100644
index 0000000..3d564cd
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/utils/ByteStructVertexIdIterator.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.utils;
+
+import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
+import org.apache.hadoop.io.WritableComparable;
+
+/**
+ * Common implementation for VertexIdEdgeIterator, VertexIdMessageIterator
+ * and VertexIdMessageBytesIterator.
+ *
+ * @param <I> Vertex id
+ */
+public abstract class ByteStructVertexIdIterator<I extends WritableComparable>
+  implements VertexIdIterator<I> {
+  /** Reader of the serialized edges */
+  protected final ExtendedDataInput extendedDataInput;
+
+  /** Current vertex id */
+  protected I vertexId;
+
+  /**
+   * Constructor.
+   *
+   * @param extendedDataOutput Extended data output
+   * @param conf Configuration
+   */
+  public ByteStructVertexIdIterator(
+    ExtendedDataOutput extendedDataOutput,
+    ImmutableClassesGiraphConfiguration<I, ?, ?> conf) {
+    if (extendedDataOutput != null && conf != null) {
+      extendedDataInput = conf.createExtendedDataInput(extendedDataOutput);
+    } else {
+      throw new IllegalStateException("Cannot instantiate vertexIdIterator " +
+        "with null arguments");
+    }
+  }
+
+  @Override
+  public boolean hasNext() {
+    return extendedDataInput.available() > 0;
+  }
+
+  @Override
+  public I getCurrentVertexId() {
+    return vertexId;
+  }
+
+  @Override
+  public I releaseCurrentVertexId() {
+    I releasedVertexId = vertexId;
+    vertexId = null;
+    return releasedVertexId;
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/535a333b/giraph-core/src/main/java/org/apache/giraph/utils/ByteStructVertexIdMessageBytesIterator.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/utils/ByteStructVertexIdMessageBytesIterator.java b/giraph-core/src/main/java/org/apache/giraph/utils/ByteStructVertexIdMessageBytesIterator.java
new file mode 100644
index 0000000..c0a86c9
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/utils/ByteStructVertexIdMessageBytesIterator.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.utils;
+
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+
+import java.io.IOException;
+import javax.annotation.concurrent.NotThreadSafe;
+
+/**
+ * Special iterator that reuses vertex ids and messages bytes so that the
+ * lifetime of the object is only until next() is called.
+ *
+ * Vertex id ownership can be released if desired through
+ * releaseCurrentVertexId().  This optimization allows us to cut down
+ * on the number of objects instantiated and garbage collected.  Messages
+ * can only be copied to an ExtendedDataOutput object
+ *
+ * @param <I> vertexId type parameter
+ * @param <M> message type parameter
+ */
+@NotThreadSafe
+public abstract class ByteStructVertexIdMessageBytesIterator<I
+  extends WritableComparable, M extends Writable>
+  extends ByteStructVertexIdDataIterator<I, M>
+  implements VertexIdMessageBytesIterator<I, M> {
+  /** Last message offset */
+  protected int messageOffset = -1;
+  /** Number of bytes in the last message */
+  protected int messageBytes = -1;
+
+  /**
+   * Constructor with vertexIdData
+   *
+   * @param vertexIdData vertexIdData
+   */
+  public ByteStructVertexIdMessageBytesIterator(
+    AbstractVertexIdData<I, M> vertexIdData) {
+    super(vertexIdData);
+  }
+
+  /**
+   * Moves to the next element in the iteration.
+   */
+  @Override
+  public void next() {
+    if (vertexId == null) {
+      vertexId = vertexIdData.getConf().createVertexId();
+    }
+
+    try {
+      vertexId.readFields(extendedDataInput);
+      messageBytes = extendedDataInput.readInt();
+      messageOffset = extendedDataInput.getPos();
+      if (extendedDataInput.skipBytes(messageBytes) != messageBytes) {
+        throw new IllegalStateException("next: Failed to skip " +
+            messageBytes);
+      }
+    } catch (IOException e) {
+      throw new IllegalStateException("next: IOException", e);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/535a333b/giraph-core/src/main/java/org/apache/giraph/utils/ByteStructVertexIdMessageIterator.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/utils/ByteStructVertexIdMessageIterator.java b/giraph-core/src/main/java/org/apache/giraph/utils/ByteStructVertexIdMessageIterator.java
new file mode 100644
index 0000000..b686211
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/utils/ByteStructVertexIdMessageIterator.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.utils;
+
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+
+/**
+ * Special iterator that reuses vertex ids and message objects so that the
+ * lifetime of the object is only until next() is called.
+ *
+ * @param <I> vertexId type parameter
+ * @param <M> message type parameter
+ */
+public class ByteStructVertexIdMessageIterator<I extends WritableComparable,
+  M extends Writable> extends ByteStructVertexIdDataIterator<I, M>
+  implements VertexIdMessageIterator<I, M> {
+
+  /**
+   * Constructor with vertexIdData
+   *
+   * @param vertexIdData vertexIdData
+   */
+  public ByteStructVertexIdMessageIterator(
+    AbstractVertexIdData<I, M> vertexIdData) {
+    super(vertexIdData);
+  }
+
+  @Override
+  public M getCurrentMessage() {
+    return getCurrentData();
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/535a333b/giraph-core/src/main/java/org/apache/giraph/utils/ByteUtils.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/utils/ByteUtils.java b/giraph-core/src/main/java/org/apache/giraph/utils/ByteUtils.java
new file mode 100644
index 0000000..52d959e
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/utils/ByteUtils.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.utils;
+
+/**
+ * Utilities class for byte operations and constants
+ */
+public class ByteUtils {
+  /** Bytes used in a boolean */
+  public static final int SIZE_OF_BOOLEAN = 1;
+  /** Bytes used in a byte */
+  public static final int SIZE_OF_BYTE = 1;
+  /** Bytes used in a char */
+  public static final int SIZE_OF_CHAR = Character.SIZE / Byte.SIZE;
+  /** Bytes used in a short */
+  public static final int SIZE_OF_SHORT = Short.SIZE / Byte.SIZE;
+  /** Bytes used in an int */
+  public static final int SIZE_OF_INT = Integer.SIZE / Byte.SIZE;
+  /** Bytes used in a long */
+  public static final int SIZE_OF_LONG = Long.SIZE / Byte.SIZE;
+  /** Bytes used in a float */
+  public static final int SIZE_OF_FLOAT = Float.SIZE / Byte.SIZE;
+  /** Bytes used in a double */
+  public static final int SIZE_OF_DOUBLE = Double.SIZE / Byte.SIZE;
+
+  /**
+   * Private Constructor
+   */
+  private ByteUtils() {
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/535a333b/giraph-core/src/main/java/org/apache/giraph/utils/ExtendedByteArrayDataInput.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/utils/ExtendedByteArrayDataInput.java b/giraph-core/src/main/java/org/apache/giraph/utils/ExtendedByteArrayDataInput.java
index 0ecea77..3eae25b 100644
--- a/giraph-core/src/main/java/org/apache/giraph/utils/ExtendedByteArrayDataInput.java
+++ b/giraph-core/src/main/java/org/apache/giraph/utils/ExtendedByteArrayDataInput.java
@@ -26,7 +26,7 @@ import java.io.IOException;
  * Provides access to a internals of ByteArrayInputStream
  */
 public class ExtendedByteArrayDataInput extends ByteArrayInputStream
-    implements ExtendedDataInput {
+  implements ExtendedDataInput {
   /** Internal data input */
   private final DataInput dataInput;
   /**

http://git-wip-us.apache.org/repos/asf/giraph/blob/535a333b/giraph-core/src/main/java/org/apache/giraph/utils/ExtendedByteArrayDataOutput.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/utils/ExtendedByteArrayDataOutput.java b/giraph-core/src/main/java/org/apache/giraph/utils/ExtendedByteArrayDataOutput.java
index 0ff366d..9988b15 100644
--- a/giraph-core/src/main/java/org/apache/giraph/utils/ExtendedByteArrayDataOutput.java
+++ b/giraph-core/src/main/java/org/apache/giraph/utils/ExtendedByteArrayDataOutput.java
@@ -140,10 +140,15 @@ public class ExtendedByteArrayDataOutput extends ByteArrayOutputStream
   }
 
   @Override
-  public void skipBytes(int bytesToSkip) {
-    if ((count + bytesToSkip) > buf.length) {
-      buf = Arrays.copyOf(buf, Math.max(buf.length << 1, count + bytesToSkip));
+  public void ensureWritable(int minSize) {
+    if ((count + minSize) > buf.length) {
+      buf = Arrays.copyOf(buf, Math.max(buf.length << 1, count + minSize));
     }
+  }
+
+  @Override
+  public void skipBytes(int bytesToSkip) {
+    ensureWritable(bytesToSkip);
     count += bytesToSkip;
   }
 
@@ -161,6 +166,15 @@ public class ExtendedByteArrayDataOutput extends ByteArrayOutputStream
   }
 
   @Override
+  public byte[] toByteArray(int offset, int length) {
+    if (offset + length > count) {
+      throw new IndexOutOfBoundsException(String.format("Offset: %d + " +
+          "Length: %d exceeds the size of buf : %d", offset, length, count));
+    }
+    return Arrays.copyOfRange(buf, offset, length);
+  }
+
+  @Override
   public byte[] getByteArray() {
     return buf;
   }

http://git-wip-us.apache.org/repos/asf/giraph/blob/535a333b/giraph-core/src/main/java/org/apache/giraph/utils/ExtendedDataOutput.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/utils/ExtendedDataOutput.java b/giraph-core/src/main/java/org/apache/giraph/utils/ExtendedDataOutput.java
index 54ef514..bc979af 100644
--- a/giraph-core/src/main/java/org/apache/giraph/utils/ExtendedDataOutput.java
+++ b/giraph-core/src/main/java/org/apache/giraph/utils/ExtendedDataOutput.java
@@ -24,6 +24,14 @@ import java.io.DataOutput;
  */
 public interface ExtendedDataOutput extends DataOutput {
   /**
+   * Ensure that backing byte structure has at least minSize
+   * additional bytes
+   *
+   * @param minSize additional size required
+   */
+  void ensureWritable(int minSize);
+
+  /**
    * Skip some number of bytes.
    *
    * @param  bytesToSkip Number of bytes to skip
@@ -61,7 +69,17 @@ public interface ExtendedDataOutput extends DataOutput {
   byte[] toByteArray();
 
   /**
+   * Return a copy of slice of byte array
+   *
+   * @param offset offset of array
+   * @param length length of slice
+   * @return byte array
+   */
+  byte[] toByteArray(int offset, int length);
+
+  /**
    * Clears the buffer
    */
   void reset();
 }
+

http://git-wip-us.apache.org/repos/asf/giraph/blob/535a333b/giraph-core/src/main/java/org/apache/giraph/utils/RepresentativeByteArrayIterable.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/utils/RepresentativeByteArrayIterable.java b/giraph-core/src/main/java/org/apache/giraph/utils/RepresentativeByteArrayIterable.java
deleted file mode 100644
index 2c24e89..0000000
--- a/giraph-core/src/main/java/org/apache/giraph/utils/RepresentativeByteArrayIterable.java
+++ /dev/null
@@ -1,68 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.giraph.utils;
-
-import java.util.Iterator;
-import org.apache.hadoop.io.Writable;
-
-/**
- * The objects provided by the iterators generated from this object have
- * lifetimes only until next() is called.  In that sense, the object
- * provided is only a representative object.
- *
- * @param <T> Type that extends Writable that will be iterated
- */
-public abstract class RepresentativeByteArrayIterable<T extends Writable>
-    extends ByteArrayIterable<T> {
-  /**
-   * Constructor
-   *
-   * @param dataInputFactory Factory for data inputs
-   */
-  public RepresentativeByteArrayIterable(
-      Factory<? extends ExtendedDataInput> dataInputFactory) {
-    super(dataInputFactory);
-  }
-
-  /**
-   * Iterator over the internal byte array
-   */
-  private class RepresentativeByteArrayIterableIterator extends
-      RepresentativeByteArrayIterator<T> {
-    /**
-     * Constructor.
-     *
-     * @param extendedDataInput ExtendedDataInput
-     */
-    private RepresentativeByteArrayIterableIterator(
-        ExtendedDataInput extendedDataInput) {
-      super(extendedDataInput);
-    }
-
-    @Override
-    protected T createWritable() {
-      return RepresentativeByteArrayIterable.this.createWritable();
-    }
-  }
-
-  @Override
-  public Iterator<T> iterator() {
-    return
-        new RepresentativeByteArrayIterableIterator(dataInputFactory.create());
-  }
-}

http://git-wip-us.apache.org/repos/asf/giraph/blob/535a333b/giraph-core/src/main/java/org/apache/giraph/utils/RepresentativeByteArrayIterator.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/utils/RepresentativeByteArrayIterator.java b/giraph-core/src/main/java/org/apache/giraph/utils/RepresentativeByteArrayIterator.java
deleted file mode 100644
index d36c94f..0000000
--- a/giraph-core/src/main/java/org/apache/giraph/utils/RepresentativeByteArrayIterator.java
+++ /dev/null
@@ -1,52 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.giraph.utils;
-
-import java.io.IOException;
-import org.apache.hadoop.io.Writable;
-
-/**
- * The objects provided by this iterator have lifetimes only until next() is
- * called.  In that sense, the object provided is only a representative object.
- *
- * @param <T> Type that extends Writable that will be iterated
- */
-public abstract class RepresentativeByteArrayIterator<T extends
-    Writable> extends ByteArrayIterator<T> {
-  /** Representative writable */
-  private final T representativeWritable = createWritable();
-
-  /**
-   * Wrap ExtendedDataInput in ByteArrayIterator
-   *
-   * @param extendedDataInput ExtendedDataInput
-   */
-  public RepresentativeByteArrayIterator(ExtendedDataInput extendedDataInput) {
-    super(extendedDataInput);
-  }
-
-  @Override
-  public T next() {
-    try {
-      representativeWritable.readFields(extendedDataInput);
-    } catch (IOException e) {
-      throw new IllegalStateException("next: readFields got IOException", e);
-    }
-    return representativeWritable;
-  }
-}

http://git-wip-us.apache.org/repos/asf/giraph/blob/535a333b/giraph-core/src/main/java/org/apache/giraph/utils/RepresentativeByteStructIterable.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/utils/RepresentativeByteStructIterable.java b/giraph-core/src/main/java/org/apache/giraph/utils/RepresentativeByteStructIterable.java
new file mode 100644
index 0000000..0859010
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/utils/RepresentativeByteStructIterable.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.utils;
+
+import java.util.Iterator;
+import org.apache.hadoop.io.Writable;
+
+/**
+ * The objects provided by the iterators generated from this object have
+ * lifetimes only until next() is called.  In that sense, the object
+ * provided is only a representative object.
+ *
+ * @param <T> Type that extends Writable that will be iterated
+ */
+public abstract class RepresentativeByteStructIterable<T extends Writable>
+    extends ByteStructIterable<T> {
+  /**
+   * Constructor
+   *
+   * @param dataInputFactory Factory for data inputs
+   */
+  public RepresentativeByteStructIterable(
+      Factory<? extends ExtendedDataInput> dataInputFactory) {
+    super(dataInputFactory);
+  }
+
+  @Override
+  public Iterator<T> iterator() {
+    return new RepresentativeByteStructIterator<T>(dataInputFactory.create()) {
+      @Override
+      protected T createWritable() {
+        return RepresentativeByteStructIterable.this.createWritable();
+      }
+    };
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/535a333b/giraph-core/src/main/java/org/apache/giraph/utils/RepresentativeByteStructIterator.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/utils/RepresentativeByteStructIterator.java b/giraph-core/src/main/java/org/apache/giraph/utils/RepresentativeByteStructIterator.java
new file mode 100644
index 0000000..0bf98ad
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/utils/RepresentativeByteStructIterator.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.utils;
+
+import java.io.IOException;
+import org.apache.hadoop.io.Writable;
+
+/**
+ * The objects provided by this iterator have lifetimes only until next() is
+ * called.  In that sense, the object provided is only a representative object.
+ *
+ * @param <T> Type that extends Writable that will be iterated
+ */
+public abstract class RepresentativeByteStructIterator<T extends
+    Writable> extends ByteStructIterator<T> {
+  /** Representative writable */
+  private final T representativeWritable = createWritable();
+
+  /**
+   * Wrap ExtendedDataInput in ByteArrayIterator
+   *
+   * @param extendedDataInput ExtendedDataInput
+   */
+  public RepresentativeByteStructIterator(ExtendedDataInput extendedDataInput) {
+    super(extendedDataInput);
+  }
+
+  @Override
+  public T next() {
+    try {
+      representativeWritable.readFields(extendedDataInput);
+    } catch (IOException e) {
+      throw new IllegalStateException("next: readFields got IOException", e);
+    }
+    return representativeWritable;
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/535a333b/giraph-core/src/main/java/org/apache/giraph/utils/RequestUtils.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/utils/RequestUtils.java b/giraph-core/src/main/java/org/apache/giraph/utils/RequestUtils.java
new file mode 100644
index 0000000..2c56cb8
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/utils/RequestUtils.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.utils;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.ByteBufInputStream;
+import org.apache.giraph.comm.requests.WritableRequest;
+
+import java.io.IOException;
+import org.apache.log4j.Logger;
+
+/**
+ * RequestUtils utility class
+ */
+public class RequestUtils {
+  /** Logger */
+  public static final Logger LOG = Logger.getLogger(RequestUtils.class);
+
+  /**
+   * Private Constructor
+   */
+  private RequestUtils() {
+  }
+
+  /**
+   * decodeWritableRequest based on predicate
+   *
+   * @param buf ByteBuf
+   * @param request writableRequest
+   * @return properly initialized writableRequest
+   * @throws IOException
+   */
+  public static WritableRequest decodeWritableRequest(ByteBuf buf,
+    WritableRequest request) throws IOException {
+    ByteBufInputStream input = new ByteBufInputStream(buf);
+    request.readFields(input);
+    return request;
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/535a333b/giraph-core/src/main/java/org/apache/giraph/utils/UnsafeArrayReads.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/utils/UnsafeArrayReads.java b/giraph-core/src/main/java/org/apache/giraph/utils/UnsafeArrayReads.java
new file mode 100644
index 0000000..db19fda
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/utils/UnsafeArrayReads.java
@@ -0,0 +1,200 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.utils;
+
+import java.io.IOException;
+import java.lang.reflect.Field;
+
+import static org.apache.giraph.utils.ByteUtils.SIZE_OF_BOOLEAN;
+import static org.apache.giraph.utils.ByteUtils.SIZE_OF_BYTE;
+import static org.apache.giraph.utils.ByteUtils.SIZE_OF_CHAR;
+import static org.apache.giraph.utils.ByteUtils.SIZE_OF_SHORT;
+import static org.apache.giraph.utils.ByteUtils.SIZE_OF_INT;
+import static org.apache.giraph.utils.ByteUtils.SIZE_OF_LONG;
+import static org.apache.giraph.utils.ByteUtils.SIZE_OF_FLOAT;
+import static org.apache.giraph.utils.ByteUtils.SIZE_OF_DOUBLE;
+
+/**
+ * Byte array input stream that uses Unsafe methods to deserialize
+ * much faster
+ */
+public class UnsafeArrayReads extends UnsafeReads {
+  /** Access to the unsafe class */
+  private static final sun.misc.Unsafe UNSAFE;
+  static {
+    try {
+      Field field = sun.misc.Unsafe.class.getDeclaredField("theUnsafe");
+      field.setAccessible(true);
+      UNSAFE = (sun.misc.Unsafe) field.get(null);
+      // Checkstyle exception due to needing to check if unsafe is allowed
+      // CHECKSTYLE: stop IllegalCatch
+    } catch (Exception e) {
+      // CHECKSTYLE: resume IllegalCatch
+      throw new RuntimeException("UnsafeArrayReads: Failed to " +
+          "get unsafe", e);
+    }
+  }
+  /** Offset of a byte array */
+  private static final long BYTE_ARRAY_OFFSET  =
+      UNSAFE.arrayBaseOffset(byte[].class);
+
+  /** Byte buffer */
+  private final byte[] buf;
+
+  /**
+   * Constructor
+   *
+   * @param buf Buffer to read from
+   */
+  public UnsafeArrayReads(byte[] buf) {
+    super(buf.length);
+    this.buf = buf;
+  }
+
+  /**
+   * Constructor.
+   *
+   * @param buf Buffer to read from
+   * @param offset Offsetin the buffer to start reading from
+   * @param length Max length of the buffer to read
+   */
+  public UnsafeArrayReads(byte[] buf, int offset, int length) {
+    super(offset, length);
+    this.buf = buf;
+  }
+
+  @Override
+  public int available() {
+    return (int) (bufLength - pos);
+  }
+
+
+  @Override
+  public int getPos() {
+    return (int) pos;
+  }
+
+  @Override
+  public void readFully(byte[] b) throws IOException {
+    ensureRemaining(b.length);
+    System.arraycopy(buf, (int) pos, b, 0, b.length);
+    pos += b.length;
+  }
+
+  @Override
+  public void readFully(byte[] b, int off, int len) throws IOException {
+    ensureRemaining(len);
+    System.arraycopy(buf, (int) pos, b, off, len);
+    pos += len;
+  }
+
+  @Override
+  public boolean readBoolean() throws IOException {
+    ensureRemaining(SIZE_OF_BOOLEAN);
+    boolean value = UNSAFE.getBoolean(buf,
+        BYTE_ARRAY_OFFSET + pos);
+    pos += SIZE_OF_BOOLEAN;
+    return value;
+  }
+
+  @Override
+  public byte readByte() throws IOException {
+    ensureRemaining(SIZE_OF_BYTE);
+    byte value = UNSAFE.getByte(buf,
+        BYTE_ARRAY_OFFSET + pos);
+    pos += SIZE_OF_BYTE;
+    return value;
+  }
+
+  @Override
+  public int readUnsignedByte() throws IOException {
+    return (short) (readByte() & 0xFF);
+  }
+
+  @Override
+  public short readShort() throws IOException {
+    ensureRemaining(SIZE_OF_SHORT);
+    short value = UNSAFE.getShort(buf,
+        BYTE_ARRAY_OFFSET + pos);
+    pos += SIZE_OF_SHORT;
+    return value;
+  }
+
+  @Override
+  public int readUnsignedShort() throws IOException {
+    return readShort() & 0xFFFF;
+  }
+
+  @Override
+  public char readChar() throws IOException {
+    ensureRemaining(SIZE_OF_CHAR);
+    char value = UNSAFE.getChar(buf,
+        BYTE_ARRAY_OFFSET + pos);
+    pos += SIZE_OF_CHAR;
+    return value;
+  }
+
+  @Override
+  public int readInt() throws IOException {
+    ensureRemaining(SIZE_OF_INT);
+    int value = UNSAFE.getInt(buf,
+        BYTE_ARRAY_OFFSET + pos);
+    pos += SIZE_OF_INT;
+    return value;
+  }
+
+  @Override
+  public long readLong() throws IOException {
+    ensureRemaining(SIZE_OF_LONG);
+    long value = UNSAFE.getLong(buf,
+        BYTE_ARRAY_OFFSET + pos);
+    pos += SIZE_OF_LONG;
+    return value;
+  }
+
+  @Override
+  public float readFloat() throws IOException {
+    ensureRemaining(SIZE_OF_FLOAT);
+    float value = UNSAFE.getFloat(buf,
+        BYTE_ARRAY_OFFSET + pos);
+    pos += SIZE_OF_FLOAT;
+    return value;
+  }
+
+  @Override
+  public double readDouble() throws IOException {
+    ensureRemaining(SIZE_OF_DOUBLE);
+    double value = UNSAFE.getDouble(buf,
+        BYTE_ARRAY_OFFSET + pos);
+    pos += SIZE_OF_DOUBLE;
+    return value;
+  }
+
+  /**
+   * Get an int at an arbitrary position in a byte[]
+   *
+   * @param buf Buffer to get the int from
+   * @param pos Position in the buffer to get the int from
+   * @return Int at the buffer position
+   */
+  public static int getInt(byte[] buf, int pos) {
+    return UNSAFE.getInt(buf,
+        BYTE_ARRAY_OFFSET + pos);
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/535a333b/giraph-core/src/main/java/org/apache/giraph/utils/UnsafeByteArrayInputStream.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/utils/UnsafeByteArrayInputStream.java b/giraph-core/src/main/java/org/apache/giraph/utils/UnsafeByteArrayInputStream.java
index 20ed92b..c8a8cac 100644
--- a/giraph-core/src/main/java/org/apache/giraph/utils/UnsafeByteArrayInputStream.java
+++ b/giraph-core/src/main/java/org/apache/giraph/utils/UnsafeByteArrayInputStream.java
@@ -15,52 +15,13 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.giraph.utils;
 
-import java.io.IOException;
-import java.io.UTFDataFormatException;
-import java.lang.reflect.Field;
-import org.apache.log4j.Logger;
+package org.apache.giraph.utils;
 
 /**
- * Byte array output stream that uses Unsafe methods to serialize/deserialize
- * much faster
+ * UnsafeByteArrayInputStream
  */
-public class UnsafeByteArrayInputStream implements ExtendedDataInput {
-  /** Class logger */
-  private static final Logger LOG = Logger.getLogger(
-      UnsafeByteArrayInputStream.class);
-  /** Access to the unsafe class */
-  private static final sun.misc.Unsafe UNSAFE;
-  static {
-    try {
-      Field field = sun.misc.Unsafe.class.getDeclaredField("theUnsafe");
-      field.setAccessible(true);
-      UNSAFE = (sun.misc.Unsafe) field.get(null);
-      // Checkstyle exception due to needing to check if unsafe is allowed
-      // CHECKSTYLE: stop IllegalCatch
-    } catch (Exception e) {
-      // CHECKSTYLE: resume IllegalCatch
-      throw new RuntimeException("UnsafeByteArrayOutputStream: Failed to " +
-          "get unsafe", e);
-    }
-  }
-  /** Offset of a byte array */
-  private static final long BYTE_ARRAY_OFFSET  =
-      UNSAFE.arrayBaseOffset(byte[].class);
-  /** Offset of a long array */
-  private static final long LONG_ARRAY_OFFSET =
-      UNSAFE.arrayBaseOffset(long[].class);
-  /** Offset of a double array */
-  private static final long DOUBLE_ARRAY_OFFSET =
-      UNSAFE.arrayBaseOffset(double[].class);
-
-  /** Byte buffer */
-  private final byte[] buf;
-  /** Buffer length */
-  private final int bufLength;
-  /** Position in the buffer */
-  private int pos = 0;
+public class UnsafeByteArrayInputStream extends UnsafeArrayReads {
 
   /**
    * Constructor
@@ -68,8 +29,7 @@ public class UnsafeByteArrayInputStream implements ExtendedDataInput {
    * @param buf Buffer to read from
    */
   public UnsafeByteArrayInputStream(byte[] buf) {
-    this.buf = buf;
-    this.bufLength = buf.length;
+    super(buf);
   }
 
   /**
@@ -80,275 +40,6 @@ public class UnsafeByteArrayInputStream implements ExtendedDataInput {
    * @param length Max length of the buffer to read
    */
   public UnsafeByteArrayInputStream(byte[] buf, int offset, int length) {
-    this.buf = buf;
-    this.pos = offset;
-    this.bufLength = length;
-  }
-
-  /**
-   * How many bytes are still available?
-   *
-   * @return Number of bytes available
-   */
-  public int available() {
-    return bufLength - pos;
-  }
-
-  /**
-   * What position in the stream?
-   *
-   * @return Position
-   */
-  public int getPos() {
-    return pos;
-  }
-
-  /**
-   * Check whether there are enough remaining bytes for an operation
-   *
-   * @param requiredBytes Bytes required to read
-   * @throws IOException When there are not enough bytes to read
-   */
-  private void ensureRemaining(int requiredBytes) throws IOException {
-    if (bufLength - pos < requiredBytes) {
-      throw new IOException("ensureRemaining: Only " + (bufLength - pos) +
-          " bytes remaining, trying to read " + requiredBytes);
-    }
-  }
-
-  @Override
-  public void readFully(byte[] b) throws IOException {
-    ensureRemaining(b.length);
-    System.arraycopy(buf, pos, b, 0, b.length);
-    pos += b.length;
-  }
-
-  @Override
-  public void readFully(byte[] b, int off, int len) throws IOException {
-    ensureRemaining(len);
-    System.arraycopy(buf, pos, b, off, len);
-    pos += len;
-  }
-
-  @Override
-  public int skipBytes(int n) throws IOException {
-    ensureRemaining(n);
-    pos += n;
-    return n;
-  }
-
-  @Override
-  public boolean readBoolean() throws IOException {
-    ensureRemaining(UnsafeByteArrayOutputStream.SIZE_OF_BOOLEAN);
-    boolean value = UNSAFE.getBoolean(buf,
-        BYTE_ARRAY_OFFSET + pos);
-    pos += UnsafeByteArrayOutputStream.SIZE_OF_BOOLEAN;
-    return value;
-  }
-
-  @Override
-  public byte readByte() throws IOException {
-    ensureRemaining(UnsafeByteArrayOutputStream.SIZE_OF_BYTE);
-    byte value = UNSAFE.getByte(buf,
-        BYTE_ARRAY_OFFSET + pos);
-    pos += UnsafeByteArrayOutputStream.SIZE_OF_BYTE;
-    return value;
-  }
-
-  @Override
-  public int readUnsignedByte() throws IOException {
-    return (short) (readByte() & 0xFF);
-  }
-
-  @Override
-  public short readShort() throws IOException {
-    ensureRemaining(UnsafeByteArrayOutputStream.SIZE_OF_SHORT);
-    short value = UNSAFE.getShort(buf,
-        BYTE_ARRAY_OFFSET + pos);
-    pos += UnsafeByteArrayOutputStream.SIZE_OF_SHORT;
-    return value;
-  }
-
-  @Override
-  public int readUnsignedShort() throws IOException {
-    return readShort() & 0xFFFF;
-  }
-
-  @Override
-  public char readChar() throws IOException {
-    ensureRemaining(UnsafeByteArrayOutputStream.SIZE_OF_CHAR);
-    char value = UNSAFE.getChar(buf,
-        BYTE_ARRAY_OFFSET + pos);
-    pos += UnsafeByteArrayOutputStream.SIZE_OF_CHAR;
-    return value;
-  }
-
-  @Override
-  public int readInt() throws IOException {
-    ensureRemaining(UnsafeByteArrayOutputStream.SIZE_OF_INT);
-    int value = UNSAFE.getInt(buf,
-        BYTE_ARRAY_OFFSET + pos);
-    pos += UnsafeByteArrayOutputStream.SIZE_OF_INT;
-    return value;
-  }
-
-  @Override
-  public long readLong() throws IOException {
-    ensureRemaining(UnsafeByteArrayOutputStream.SIZE_OF_LONG);
-    long value = UNSAFE.getLong(buf,
-        BYTE_ARRAY_OFFSET + pos);
-    pos += UnsafeByteArrayOutputStream.SIZE_OF_LONG;
-    return value;
-  }
-
-  @Override
-  public float readFloat() throws IOException {
-    ensureRemaining(UnsafeByteArrayOutputStream.SIZE_OF_FLOAT);
-    float value = UNSAFE.getFloat(buf,
-        BYTE_ARRAY_OFFSET + pos);
-    pos += UnsafeByteArrayOutputStream.SIZE_OF_FLOAT;
-    return value;
-  }
-
-  @Override
-  public double readDouble() throws IOException {
-    ensureRemaining(UnsafeByteArrayOutputStream.SIZE_OF_DOUBLE);
-    double value = UNSAFE.getDouble(buf,
-        BYTE_ARRAY_OFFSET + pos);
-    pos += UnsafeByteArrayOutputStream.SIZE_OF_DOUBLE;
-    return value;
-  }
-
-  @Override
-  public String readLine() throws IOException {
-    // Note that this code is mostly copied from DataInputStream
-    char[] tmpBuf = new char[128];
-
-    int room = tmpBuf.length;
-    int offset = 0;
-    int c;
-
-  loop:
-    while (true) {
-      c = readByte();
-      switch (c) {
-      case -1:
-      case '\n':
-        break loop;
-      case '\r':
-        int c2 = readByte();
-        if ((c2 != '\n') && (c2 != -1)) {
-          pos -= 1;
-        }
-        break loop;
-      default:
-        if (--room < 0) {
-          char[] replacebuf = new char[offset + 128];
-          room = replacebuf.length - offset - 1;
-          System.arraycopy(tmpBuf, 0, replacebuf, 0, offset);
-          tmpBuf = replacebuf;
-        }
-        tmpBuf[offset++] = (char) c;
-        break;
-      }
-    }
-    if ((c == -1) && (offset == 0)) {
-      return null;
-    }
-    return String.copyValueOf(tmpBuf, 0, offset);
-  }
-
-  @Override
-  public String readUTF() throws IOException {
-    // Note that this code is mostly copied from DataInputStream
-    int utflen = readUnsignedShort();
-
-    byte[] bytearr = new byte[utflen];
-    char[] chararr = new char[utflen];
-
-    int c;
-    int char2;
-    int char3;
-    int count = 0;
-    int chararrCount = 0;
-
-    readFully(bytearr, 0, utflen);
-
-    while (count < utflen) {
-      c = (int) bytearr[count] & 0xff;
-      if (c > 127) {
-        break;
-      }
-      count++;
-      chararr[chararrCount++] = (char) c;
-    }
-
-    while (count < utflen) {
-      c = (int) bytearr[count] & 0xff;
-      switch (c >> 4) {
-      case 0:
-      case 1:
-      case 2:
-      case 3:
-      case 4:
-      case 5:
-      case 6:
-      case 7:
-        /* 0xxxxxxx */
-        count++;
-        chararr[chararrCount++] = (char) c;
-        break;
-      case 12:
-      case 13:
-        /* 110x xxxx   10xx xxxx*/
-        count += 2;
-        if (count > utflen) {
-          throw new UTFDataFormatException(
-              "malformed input: partial character at end");
-        }
-        char2 = (int) bytearr[count - 1];
-        if ((char2 & 0xC0) != 0x80) {
-          throw new UTFDataFormatException(
-              "malformed input around byte " + count);
-        }
-        chararr[chararrCount++] = (char) (((c & 0x1F) << 6) |
-            (char2 & 0x3F));
-        break;
-      case 14:
-        /* 1110 xxxx  10xx xxxx  10xx xxxx */
-        count += 3;
-        if (count > utflen) {
-          throw new UTFDataFormatException(
-              "malformed input: partial character at end");
-        }
-        char2 = (int) bytearr[count - 2];
-        char3 = (int) bytearr[count - 1];
-        if (((char2 & 0xC0) != 0x80) || ((char3 & 0xC0) != 0x80)) {
-          throw new UTFDataFormatException(
-              "malformed input around byte " + (count - 1));
-        }
-        chararr[chararrCount++] = (char) (((c & 0x0F) << 12) |
-            ((char2 & 0x3F) << 6) | ((char3 & 0x3F) << 0));
-        break;
-      default:
-        /* 10xx xxxx,  1111 xxxx */
-        throw new UTFDataFormatException(
-            "malformed input around byte " + count);
-      }
-    }
-    // The number of chars produced may be less than utflen
-    return new String(chararr, 0, chararrCount);
-  }
-
-  /**
-   * Get an int at an arbitrary position in a byte[]
-   *
-   * @param buf Buffer to get the int from
-   * @param pos Position in the buffer to get the int from
-   * @return Int at the buffer position
-   */
-  public static int getInt(byte[] buf, int pos) {
-    return UNSAFE.getInt(buf,
-        BYTE_ARRAY_OFFSET + pos);
+    super(buf, offset, length);
   }
 }

http://git-wip-us.apache.org/repos/asf/giraph/blob/535a333b/giraph-core/src/main/java/org/apache/giraph/utils/UnsafeByteArrayOutputStream.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/utils/UnsafeByteArrayOutputStream.java b/giraph-core/src/main/java/org/apache/giraph/utils/UnsafeByteArrayOutputStream.java
index 4b413da..8736590 100644
--- a/giraph-core/src/main/java/org/apache/giraph/utils/UnsafeByteArrayOutputStream.java
+++ b/giraph-core/src/main/java/org/apache/giraph/utils/UnsafeByteArrayOutputStream.java
@@ -22,12 +22,21 @@ import java.io.OutputStream;
 import java.lang.reflect.Field;
 import java.util.Arrays;
 
+import static org.apache.giraph.utils.ByteUtils.SIZE_OF_BOOLEAN;
+import static org.apache.giraph.utils.ByteUtils.SIZE_OF_BYTE;
+import static org.apache.giraph.utils.ByteUtils.SIZE_OF_CHAR;
+import static org.apache.giraph.utils.ByteUtils.SIZE_OF_SHORT;
+import static org.apache.giraph.utils.ByteUtils.SIZE_OF_INT;
+import static org.apache.giraph.utils.ByteUtils.SIZE_OF_LONG;
+import static org.apache.giraph.utils.ByteUtils.SIZE_OF_FLOAT;
+import static org.apache.giraph.utils.ByteUtils.SIZE_OF_DOUBLE;
+
 /**
  * Byte array output stream that uses Unsafe methods to serialize/deserialize
  * much faster
  */
 public class UnsafeByteArrayOutputStream extends OutputStream
-    implements ExtendedDataOutput {
+  implements ExtendedDataOutput {
   static {
     try {
       Field field = sun.misc.Unsafe.class.getDeclaredField("theUnsafe");
@@ -42,24 +51,6 @@ public class UnsafeByteArrayOutputStream extends OutputStream
     }
   }
 
-  /** Bytes used in a boolean */
-  public static final int SIZE_OF_BOOLEAN = 1;
-  /** Bytes used in a byte */
-  public static final int SIZE_OF_BYTE = 1;
-  /** Bytes used in a char */
-  public static final int SIZE_OF_CHAR = 2;
-  /** Bytes used in a short */
-  public static final int SIZE_OF_SHORT = 2;
-  /** Bytes used in a medium */
-  public static final int SIZE_OF_MEDIUM = 3;
-  /** Bytes used in an int */
-  public static final int SIZE_OF_INT = 4;
-  /** Bytes used in a float */
-  public static final int SIZE_OF_FLOAT = 4;
-  /** Bytes used in a long */
-  public static final int SIZE_OF_LONG = 8;
-  /** Bytes used in a double */
-  public static final int SIZE_OF_DOUBLE = 8;
   /** Default number of bytes */
   private static final int DEFAULT_BYTES = 32;
   /** Access to the unsafe class */
@@ -68,12 +59,6 @@ public class UnsafeByteArrayOutputStream extends OutputStream
   /** Offset of a byte array */
   private static final long BYTE_ARRAY_OFFSET  =
       UNSAFE.arrayBaseOffset(byte[].class);
-  /** Offset of a long array */
-  private static final long LONG_ARRAY_OFFSET =
-      UNSAFE.arrayBaseOffset(long[].class);
-  /** Offset of a double array */
-  private static final long DOUBLE_ARRAY_OFFSET =
-      UNSAFE.arrayBaseOffset(double[].class);
 
   /** Byte buffer */
   private byte[] buf;
@@ -142,7 +127,15 @@ public class UnsafeByteArrayOutputStream extends OutputStream
   @Override
   public byte[] toByteArray() {
     return Arrays.copyOf(buf, pos);
+  }
 
+  @Override
+  public byte[] toByteArray(int offset, int length) {
+    if (offset + length > pos) {
+      throw new IndexOutOfBoundsException(String.format("Offset: %d + " +
+          "Length: %d exceeds the size of buf : %d", offset, length, pos));
+    }
+    return Arrays.copyOfRange(buf, offset, length);
   }
 
   @Override
@@ -212,10 +205,15 @@ public class UnsafeByteArrayOutputStream extends OutputStream
   }
 
   @Override
-  public void skipBytes(int bytesToSkip) {
-    if ((pos + bytesToSkip) > buf.length) {
-      buf = Arrays.copyOf(buf, Math.max(buf.length << 1, pos + bytesToSkip));
+  public void ensureWritable(int minSize) {
+    if ((pos + minSize) > buf.length) {
+      buf = Arrays.copyOf(buf, Math.max(buf.length << 1, pos + minSize));
     }
+  }
+
+  @Override
+  public void skipBytes(int bytesToSkip) {
+    ensureWritable(bytesToSkip);
     pos += bytesToSkip;
   }
 

http://git-wip-us.apache.org/repos/asf/giraph/blob/535a333b/giraph-core/src/main/java/org/apache/giraph/utils/UnsafeReads.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/utils/UnsafeReads.java b/giraph-core/src/main/java/org/apache/giraph/utils/UnsafeReads.java
new file mode 100644
index 0000000..5f99846
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/utils/UnsafeReads.java
@@ -0,0 +1,209 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.utils;
+
+import java.io.IOException;
+import java.io.UTFDataFormatException;
+
+/**
+ * Byte array input stream that uses Unsafe methods to deserialize
+ * much faster
+ */
+public abstract class UnsafeReads implements ExtendedDataInput {
+
+  /** Buffer length */
+  protected final int bufLength;
+  /** Position in the buffer */
+  protected long pos = 0;
+
+  /**
+   * Constructor
+   *
+   * @param length buf length
+   */
+  public UnsafeReads(int length) {
+    bufLength = length;
+  }
+
+  /**
+   * Constructor with offset
+   *
+   * @param offset offset in memory
+   * @param length buf length
+   */
+  public UnsafeReads(long offset, int length) {
+    pos = offset;
+    bufLength = length;
+  }
+
+  /**
+   * How many bytes are still available?
+   *
+   * @return Number of bytes available
+   */
+  public abstract int available();
+
+  /**
+   * What position in the stream?
+   *
+   * @return Position
+   */
+  public abstract int getPos();
+
+  /**
+   * Check whether there are enough remaining bytes for an operation
+   *
+   * @param requiredBytes Bytes required to read
+   * @throws IOException When there are not enough bytes to read
+   */
+  protected void ensureRemaining(int requiredBytes) throws IOException {
+    if (available() < requiredBytes) {
+      throw new IOException("ensureRemaining: Only " + available() +
+          " bytes remaining, trying to read " + requiredBytes);
+    }
+  }
+
+  @Override
+  public int skipBytes(int n) throws IOException {
+    ensureRemaining(n);
+    pos += n;
+    return n;
+  }
+
+  @Override
+  public String readLine() throws IOException {
+    // Note that this code is mostly copied from DataInputStream
+    char[] tmpBuf = new char[128];
+
+    int room = tmpBuf.length;
+    int offset = 0;
+    int c;
+
+  loop:
+    while (true) {
+      c = readByte();
+      switch (c) {
+      case -1:
+      case '\n':
+        break loop;
+      case '\r':
+        int c2 = readByte();
+        if ((c2 != '\n') && (c2 != -1)) {
+          pos -= 1;
+        }
+        break loop;
+      default:
+        if (--room < 0) {
+          char[] replacebuf = new char[offset + 128];
+          room = replacebuf.length - offset - 1;
+          System.arraycopy(tmpBuf, 0, replacebuf, 0, offset);
+          tmpBuf = replacebuf;
+        }
+        tmpBuf[offset++] = (char) c;
+        break;
+      }
+    }
+    if ((c == -1) && (offset == 0)) {
+      return null;
+    }
+    return String.copyValueOf(tmpBuf, 0, offset);
+  }
+
+  @Override
+  public String readUTF() throws IOException {
+    // Note that this code is mostly copied from DataInputStream
+    int utflen = readUnsignedShort();
+
+    byte[] bytearr = new byte[utflen];
+    char[] chararr = new char[utflen];
+
+    int c;
+    int char2;
+    int char3;
+    int count = 0;
+    int chararrCount = 0;
+
+    readFully(bytearr, 0, utflen);
+
+    while (count < utflen) {
+      c = (int) bytearr[count] & 0xff;
+      if (c > 127) {
+        break;
+      }
+      count++;
+      chararr[chararrCount++] = (char) c;
+    }
+
+    while (count < utflen) {
+      c = (int) bytearr[count] & 0xff;
+      switch (c >> 4) {
+      case 0:
+      case 1:
+      case 2:
+      case 3:
+      case 4:
+      case 5:
+      case 6:
+      case 7:
+      /* 0xxxxxxx */
+        count++;
+        chararr[chararrCount++] = (char) c;
+        break;
+      case 12:
+      case 13:
+      /* 110x xxxx   10xx xxxx*/
+        count += 2;
+        if (count > utflen) {
+          throw new UTFDataFormatException(
+              "malformed input: partial character at end");
+        }
+        char2 = (int) bytearr[count - 1];
+        if ((char2 & 0xC0) != 0x80) {
+          throw new UTFDataFormatException(
+              "malformed input around byte " + count);
+        }
+        chararr[chararrCount++] = (char) (((c & 0x1F) << 6) |
+            (char2 & 0x3F));
+        break;
+      case 14:
+      /* 1110 xxxx  10xx xxxx  10xx xxxx */
+        count += 3;
+        if (count > utflen) {
+          throw new UTFDataFormatException(
+              "malformed input: partial character at end");
+        }
+        char2 = (int) bytearr[count - 2];
+        char3 = (int) bytearr[count - 1];
+        if (((char2 & 0xC0) != 0x80) || ((char3 & 0xC0) != 0x80)) {
+          throw new UTFDataFormatException(
+              "malformed input around byte " + (count - 1));
+        }
+        chararr[chararrCount++] = (char) (((c & 0x0F) << 12) |
+            ((char2 & 0x3F) << 6) | ((char3 & 0x3F) << 0));
+        break;
+      default:
+      /* 10xx xxxx,  1111 xxxx */
+        throw new UTFDataFormatException(
+            "malformed input around byte " + count);
+      }
+    }
+    // The number of chars produced may be less than utflen
+    return new String(chararr, 0, chararrCount);
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/535a333b/giraph-core/src/main/java/org/apache/giraph/utils/VerboseByteArrayMessageWrite.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/utils/VerboseByteArrayMessageWrite.java b/giraph-core/src/main/java/org/apache/giraph/utils/VerboseByteArrayMessageWrite.java
deleted file mode 100644
index 8673732..0000000
--- a/giraph-core/src/main/java/org/apache/giraph/utils/VerboseByteArrayMessageWrite.java
+++ /dev/null
@@ -1,60 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.giraph.utils;
-
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.WritableComparable;
-
-import java.io.DataOutput;
-import java.io.IOException;
-
-/** Verbose Error mesage for ByteArray based messages */
-public class VerboseByteArrayMessageWrite {
-  /** Do not construct */
-  protected VerboseByteArrayMessageWrite() {
-  }
-
-  /**
-   * verboseWriteCurrentMessage
-   * de-serialize, then write messages
-   *
-   * @param iterator iterator
-   * @param out DataOutput
-   * @param <I> vertexId
-   * @param <M> message
-   * @throws IOException
-   * @throws RuntimeException
-   */
-  public static <I extends WritableComparable, M extends Writable> void
-  verboseWriteCurrentMessage(
-    ByteArrayVertexIdMessages<I, M>.VertexIdMessageIterator
-    iterator, DataOutput out) throws IOException {
-    try {
-      iterator.getCurrentMessage().write(out);
-    } catch (NegativeArraySizeException e) {
-      throw new RuntimeException("The numbers of bytes sent to vertex " +
-          iterator.getCurrentVertexId() + " exceeded the max capacity of " +
-          "its ExtendedDataOutput. Please consider setting " +
-          "giraph.useBigDataIOForMessages=true. If there are super-vertices" +
-          " in the graph which receive a lot of messages (total serialized " +
-          "size of messages goes beyond the maximum size of a byte array), " +
-          "setting this option to true will remove that limit");
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/giraph/blob/535a333b/giraph-core/src/main/java/org/apache/giraph/utils/VerboseByteStructMessageWrite.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/utils/VerboseByteStructMessageWrite.java b/giraph-core/src/main/java/org/apache/giraph/utils/VerboseByteStructMessageWrite.java
new file mode 100644
index 0000000..aa25490
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/utils/VerboseByteStructMessageWrite.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.utils;
+
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+
+import java.io.DataOutput;
+import java.io.IOException;
+
+/** Verbose Error mesage for ByteArray based messages */
+public class VerboseByteStructMessageWrite {
+  /**
+   * Private Constructor
+   */
+  private VerboseByteStructMessageWrite() {
+  }
+
+  /**
+   * verboseWriteCurrentMessage
+   * de-serialize, then write messages
+   *
+   * @param iterator iterator
+   * @param out DataOutput
+   * @param <I> vertexId
+   * @param <M> message
+   * @throws IOException
+   * @throws RuntimeException
+   */
+  public static <I extends WritableComparable, M extends Writable> void
+  verboseWriteCurrentMessage(VertexIdMessageIterator<I, M> iterator,
+    DataOutput out) throws IOException {
+    try {
+      iterator.getCurrentMessage().write(out);
+    } catch (NegativeArraySizeException e) {
+      handleNegativeArraySize(iterator.getCurrentVertexId());
+    }
+  }
+
+  /**
+   * message to present on NegativeArraySizeException
+   *
+   * @param vertexId vertexId
+   * @param <I> vertexId type
+   */
+  public static <I extends WritableComparable> void handleNegativeArraySize(
+      I vertexId) {
+    throw new RuntimeException("The numbers of bytes sent to vertex " +
+        vertexId + " exceeded the max capacity of " +
+        "its ExtendedDataOutput. Please consider setting " +
+        "giraph.useBigDataIOForMessages=true. If there are super-vertices" +
+        " in the graph which receive a lot of messages (total serialized " +
+        "size of messages goes beyond the maximum size of a byte array), " +
+        "setting this option to true will remove that limit");
+  }
+}