You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@giraph.apache.org by pa...@apache.org on 2014/06/08 17:25:30 UTC

[1/3] GIRAPH-907: refactor giraph code to support multiple implementations of vertexId data (pavanka)

Repository: giraph
Updated Branches:
  refs/heads/trunk e92f2942f -> 535a333b7


http://git-wip-us.apache.org/repos/asf/giraph/blob/535a333b/giraph-core/src/main/java/org/apache/giraph/utils/VertexIdData.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/utils/VertexIdData.java b/giraph-core/src/main/java/org/apache/giraph/utils/VertexIdData.java
new file mode 100644
index 0000000..b8f8c8e
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/utils/VertexIdData.java
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.utils;
+
+import org.apache.giraph.conf.ImmutableClassesGiraphConfigurable;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+
+import java.io.IOException;
+
+/**
+ * Stores vertex ids and data associated with a vertex
+ *
+ * @param <I> vertexId type parameter
+ * @param <T> vertexData type parameter
+ */
+public interface VertexIdData<I extends WritableComparable, T>
+  extends ImmutableClassesGiraphConfigurable, Writable {
+  /**
+   * Create a new data object.
+   *
+   * @return Newly-created data object.
+   */
+  T createData();
+
+  /**
+   * Write a data object to an {@link ExtendedDataOutput}.
+   *
+   * @param out  {@link ExtendedDataOutput}
+   * @param data Data object to write
+   * @throws IOException
+   */
+  void writeData(ExtendedDataOutput out, T data) throws IOException;
+
+  /**
+   * Read a data object's fields from an {@link ExtendedDataInput}.
+   *
+   * @param in   {@link ExtendedDataInput}
+   * @param data Data object to fill in-place
+   * @throws IOException
+   */
+  void readData(ExtendedDataInput in, T data) throws IOException;
+
+  /**
+   * Initialize the inner state. Must be called before {@code add()} is
+   * called.
+   */
+  void initialize();
+
+  /**
+   * Initialize the inner state, with a known size. Must be called before
+   * {@code add()} is called.
+   *
+   * @param expectedSize Number of bytes to be expected
+   */
+  void initialize(int expectedSize);
+
+  /**
+   * Add a vertex id and data pair to the collection.
+   *
+   * @param vertexId Vertex id
+   * @param data Data
+   */
+  void add(I vertexId, T data);
+
+  /**
+   * Add a serialized vertex id and data.
+   *
+   * @param serializedId The byte array which holds the serialized id.
+   * @param idPos The end position of the serialized id in the byte array.
+   * @param data Data
+   */
+  void add(byte[] serializedId, int idPos, T data);
+
+  /**
+   * Get the number of bytes used.
+   *
+   * @return Bytes used
+   */
+  int getSize();
+
+  /**
+   * Get the size of this object in serialized form.
+   *
+   * @return The size (in bytes) of the serialized object
+   */
+  int getSerializedSize();
+
+  /**
+   * Check if the list is empty.
+   *
+   * @return Whether the list is empty
+   */
+  boolean isEmpty();
+
+  /**
+   * Clear the list.
+   */
+  void clear();
+
+  /**
+   * Get an iterator over the pairs.
+   *
+   * @return Iterator
+   */
+  VertexIdDataIterator<I, T> getVertexIdDataIterator();
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/535a333b/giraph-core/src/main/java/org/apache/giraph/utils/VertexIdDataIterator.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/utils/VertexIdDataIterator.java b/giraph-core/src/main/java/org/apache/giraph/utils/VertexIdDataIterator.java
new file mode 100644
index 0000000..6aea8ea
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/utils/VertexIdDataIterator.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.utils;
+
+import org.apache.hadoop.io.WritableComparable;
+
+/**
+ * Special iterator that reuses vertex ids and data objects so that the
+ * lifetime of the object is only until next() is called.
+ *
+ * Vertex id ownership can be released if desired through
+ * releaseCurrentVertexId().  This optimization allows us to cut down
+ * on the number of objects instantiated and garbage collected.
+ *
+ * @param <I> vertexId type parameter
+ * @param <T> vertexData type parameter
+ */
+public interface VertexIdDataIterator<I extends WritableComparable, T>
+    extends VertexIdIterator<I> {
+  /**
+   * Get the current data.
+   *
+   * @return Current data
+   */
+  T getCurrentData();
+
+  /**
+   * Release the current data object.
+   *
+   * @return Released data object
+   */
+  T releaseCurrentData();
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/535a333b/giraph-core/src/main/java/org/apache/giraph/utils/VertexIdEdgeIterator.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/utils/VertexIdEdgeIterator.java b/giraph-core/src/main/java/org/apache/giraph/utils/VertexIdEdgeIterator.java
new file mode 100644
index 0000000..b9c88ec
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/utils/VertexIdEdgeIterator.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.utils;
+
+import org.apache.giraph.edge.Edge;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+
+/**
+ * Special iterator that reuses vertex ids and edge objects so that the
+ * lifetime of the object is only until next() is called.
+ *
+ * @param <I> vertexId type parameter
+ * @param <E> edge type parameter
+ */
+public interface VertexIdEdgeIterator<I extends WritableComparable,
+    E extends Writable> extends VertexIdDataIterator<I, Edge<I, E>> {
+  /**
+   * Get the current edge.
+   *
+   * @return Current edge
+   */
+  Edge<I, E> getCurrentEdge();
+
+  /**
+   * Release the current edge.
+   *
+   * @return Released edge
+   */
+  Edge<I, E> releaseCurrentEdge();
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/535a333b/giraph-core/src/main/java/org/apache/giraph/utils/VertexIdEdges.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/utils/VertexIdEdges.java b/giraph-core/src/main/java/org/apache/giraph/utils/VertexIdEdges.java
new file mode 100644
index 0000000..8f3c03a
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/utils/VertexIdEdges.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.utils;
+
+import org.apache.giraph.edge.Edge;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+
+/**
+ * Stores vertex id and out-edges of a vertex
+ *
+ * @param <I> vertexId type parameter
+ * @param <E> edge type parameter
+ */
+public interface VertexIdEdges<I extends WritableComparable,
+    E extends Writable> extends VertexIdData<I, Edge<I, E>> {
+  /**
+   * Get an iterator over the pairs.
+   *
+   * @return Iterator
+   */
+  VertexIdEdgeIterator<I, E> getVertexIdEdgeIterator();
+}
+

http://git-wip-us.apache.org/repos/asf/giraph/blob/535a333b/giraph-core/src/main/java/org/apache/giraph/utils/VertexIdIterator.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/utils/VertexIdIterator.java b/giraph-core/src/main/java/org/apache/giraph/utils/VertexIdIterator.java
index bad11d6..baaa543 100644
--- a/giraph-core/src/main/java/org/apache/giraph/utils/VertexIdIterator.java
+++ b/giraph-core/src/main/java/org/apache/giraph/utils/VertexIdIterator.java
@@ -18,7 +18,6 @@
 
 package org.apache.giraph.utils;
 
-import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
 import org.apache.hadoop.io.WritableComparable;
 
 /**
@@ -27,38 +26,18 @@ import org.apache.hadoop.io.WritableComparable;
  *
  * @param <I> Vertex id
  */
-public abstract class VertexIdIterator<I extends WritableComparable> {
-  /** Reader of the serialized edges */
-  protected final ExtendedDataInput extendedDataInput;
-
-  /** Current vertex id */
-  protected I vertexId;
-
-  /**
-   * Constructor.
-   *
-   * @param extendedDataOutput Extended data output
-   * @param configuration Configuration
-   */
-  public VertexIdIterator(
-      ExtendedDataOutput extendedDataOutput,
-      ImmutableClassesGiraphConfiguration<I, ?, ?> configuration) {
-    extendedDataInput = configuration.createExtendedDataInput(
-        extendedDataOutput.getByteArray(), 0, extendedDataOutput.getPos());
-  }
-
+public interface VertexIdIterator<I extends WritableComparable> {
   /**
    * Returns true if the iteration has more elements.
    *
    * @return True if the iteration has more elements.
    */
-  public boolean hasNext() {
-    return extendedDataInput.available() > 0;
-  }
+  boolean hasNext();
+
   /**
    * Moves to the next element in the iteration.
    */
-  public abstract void next();
+  void next();
 
   /**
    * Get the current vertex id.  Ihis object's contents are only guaranteed
@@ -67,9 +46,8 @@ public abstract class VertexIdIterator<I extends WritableComparable> {
    *
    * @return Current vertex id
    */
-  public I getCurrentVertexId() {
-    return vertexId;
-  }
+  I getCurrentVertexId();
+
   /**
    * The backing store of the current vertex id is now released.
    * Further calls to getCurrentVertexId () without calling next()
@@ -77,9 +55,6 @@ public abstract class VertexIdIterator<I extends WritableComparable> {
    *
    * @return Current vertex id that was released
    */
-  public I releaseCurrentVertexId() {
-    I releasedVertexId = vertexId;
-    vertexId = null;
-    return releasedVertexId;
-  }
+  I releaseCurrentVertexId();
 }
+

http://git-wip-us.apache.org/repos/asf/giraph/blob/535a333b/giraph-core/src/main/java/org/apache/giraph/utils/VertexIdMessageBytesIterator.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/utils/VertexIdMessageBytesIterator.java b/giraph-core/src/main/java/org/apache/giraph/utils/VertexIdMessageBytesIterator.java
new file mode 100644
index 0000000..194bf69
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/utils/VertexIdMessageBytesIterator.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.utils;
+
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+
+import java.io.DataOutput;
+
+/**
+ * Special iterator that reuses vertex ids and messages bytes so that the
+ * lifetime of the object is only until next() is called.
+ *
+ * Vertex id ownership can be released if desired through
+ * releaseCurrentVertexId().  This optimization allows us to cut down
+ * on the number of objects instantiated and garbage collected.  Messages
+ * can only be copied to a DataOutput object.
+ *
+ * @param <I> vertexId type parameter
+ * @param <M> message type parameter
+ */
+public interface VertexIdMessageBytesIterator<I extends WritableComparable,
+    M extends Writable> extends VertexIdDataIterator<I, M> {
+
+  /**
+   * Write the current message bytes to a DataOutput object
+   *
+   * @param dataOutput Where the current message will be written to
+   */
+  void writeCurrentMessageBytes(DataOutput dataOutput);
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/535a333b/giraph-core/src/main/java/org/apache/giraph/utils/VertexIdMessageIterator.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/utils/VertexIdMessageIterator.java b/giraph-core/src/main/java/org/apache/giraph/utils/VertexIdMessageIterator.java
new file mode 100644
index 0000000..c241cea
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/utils/VertexIdMessageIterator.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.utils;
+
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+
+/**
+ * Special iterator that reuses vertex ids and message objects so that the
+ * lifetime of the object is only until next() is called.
+ *
+ * @param <I> vertexId type parameter
+ * @param <M> message type parameter
+ */
+public interface VertexIdMessageIterator<I extends WritableComparable,
+    M extends Writable> extends VertexIdDataIterator<I, M> {
+  /**
+   * Get the current message.
+   *
+   * @return Current message
+   */
+  M getCurrentMessage();
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/535a333b/giraph-core/src/main/java/org/apache/giraph/utils/VertexIdMessages.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/utils/VertexIdMessages.java b/giraph-core/src/main/java/org/apache/giraph/utils/VertexIdMessages.java
new file mode 100644
index 0000000..99e5d71
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/utils/VertexIdMessages.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.utils;
+
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+
+/**
+ * Stores vertex ids and messages associated with a vertex
+ *
+ * @param <I> vertexId type parameter
+ * @param <M> message type parameter
+ */
+public interface VertexIdMessages<I extends WritableComparable,
+  M extends Writable> extends VertexIdData<I, M> {
+  /**
+   * Get specialized iterator that will instantiate the vertex id and
+   * message of this object.  It will only produce message bytes, not actual
+   * messages and expects a different encoding.
+   *
+   * @return Special iterator that reuses vertex ids (unless released) and
+   *         copies message bytes
+   */
+  VertexIdMessageBytesIterator<I, M> getVertexIdMessageBytesIterator();
+
+  /**
+   * Get specialized iterator that will instantiate the vertex id and
+   * message of this object.
+   *
+   * @return Special iterator that reuses vertex ids and messages unless
+   *         specified
+   */
+  VertexIdMessageIterator<I, M> getVertexIdMessageIterator();
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/535a333b/giraph-core/src/test/java/org/apache/giraph/comm/RequestFailureTest.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/test/java/org/apache/giraph/comm/RequestFailureTest.java b/giraph-core/src/test/java/org/apache/giraph/comm/RequestFailureTest.java
index 236bc88..157a543 100644
--- a/giraph-core/src/test/java/org/apache/giraph/comm/RequestFailureTest.java
+++ b/giraph-core/src/test/java/org/apache/giraph/comm/RequestFailureTest.java
@@ -27,6 +27,7 @@ import org.apache.giraph.conf.GiraphConfiguration;
 import org.apache.giraph.conf.GiraphConstants;
 import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
 import org.apache.giraph.factories.TestMessageValueFactory;
+import org.apache.giraph.utils.VertexIdMessages;
 import org.apache.giraph.utils.ByteArrayVertexIdMessages;
 import org.apache.giraph.utils.IntNoOpComputation;
 import org.apache.giraph.utils.MockUtils;
@@ -48,6 +49,7 @@ import static org.mockito.Mockito.when;
 /**
  * Test all the netty failure scenarios
  */
+@SuppressWarnings("unchecked")
 public class RequestFailureTest {
   /** Configuration */
   private ImmutableClassesGiraphConfiguration conf;
@@ -75,10 +77,10 @@ public class RequestFailureTest {
   private WritableRequest getRequest() {
     // Data to send
     final int partitionId = 0;
-    PairList<Integer, ByteArrayVertexIdMessages<IntWritable,
+    PairList<Integer, VertexIdMessages<IntWritable,
                 IntWritable>>
         dataToSend = new PairList<Integer,
-        ByteArrayVertexIdMessages<IntWritable, IntWritable>>();
+        VertexIdMessages<IntWritable, IntWritable>>();
     dataToSend.initialize();
     ByteArrayVertexIdMessages<IntWritable,
             IntWritable> vertexIdMessages =
@@ -97,6 +99,7 @@ public class RequestFailureTest {
     // Send the request
     SendWorkerMessagesRequest<IntWritable, IntWritable> request =
         new SendWorkerMessagesRequest<IntWritable, IntWritable>(dataToSend);
+    request.setConf(conf);
     return request;
   }
 

http://git-wip-us.apache.org/repos/asf/giraph/blob/535a333b/giraph-core/src/test/java/org/apache/giraph/comm/RequestTest.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/test/java/org/apache/giraph/comm/RequestTest.java b/giraph-core/src/test/java/org/apache/giraph/comm/RequestTest.java
index fcdfa5c..32454f4 100644
--- a/giraph-core/src/test/java/org/apache/giraph/comm/RequestTest.java
+++ b/giraph-core/src/test/java/org/apache/giraph/comm/RequestTest.java
@@ -36,6 +36,7 @@ import org.apache.giraph.graph.VertexMutations;
 import org.apache.giraph.metrics.GiraphMetrics;
 import org.apache.giraph.partition.Partition;
 import org.apache.giraph.partition.PartitionStore;
+import org.apache.giraph.utils.VertexIdMessages;
 import org.apache.giraph.utils.ByteArrayOneToAllMessages;
 import org.apache.giraph.utils.ByteArrayVertexIdMessages;
 import org.apache.giraph.utils.ExtendedDataOutput;
@@ -64,6 +65,7 @@ import static org.mockito.Mockito.when;
 /**
  * Test all the different netty requests.
  */
+@SuppressWarnings("unchecked")
 public class RequestTest {
   /** Configuration */
   private ImmutableClassesGiraphConfiguration conf;
@@ -141,16 +143,15 @@ public class RequestTest {
   @Test
   public void sendWorkerMessagesRequest() throws IOException {
     // Data to send
-    PairList<Integer, ByteArrayVertexIdMessages<IntWritable,
+    PairList<Integer, VertexIdMessages<IntWritable,
             IntWritable>>
-        dataToSend = new PairList<Integer,
-        ByteArrayVertexIdMessages<IntWritable, IntWritable>>();
+        dataToSend = new PairList<>();
     dataToSend.initialize();
     int partitionId = 0;
     ByteArrayVertexIdMessages<IntWritable,
             IntWritable> vertexIdMessages =
-        new ByteArrayVertexIdMessages<IntWritable, IntWritable>(
-            new TestMessageValueFactory<IntWritable>(IntWritable.class));
+        new ByteArrayVertexIdMessages<>(
+            new TestMessageValueFactory<>(IntWritable.class));
     vertexIdMessages.setConf(conf);
     vertexIdMessages.initialize();
     dataToSend.add(partitionId, vertexIdMessages);
@@ -163,7 +164,9 @@ public class RequestTest {
 
     // Send the request
     SendWorkerMessagesRequest<IntWritable, IntWritable> request =
-      new SendWorkerMessagesRequest<IntWritable, IntWritable>(dataToSend);
+      new SendWorkerMessagesRequest<>(dataToSend);
+    request.setConf(conf);
+
     client.sendWritableRequest(workerInfo.getTaskId(), request);
     client.waitAllRequests();
 
@@ -195,8 +198,8 @@ public class RequestTest {
   public void sendWorkerOneToAllMessagesRequest() throws IOException {
     // Data to send
     ByteArrayOneToAllMessages<IntWritable, IntWritable>
-        dataToSend = new ByteArrayOneToAllMessages<
-        IntWritable, IntWritable>(new TestMessageValueFactory<IntWritable>(IntWritable.class));
+        dataToSend = new ByteArrayOneToAllMessages<>(new
+        TestMessageValueFactory<>(IntWritable.class));
     dataToSend.setConf(conf);
     dataToSend.initialize();
     ExtendedDataOutput output = conf.createExtendedDataOutput();
@@ -208,7 +211,7 @@ public class RequestTest {
 
     // Send the request
     SendWorkerOneToAllMessagesRequest<IntWritable, IntWritable> request =
-      new SendWorkerOneToAllMessagesRequest<IntWritable, IntWritable>(dataToSend, conf);
+      new SendWorkerOneToAllMessagesRequest<>(dataToSend, conf);
     client.sendWritableRequest(workerInfo.getTaskId(), request);
     client.waitAllRequests();
 
@@ -304,4 +307,4 @@ public class RequestTest {
     }
     assertEquals(55, keySum);
   }
-}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/giraph/blob/535a333b/giraph-core/src/test/java/org/apache/giraph/utils/MockUtils.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/test/java/org/apache/giraph/utils/MockUtils.java b/giraph-core/src/test/java/org/apache/giraph/utils/MockUtils.java
index 97e88f8..5d8d478 100644
--- a/giraph-core/src/test/java/org/apache/giraph/utils/MockUtils.java
+++ b/giraph-core/src/test/java/org/apache/giraph/utils/MockUtils.java
@@ -30,10 +30,7 @@ import org.apache.giraph.graph.GraphState;
 import org.apache.giraph.graph.Vertex;
 import org.apache.giraph.partition.BasicPartitionOwner;
 import org.apache.giraph.partition.PartitionOwner;
-import org.apache.giraph.partition.PartitionStore;
 import org.apache.giraph.partition.SimplePartition;
-import org.apache.giraph.partition.SimplePartitionStore;
-import org.apache.giraph.graph.Vertex;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.IntWritable;
 import org.apache.hadoop.io.Writable;
@@ -196,7 +193,7 @@ public class MockUtils {
     ServerData<IntWritable, IntWritable, IntWritable> serverData =
       new ServerData<IntWritable, IntWritable, IntWritable>(
       serviceWorker, conf, ByteArrayMessagesPerVertexStore.newFactory(
-        serviceWorker, conf), context);
+          serviceWorker, conf), context);
     // Here we add a partition to simulate the case that there is one partition.
     serverData.getPartitionStore().addPartition(new SimplePartition());
     return serverData;


[3/3] git commit: updated refs/heads/trunk to 535a333

Posted by pa...@apache.org.
GIRAPH-907: refactor giraph code to support multiple implementations of vertexId data (pavanka)


Project: http://git-wip-us.apache.org/repos/asf/giraph/repo
Commit: http://git-wip-us.apache.org/repos/asf/giraph/commit/535a333b
Tree: http://git-wip-us.apache.org/repos/asf/giraph/tree/535a333b
Diff: http://git-wip-us.apache.org/repos/asf/giraph/diff/535a333b

Branch: refs/heads/trunk
Commit: 535a333b7776f7a229b76c7656e928d8a21d21bc
Parents: e92f294
Author: Pavan Kumar <pa...@fb.com>
Authored: Sun Jun 8 08:25:03 2014 -0700
Committer: Pavan Kumar <pa...@fb.com>
Committed: Sun Jun 8 08:25:03 2014 -0700

----------------------------------------------------------------------
 CHANGELOG                                       |   6 +-
 .../org/apache/giraph/comm/SendEdgeCache.java   |   9 +-
 .../apache/giraph/comm/SendMessageCache.java    |  17 +-
 .../giraph/comm/SendMessageToAllCache.java      |   4 +-
 .../giraph/comm/SendVertexIdDataCache.java      |  20 +-
 .../ByteArrayMessagesPerVertexStore.java        |  48 ++-
 .../giraph/comm/messages/MessageStore.java      |   4 +-
 .../giraph/comm/messages/MessagesIterable.java  |   4 +-
 .../comm/messages/OneMessagePerVertexStore.java |   9 +-
 .../out_of_core/DiskBackedMessageStore.java     |   7 +-
 .../primitives/IntByteArrayMessageStore.java    |  14 +-
 .../primitives/IntFloatMessageStore.java        |  10 +-
 .../primitives/LongByteArrayMessageStore.java   |  14 +-
 .../primitives/LongDoubleMessageStore.java      |  10 +-
 .../giraph/comm/netty/InboundByteCounter.java   |   1 -
 .../NettyWorkerClientRequestProcessor.java      |   8 +-
 .../comm/netty/handler/RequestDecoder.java      |  33 +-
 .../comm/netty/handler/RequestEncoder.java      |  59 ++--
 .../netty/handler/RequestServerHandler.java     |  27 +-
 .../comm/netty/handler/ResponseEncoder.java     |  27 +-
 .../comm/requests/SendWorkerDataRequest.java    |  21 +-
 .../comm/requests/SendWorkerEdgesRequest.java   |  11 +-
 .../requests/SendWorkerMessagesRequest.java     |  21 +-
 .../giraph/comm/requests/WritableRequest.java   |   1 +
 .../apache/giraph/conf/GiraphConfiguration.java |   2 +-
 .../org/apache/giraph/conf/GiraphConstants.java |   2 +-
 .../ImmutableClassesGiraphConfiguration.java    |  12 +
 .../apache/giraph/edge/AbstractEdgeStore.java   |   9 +-
 .../java/org/apache/giraph/edge/EdgeStore.java  |   4 +-
 .../org/apache/giraph/edge/SimpleEdgeStore.java |   4 +-
 .../giraph/edge/primitives/IntEdgeStore.java    |   5 +-
 .../giraph/edge/primitives/LongEdgeStore.java   |   5 +-
 .../giraph/utils/AbstractVertexIdData.java      | 122 +++++++
 .../apache/giraph/utils/ByteArrayIterable.java  |  76 -----
 .../apache/giraph/utils/ByteArrayIterator.java  |  72 -----
 .../giraph/utils/ByteArrayVertexIdData.java     | 200 +-----------
 .../giraph/utils/ByteArrayVertexIdEdges.java    |  37 +--
 .../giraph/utils/ByteArrayVertexIdMessages.java | 122 ++-----
 .../apache/giraph/utils/ByteStructIterable.java |  61 ++++
 .../apache/giraph/utils/ByteStructIterator.java |  72 +++++
 .../utils/ByteStructVertexIdDataIterator.java   |  83 +++++
 .../utils/ByteStructVertexIdEdgeIterator.java   |  56 ++++
 .../utils/ByteStructVertexIdIterator.java       |  71 +++++
 .../ByteStructVertexIdMessageBytesIterator.java |  80 +++++
 .../ByteStructVertexIdMessageIterator.java      |  49 +++
 .../java/org/apache/giraph/utils/ByteUtils.java |  47 +++
 .../utils/ExtendedByteArrayDataInput.java       |   2 +-
 .../utils/ExtendedByteArrayDataOutput.java      |  20 +-
 .../apache/giraph/utils/ExtendedDataOutput.java |  18 ++
 .../utils/RepresentativeByteArrayIterable.java  |  68 ----
 .../utils/RepresentativeByteArrayIterator.java  |  52 ---
 .../utils/RepresentativeByteStructIterable.java |  52 +++
 .../utils/RepresentativeByteStructIterator.java |  52 +++
 .../org/apache/giraph/utils/RequestUtils.java   |  55 ++++
 .../apache/giraph/utils/UnsafeArrayReads.java   | 200 ++++++++++++
 .../utils/UnsafeByteArrayInputStream.java       | 319 +------------------
 .../utils/UnsafeByteArrayOutputStream.java      |  54 ++--
 .../org/apache/giraph/utils/UnsafeReads.java    | 209 ++++++++++++
 .../utils/VerboseByteArrayMessageWrite.java     |  60 ----
 .../utils/VerboseByteStructMessageWrite.java    |  72 +++++
 .../org/apache/giraph/utils/VertexIdData.java   | 123 +++++++
 .../giraph/utils/VertexIdDataIterator.java      |  49 +++
 .../giraph/utils/VertexIdEdgeIterator.java      |  47 +++
 .../org/apache/giraph/utils/VertexIdEdges.java  |  40 +++
 .../apache/giraph/utils/VertexIdIterator.java   |  41 +--
 .../utils/VertexIdMessageBytesIterator.java     |  47 +++
 .../giraph/utils/VertexIdMessageIterator.java   |  39 +++
 .../apache/giraph/utils/VertexIdMessages.java   |  50 +++
 .../apache/giraph/comm/RequestFailureTest.java  |   7 +-
 .../org/apache/giraph/comm/RequestTest.java     |  23 +-
 .../java/org/apache/giraph/utils/MockUtils.java |   5 +-
 71 files changed, 2020 insertions(+), 1260 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/giraph/blob/535a333b/CHANGELOG
----------------------------------------------------------------------
diff --git a/CHANGELOG b/CHANGELOG
index 11e75bb..a0e94c1 100644
--- a/CHANGELOG
+++ b/CHANGELOG
@@ -1,9 +1,11 @@
 Giraph Change Log
 
 Release 1.1.0 - unreleased
-  GIRAPH-899: Remove hcatalog from hadoop_facebook profile
+  GIRAPH-907: refactor giraph code to support multiple implementations of vertexId data (pavanka)  
 
-  GIRAPH-873: Specialized edge stores 
+  GIRAPH-899: Remove hcatalog from hadoop_facebook profile (pavanka)
+
+  GIRAPH-873: Specialized edge stores (pavanka)
  
   GIRAPH-898: Remove giraph-accumulo from Facebook profile (edunov via majakabiljo)
 

http://git-wip-us.apache.org/repos/asf/giraph/blob/535a333b/giraph-core/src/main/java/org/apache/giraph/comm/SendEdgeCache.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/SendEdgeCache.java b/giraph-core/src/main/java/org/apache/giraph/comm/SendEdgeCache.java
index 8350a55..23d3b8c 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/SendEdgeCache.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/SendEdgeCache.java
@@ -23,6 +23,7 @@ import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
 import org.apache.giraph.edge.Edge;
 import org.apache.giraph.utils.ByteArrayVertexIdEdges;
 import org.apache.giraph.utils.PairList;
+import org.apache.giraph.utils.VertexIdEdges;
 import org.apache.giraph.worker.WorkerInfo;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
@@ -38,7 +39,7 @@ import static org.apache.giraph.conf.GiraphConstants.MAX_EDGE_REQUEST_SIZE;
  * @param <E> Edge value
  */
 public class SendEdgeCache<I extends WritableComparable, E extends Writable>
-    extends SendVertexIdDataCache<I, Edge<I, E>, ByteArrayVertexIdEdges<I, E>> {
+    extends SendVertexIdDataCache<I, Edge<I, E>, VertexIdEdges<I, E>> {
   /**
    * Constructor
    *
@@ -52,7 +53,7 @@ public class SendEdgeCache<I extends WritableComparable, E extends Writable>
   }
 
   @Override
-  public ByteArrayVertexIdEdges<I, E> createByteArrayVertexIdData() {
+  public VertexIdEdges<I, E> createVertexIdData() {
     return new ByteArrayVertexIdEdges<I, E>();
   }
 
@@ -78,7 +79,7 @@ public class SendEdgeCache<I extends WritableComparable, E extends Writable>
    * @return List of pairs (partitionId, ByteArrayVertexIdEdges),
    *         where all partition ids belong to workerInfo
    */
-  public PairList<Integer, ByteArrayVertexIdEdges<I, E>>
+  public PairList<Integer, VertexIdEdges<I, E>>
   removeWorkerEdges(WorkerInfo workerInfo) {
     return removeWorkerData(workerInfo);
   }
@@ -88,7 +89,7 @@ public class SendEdgeCache<I extends WritableComparable, E extends Writable>
    *
    * @return All vertex edges for all partitions
    */
-  public PairList<WorkerInfo, PairList<Integer, ByteArrayVertexIdEdges<I, E>>>
+  public PairList<WorkerInfo, PairList<Integer, VertexIdEdges<I, E>>>
   removeAllEdges() {
     return removeAllData();
   }

http://git-wip-us.apache.org/repos/asf/giraph/blob/535a333b/giraph-core/src/main/java/org/apache/giraph/comm/SendMessageCache.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/SendMessageCache.java b/giraph-core/src/main/java/org/apache/giraph/comm/SendMessageCache.java
index 24848db..b1fec01 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/SendMessageCache.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/SendMessageCache.java
@@ -28,6 +28,7 @@ import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
 import org.apache.giraph.edge.Edge;
 import org.apache.giraph.graph.Vertex;
 import org.apache.giraph.partition.PartitionOwner;
+import org.apache.giraph.utils.VertexIdMessages;
 import org.apache.giraph.utils.ByteArrayVertexIdMessages;
 import org.apache.giraph.utils.PairList;
 import org.apache.giraph.worker.WorkerInfo;
@@ -45,8 +46,9 @@ import static org.apache.giraph.conf.GiraphConstants.MAX_MSG_REQUEST_SIZE;
  * @param <I> Vertex id
  * @param <M> Message data
  */
+@SuppressWarnings("unchecked")
 public class SendMessageCache<I extends WritableComparable, M extends Writable>
-    extends SendVertexIdDataCache<I, M, ByteArrayVertexIdMessages<I, M>> {
+    extends SendVertexIdDataCache<I, M, VertexIdMessages<I, M>> {
   /** Class logger */
   private static final Logger LOG =
       Logger.getLogger(SendMessageCache.class);
@@ -58,7 +60,6 @@ public class SendMessageCache<I extends WritableComparable, M extends Writable>
   protected final int maxMessagesSizePerWorker;
   /** NettyWorkerClientRequestProcessor for message sending */
   protected final NettyWorkerClientRequestProcessor<I, ?, ?> clientProcessor;
-
   /**
    * Constructor
    *
@@ -78,7 +79,7 @@ public class SendMessageCache<I extends WritableComparable, M extends Writable>
   }
 
   @Override
-  public ByteArrayVertexIdMessages<I, M> createByteArrayVertexIdData() {
+  public VertexIdMessages<I, M> createVertexIdData() {
     return new ByteArrayVertexIdMessages<I, M>(
         getConf().getOutgoingMessageValueFactory());
   }
@@ -122,7 +123,7 @@ public class SendMessageCache<I extends WritableComparable, M extends Writable>
    * @return List of pairs (partitionId, ByteArrayVertexIdMessages),
    *         where all partition ids belong to workerInfo
    */
-  protected PairList<Integer, ByteArrayVertexIdMessages<I, M>>
+  protected PairList<Integer, VertexIdMessages<I, M>>
   removeWorkerMessages(WorkerInfo workerInfo) {
     return removeWorkerData(workerInfo);
   }
@@ -133,7 +134,7 @@ public class SendMessageCache<I extends WritableComparable, M extends Writable>
    * @return All vertex messages for all partitions
    */
   private PairList<WorkerInfo, PairList<
-      Integer, ByteArrayVertexIdMessages<I, M>>> removeAllMessages() {
+      Integer, VertexIdMessages<I, M>>> removeAllMessages() {
     return removeAllData();
   }
 
@@ -159,7 +160,7 @@ public class SendMessageCache<I extends WritableComparable, M extends Writable>
     // Send a request if the cache of outgoing message to
     // the remote worker 'workerInfo' is full enough to be flushed
     if (workerMessageSize >= maxMessagesSizePerWorker) {
-      PairList<Integer, ByteArrayVertexIdMessages<I, M>>
+      PairList<Integer, VertexIdMessages<I, M>>
         workerMessages = removeWorkerMessages(workerInfo);
       WritableRequest writableRequest =
         new SendWorkerMessagesRequest<I, M>(workerMessages);
@@ -233,10 +234,10 @@ public class SendMessageCache<I extends WritableComparable, M extends Writable>
    */
   public void flush() {
     PairList<WorkerInfo, PairList<Integer,
-        ByteArrayVertexIdMessages<I, M>>>
+        VertexIdMessages<I, M>>>
     remainingMessageCache = removeAllMessages();
     PairList<WorkerInfo, PairList<
-        Integer, ByteArrayVertexIdMessages<I, M>>>.Iterator
+        Integer, VertexIdMessages<I, M>>>.Iterator
     iterator = remainingMessageCache.getIterator();
     while (iterator.hasNext()) {
       iterator.next();

http://git-wip-us.apache.org/repos/asf/giraph/blob/535a333b/giraph-core/src/main/java/org/apache/giraph/comm/SendMessageToAllCache.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/SendMessageToAllCache.java b/giraph-core/src/main/java/org/apache/giraph/comm/SendMessageToAllCache.java
index 54234c5..60858ea 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/SendMessageToAllCache.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/SendMessageToAllCache.java
@@ -30,7 +30,7 @@ import org.apache.giraph.comm.requests.WritableRequest;
 import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
 import org.apache.giraph.partition.PartitionOwner;
 import org.apache.giraph.utils.ByteArrayOneToAllMessages;
-import org.apache.giraph.utils.ByteArrayVertexIdMessages;
+import org.apache.giraph.utils.VertexIdMessages;
 import org.apache.giraph.utils.ExtendedDataOutput;
 import org.apache.giraph.utils.PairList;
 import org.apache.giraph.worker.WorkerInfo;
@@ -249,7 +249,7 @@ public class SendMessageToAllCache<I extends WritableComparable,
         }
         ++totalMsgsSentInSuperstep;
         if (workerMessageSize >= maxMessagesSizePerWorker) {
-          PairList<Integer, ByteArrayVertexIdMessages<I, M>>
+          PairList<Integer, VertexIdMessages<I, M>>
             workerMessages = removeWorkerMessages(workerInfoList[i]);
           writableRequest =
             new SendWorkerMessagesRequest<I, M>(workerMessages);

http://git-wip-us.apache.org/repos/asf/giraph/blob/535a333b/giraph-core/src/main/java/org/apache/giraph/comm/SendVertexIdDataCache.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/SendVertexIdDataCache.java b/giraph-core/src/main/java/org/apache/giraph/comm/SendVertexIdDataCache.java
index afce3ba..19aa991 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/SendVertexIdDataCache.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/SendVertexIdDataCache.java
@@ -20,7 +20,7 @@ package org.apache.giraph.comm;
 
 import org.apache.giraph.bsp.CentralizedServiceWorker;
 import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
-import org.apache.giraph.utils.ByteArrayVertexIdData;
+import org.apache.giraph.utils.VertexIdData;
 import org.apache.giraph.worker.WorkerInfo;
 import org.apache.hadoop.io.WritableComparable;
 
@@ -32,12 +32,12 @@ import javax.annotation.concurrent.NotThreadSafe;
  *
  * @param <I> Vertex id
  * @param <T> Data
- * @param <B> Specialization of {@link ByteArrayVertexIdData} for T
+ * @param <B> Specialization of {@link VertexIdData} for T
  */
 @NotThreadSafe
 @SuppressWarnings("unchecked")
 public abstract class SendVertexIdDataCache<I extends WritableComparable, T,
-    B extends ByteArrayVertexIdData<I, T>> extends SendDataCache<B> {
+    B extends VertexIdData<I, T>> extends SendDataCache<B> {
   /**
    * Constructor.
    *
@@ -55,11 +55,11 @@ public abstract class SendVertexIdDataCache<I extends WritableComparable, T,
   }
 
   /**
-   * Create a new {@link ByteArrayVertexIdData} specialized for the use case.
+   * Create a new {@link VertexIdData} specialized for the use case.
    *
-   * @return A new instance of {@link ByteArrayVertexIdData}
+   * @return A new instance of {@link VertexIdData}
    */
-  public abstract B createByteArrayVertexIdData();
+  public abstract B createVertexIdData();
 
   /**
    * Add data to the cache.
@@ -73,7 +73,7 @@ public abstract class SendVertexIdDataCache<I extends WritableComparable, T,
   public int addData(WorkerInfo workerInfo,
                      int partitionId, I destVertexId, T data) {
     // Get the data collection
-    ByteArrayVertexIdData<I, T> partitionData =
+    VertexIdData<I, T> partitionData =
         getPartitionData(workerInfo, partitionId);
     int originalSize = partitionData.getSize();
     partitionData.add(destVertexId, data);
@@ -97,7 +97,7 @@ public abstract class SendVertexIdDataCache<I extends WritableComparable, T,
   public int addData(WorkerInfo workerInfo, int partitionId,
                      byte[] serializedId, int idPos, T data) {
     // Get the data collection
-    ByteArrayVertexIdData<I, T> partitionData =
+    VertexIdData<I, T> partitionData =
         getPartitionData(workerInfo, partitionId);
     int originalSize = partitionData.getSize();
     partitionData.add(serializedId, idPos, data);
@@ -114,12 +114,12 @@ public abstract class SendVertexIdDataCache<I extends WritableComparable, T,
    * @param partitionId The remote Partition this message belongs to
    * @return The partition data in data cache
    */
-  private ByteArrayVertexIdData<I, T> getPartitionData(WorkerInfo workerInfo,
+  private VertexIdData<I, T> getPartitionData(WorkerInfo workerInfo,
                                                        int partitionId) {
     // Get the data collection
     B partitionData = getData(partitionId);
     if (partitionData == null) {
-      partitionData = createByteArrayVertexIdData();
+      partitionData = createVertexIdData();
       partitionData.setConf(getConf());
       partitionData.initialize(getInitialBufferSize(workerInfo.getTaskId()));
       setData(partitionId, partitionData);

http://git-wip-us.apache.org/repos/asf/giraph/blob/535a333b/giraph-core/src/main/java/org/apache/giraph/comm/messages/ByteArrayMessagesPerVertexStore.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/messages/ByteArrayMessagesPerVertexStore.java b/giraph-core/src/main/java/org/apache/giraph/comm/messages/ByteArrayMessagesPerVertexStore.java
index e8b3b30..65939bb 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/messages/ByteArrayMessagesPerVertexStore.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/messages/ByteArrayMessagesPerVertexStore.java
@@ -26,11 +26,12 @@ import java.util.concurrent.ConcurrentMap;
 import org.apache.giraph.bsp.CentralizedServiceWorker;
 import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
 import org.apache.giraph.factories.MessageValueFactory;
-import org.apache.giraph.utils.ByteArrayVertexIdMessages;
-import org.apache.giraph.utils.ExtendedDataInput;
-import org.apache.giraph.utils.RepresentativeByteArrayIterator;
-import org.apache.giraph.utils.VerboseByteArrayMessageWrite;
 import org.apache.giraph.utils.VertexIdIterator;
+import org.apache.giraph.utils.VertexIdMessageBytesIterator;
+import org.apache.giraph.utils.VertexIdMessageIterator;
+import org.apache.giraph.utils.VertexIdMessages;
+import org.apache.giraph.utils.RepresentativeByteStructIterator;
+import org.apache.giraph.utils.VerboseByteStructMessageWrite;
 import org.apache.giraph.utils.io.DataInputOutput;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
@@ -39,7 +40,8 @@ import com.google.common.collect.Iterators;
 
 /**
  * Implementation of {@link SimpleMessageStore} where multiple messages are
- * stored per vertex as byte arrays.  Used when there is no combiner provided.
+ * stored per vertex as byte-backed data structures.
+ * Used when there is no combiner provided.
  *
  * @param <I> Vertex id
  * @param <M> Message data
@@ -88,10 +90,10 @@ public class ByteArrayMessagesPerVertexStore<I extends WritableComparable,
   @Override
   public void addPartitionMessages(
       int partitionId,
-      ByteArrayVertexIdMessages<I, M> messages) throws IOException {
+      VertexIdMessages<I, M> messages) throws IOException {
     ConcurrentMap<I, DataInputOutput> partitionMap =
         getOrCreatePartitionMap(partitionId);
-    ByteArrayVertexIdMessages<I, M>.VertexIdMessageBytesIterator
+    VertexIdMessageBytesIterator<I, M>
         vertexIdMessageBytesIterator =
         messages.getVertexIdMessageBytesIterator();
     // Try to copy the message buffer over rather than
@@ -111,7 +113,7 @@ public class ByteArrayMessagesPerVertexStore<I extends WritableComparable,
         }
       }
     } else {
-      ByteArrayVertexIdMessages<I, M>.VertexIdMessageIterator
+      VertexIdMessageIterator<I, M>
           vertexIdMessageIterator = messages.getVertexIdMessageIterator();
       while (vertexIdMessageIterator.hasNext()) {
         vertexIdMessageIterator.next();
@@ -119,7 +121,7 @@ public class ByteArrayMessagesPerVertexStore<I extends WritableComparable,
             getDataInputOutput(partitionMap, vertexIdMessageIterator);
 
         synchronized (dataInputOutput) {
-          VerboseByteArrayMessageWrite.verboseWriteCurrentMessage(
+          VerboseByteStructMessageWrite.verboseWriteCurrentMessage(
               vertexIdMessageIterator, dataInputOutput.getDataOutput());
         }
       }
@@ -132,33 +134,19 @@ public class ByteArrayMessagesPerVertexStore<I extends WritableComparable,
     return new MessagesIterable<M>(dataInputOutput, messageValueFactory);
   }
 
-  /**
-   * Special iterator only for counting messages
-   */
-  private class RepresentativeMessageIterator extends
-      RepresentativeByteArrayIterator<M> {
-    /**
-     * Constructor
-     *
-     * @param dataInput DataInput containing the messages
-     */
-    public RepresentativeMessageIterator(ExtendedDataInput dataInput) {
-      super(dataInput);
-    }
-
-    @Override
-    protected M createWritable() {
-      return messageValueFactory.newInstance();
-    }
-  }
-
   @Override
   protected int getNumberOfMessagesIn(
       ConcurrentMap<I, DataInputOutput> partitionMap) {
     int numberOfMessages = 0;
     for (DataInputOutput dataInputOutput : partitionMap.values()) {
       numberOfMessages += Iterators.size(
-          new RepresentativeMessageIterator(dataInputOutput.createDataInput()));
+          new RepresentativeByteStructIterator<M>(
+              dataInputOutput.createDataInput()) {
+            @Override
+            protected M createWritable() {
+              return messageValueFactory.newInstance();
+            }
+          });
     }
     return numberOfMessages;
   }

http://git-wip-us.apache.org/repos/asf/giraph/blob/535a333b/giraph-core/src/main/java/org/apache/giraph/comm/messages/MessageStore.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/messages/MessageStore.java b/giraph-core/src/main/java/org/apache/giraph/comm/messages/MessageStore.java
index 2af7642..7d0bbc6 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/messages/MessageStore.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/messages/MessageStore.java
@@ -21,7 +21,7 @@ package org.apache.giraph.comm.messages;
 import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.IOException;
-import org.apache.giraph.utils.ByteArrayVertexIdMessages;
+import org.apache.giraph.utils.VertexIdMessages;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
 
@@ -75,7 +75,7 @@ public interface MessageStore<I extends WritableComparable,
    * @throws IOException
    */
   void addPartitionMessages(
-      int partitionId, ByteArrayVertexIdMessages<I, M> messages)
+      int partitionId, VertexIdMessages<I, M> messages)
     throws IOException;
 
   /**

http://git-wip-us.apache.org/repos/asf/giraph/blob/535a333b/giraph-core/src/main/java/org/apache/giraph/comm/messages/MessagesIterable.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/messages/MessagesIterable.java b/giraph-core/src/main/java/org/apache/giraph/comm/messages/MessagesIterable.java
index 3b22ab3..abbac0b 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/messages/MessagesIterable.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/messages/MessagesIterable.java
@@ -21,7 +21,7 @@ package org.apache.giraph.comm.messages;
 import org.apache.giraph.factories.MessageValueFactory;
 import org.apache.giraph.utils.ExtendedDataInput;
 import org.apache.giraph.utils.Factory;
-import org.apache.giraph.utils.RepresentativeByteArrayIterable;
+import org.apache.giraph.utils.RepresentativeByteStructIterable;
 import org.apache.hadoop.io.Writable;
 
 /**
@@ -30,7 +30,7 @@ import org.apache.hadoop.io.Writable;
  * @param <M> Message data
  */
 public class MessagesIterable<M extends Writable>
-    extends RepresentativeByteArrayIterable<M> {
+    extends RepresentativeByteStructIterable<M> {
   /** Message class */
   private final MessageValueFactory<M> messageValueFactory;
 

http://git-wip-us.apache.org/repos/asf/giraph/blob/535a333b/giraph-core/src/main/java/org/apache/giraph/comm/messages/OneMessagePerVertexStore.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/messages/OneMessagePerVertexStore.java b/giraph-core/src/main/java/org/apache/giraph/comm/messages/OneMessagePerVertexStore.java
index bb581c0..9bede06 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/messages/OneMessagePerVertexStore.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/messages/OneMessagePerVertexStore.java
@@ -28,7 +28,8 @@ import org.apache.giraph.bsp.CentralizedServiceWorker;
 import org.apache.giraph.combiner.MessageCombiner;
 import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
 import org.apache.giraph.factories.MessageValueFactory;
-import org.apache.giraph.utils.ByteArrayVertexIdMessages;
+import org.apache.giraph.utils.VertexIdMessageIterator;
+import org.apache.giraph.utils.VertexIdMessages;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
 
@@ -64,11 +65,11 @@ public class OneMessagePerVertexStore<I extends WritableComparable,
   @Override
   public void addPartitionMessages(
       int partitionId,
-      ByteArrayVertexIdMessages<I, M> messages) throws IOException {
+      VertexIdMessages<I, M> messages) throws IOException {
     ConcurrentMap<I, M> partitionMap =
         getOrCreatePartitionMap(partitionId);
-    ByteArrayVertexIdMessages<I, M>.VertexIdMessageIterator
-        vertexIdMessageIterator = messages.getVertexIdMessageIterator();
+    VertexIdMessageIterator<I, M> vertexIdMessageIterator =
+      messages.getVertexIdMessageIterator();
     // This loop is a little complicated as it is optimized to only create
     // the minimal amount of vertex id and message objects as possible.
     while (vertexIdMessageIterator.hasNext()) {

http://git-wip-us.apache.org/repos/asf/giraph/blob/535a333b/giraph-core/src/main/java/org/apache/giraph/comm/messages/out_of_core/DiskBackedMessageStore.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/messages/out_of_core/DiskBackedMessageStore.java b/giraph-core/src/main/java/org/apache/giraph/comm/messages/out_of_core/DiskBackedMessageStore.java
index 1a76306..18b7798 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/messages/out_of_core/DiskBackedMessageStore.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/messages/out_of_core/DiskBackedMessageStore.java
@@ -24,8 +24,9 @@ import org.apache.giraph.bsp.CentralizedServiceWorker;
 import org.apache.giraph.comm.messages.MessageStore;
 import org.apache.giraph.comm.messages.MessageStoreFactory;
 import org.apache.giraph.factories.MessageValueFactory;
-import org.apache.giraph.utils.ByteArrayVertexIdMessages;
 import org.apache.giraph.utils.EmptyIterable;
+import org.apache.giraph.utils.VertexIdMessageIterator;
+import org.apache.giraph.utils.VertexIdMessages;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
 import java.io.DataInput;
@@ -84,10 +85,10 @@ public class DiskBackedMessageStore<I extends WritableComparable,
   @Override
   public void addPartitionMessages(
       int partitionId,
-      ByteArrayVertexIdMessages<I, M> messages) throws IOException {
+      VertexIdMessages<I, M> messages) throws IOException {
     PartitionDiskBackedMessageStore<I, M> partitionMessageStore =
         getMessageStore(partitionId);
-    ByteArrayVertexIdMessages<I, M>.VertexIdMessageIterator
+    VertexIdMessageIterator<I, M>
         vertexIdMessageIterator =
         messages.getVertexIdMessageIterator();
     while (vertexIdMessageIterator.hasNext()) {

http://git-wip-us.apache.org/repos/asf/giraph/blob/535a333b/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/IntByteArrayMessageStore.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/IntByteArrayMessageStore.java b/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/IntByteArrayMessageStore.java
index cc14c6d..dbc1ce8 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/IntByteArrayMessageStore.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/IntByteArrayMessageStore.java
@@ -24,9 +24,11 @@ import org.apache.giraph.comm.messages.MessagesIterable;
 import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
 import org.apache.giraph.factories.MessageValueFactory;
 import org.apache.giraph.partition.Partition;
-import org.apache.giraph.utils.ByteArrayVertexIdMessages;
+import org.apache.giraph.utils.VertexIdMessageBytesIterator;
+import org.apache.giraph.utils.VertexIdMessageIterator;
+import org.apache.giraph.utils.VertexIdMessages;
 import org.apache.giraph.utils.EmptyIterable;
-import org.apache.giraph.utils.VerboseByteArrayMessageWrite;
+import org.apache.giraph.utils.VerboseByteStructMessageWrite;
 import org.apache.giraph.utils.io.DataInputOutput;
 import org.apache.hadoop.io.IntWritable;
 import org.apache.hadoop.io.Writable;
@@ -123,12 +125,12 @@ public class IntByteArrayMessageStore<M extends Writable>
 
   @Override
   public void addPartitionMessages(int partitionId,
-      ByteArrayVertexIdMessages<IntWritable, M> messages) throws
+      VertexIdMessages<IntWritable, M> messages) throws
       IOException {
     Int2ObjectOpenHashMap<DataInputOutput> partitionMap =
         map.get(partitionId);
     synchronized (partitionMap) {
-      ByteArrayVertexIdMessages<IntWritable, M>.VertexIdMessageBytesIterator
+      VertexIdMessageBytesIterator<IntWritable, M>
           vertexIdMessageBytesIterator =
           messages.getVertexIdMessageBytesIterator();
       // Try to copy the message buffer over rather than
@@ -145,13 +147,13 @@ public class IntByteArrayMessageStore<M extends Writable>
               dataInputOutput.getDataOutput());
         }
       } else {
-        ByteArrayVertexIdMessages<IntWritable, M>.VertexIdMessageIterator
+        VertexIdMessageIterator<IntWritable, M>
             iterator = messages.getVertexIdMessageIterator();
         while (iterator.hasNext()) {
           iterator.next();
           DataInputOutput dataInputOutput =  getDataInputOutput(partitionMap,
               iterator.getCurrentVertexId().get());
-          VerboseByteArrayMessageWrite.verboseWriteCurrentMessage(iterator,
+          VerboseByteStructMessageWrite.verboseWriteCurrentMessage(iterator,
               dataInputOutput.getDataOutput());
         }
       }

http://git-wip-us.apache.org/repos/asf/giraph/blob/535a333b/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/IntFloatMessageStore.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/IntFloatMessageStore.java b/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/IntFloatMessageStore.java
index 3318610..be75ee8 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/IntFloatMessageStore.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/IntFloatMessageStore.java
@@ -22,9 +22,10 @@ import org.apache.giraph.bsp.CentralizedServiceWorker;
 import org.apache.giraph.combiner.MessageCombiner;
 import org.apache.giraph.comm.messages.MessageStore;
 import org.apache.giraph.partition.Partition;
-
 import org.apache.giraph.partition.PartitionStore;
-import org.apache.giraph.utils.ByteArrayVertexIdMessages;
+
+import org.apache.giraph.utils.VertexIdMessageIterator;
+import org.apache.giraph.utils.VertexIdMessages;
 import org.apache.giraph.utils.EmptyIterable;
 import org.apache.hadoop.io.FloatWritable;
 import org.apache.hadoop.io.IntWritable;
@@ -97,7 +98,7 @@ public class IntFloatMessageStore
 
   @Override
   public void addPartitionMessages(int partitionId,
-      ByteArrayVertexIdMessages<IntWritable, FloatWritable> messages) throws
+      VertexIdMessages<IntWritable, FloatWritable> messages) throws
       IOException {
     IntWritable reusableVertexId = new IntWritable();
     FloatWritable reusableMessage = new FloatWritable();
@@ -105,8 +106,7 @@ public class IntFloatMessageStore
 
     Int2FloatOpenHashMap partitionMap = map.get(partitionId);
     synchronized (partitionMap) {
-      ByteArrayVertexIdMessages<IntWritable,
-          FloatWritable>.VertexIdMessageIterator
+      VertexIdMessageIterator<IntWritable, FloatWritable>
           iterator = messages.getVertexIdMessageIterator();
       while (iterator.hasNext()) {
         iterator.next();

http://git-wip-us.apache.org/repos/asf/giraph/blob/535a333b/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/LongByteArrayMessageStore.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/LongByteArrayMessageStore.java b/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/LongByteArrayMessageStore.java
index 9e4325f..3110864 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/LongByteArrayMessageStore.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/LongByteArrayMessageStore.java
@@ -24,9 +24,11 @@ import org.apache.giraph.comm.messages.MessagesIterable;
 import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
 import org.apache.giraph.factories.MessageValueFactory;
 import org.apache.giraph.partition.Partition;
-import org.apache.giraph.utils.ByteArrayVertexIdMessages;
+import org.apache.giraph.utils.VertexIdMessageBytesIterator;
+import org.apache.giraph.utils.VertexIdMessageIterator;
+import org.apache.giraph.utils.VertexIdMessages;
+import org.apache.giraph.utils.VerboseByteStructMessageWrite;
 import org.apache.giraph.utils.EmptyIterable;
-import org.apache.giraph.utils.VerboseByteArrayMessageWrite;
 import org.apache.giraph.utils.io.DataInputOutput;
 import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.Writable;
@@ -124,12 +126,12 @@ public class LongByteArrayMessageStore<M extends Writable>
 
   @Override
   public void addPartitionMessages(int partitionId,
-      ByteArrayVertexIdMessages<LongWritable, M> messages) throws
+      VertexIdMessages<LongWritable, M> messages) throws
       IOException {
     Long2ObjectOpenHashMap<DataInputOutput> partitionMap =
         map.get(partitionId);
     synchronized (partitionMap) {
-      ByteArrayVertexIdMessages<LongWritable, M>.VertexIdMessageBytesIterator
+      VertexIdMessageBytesIterator<LongWritable, M>
           vertexIdMessageBytesIterator =
           messages.getVertexIdMessageBytesIterator();
       // Try to copy the message buffer over rather than
@@ -146,13 +148,13 @@ public class LongByteArrayMessageStore<M extends Writable>
               dataInputOutput.getDataOutput());
         }
       } else {
-        ByteArrayVertexIdMessages<LongWritable, M>.VertexIdMessageIterator
+        VertexIdMessageIterator<LongWritable, M>
             iterator = messages.getVertexIdMessageIterator();
         while (iterator.hasNext()) {
           iterator.next();
           DataInputOutput dataInputOutput =  getDataInputOutput(partitionMap,
               iterator.getCurrentVertexId().get());
-          VerboseByteArrayMessageWrite.verboseWriteCurrentMessage(iterator,
+          VerboseByteStructMessageWrite.verboseWriteCurrentMessage(iterator,
               dataInputOutput.getDataOutput());
         }
       }

http://git-wip-us.apache.org/repos/asf/giraph/blob/535a333b/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/LongDoubleMessageStore.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/LongDoubleMessageStore.java b/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/LongDoubleMessageStore.java
index 76d9ffa..264e65a 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/LongDoubleMessageStore.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/LongDoubleMessageStore.java
@@ -22,7 +22,8 @@ import org.apache.giraph.bsp.CentralizedServiceWorker;
 import org.apache.giraph.combiner.MessageCombiner;
 import org.apache.giraph.comm.messages.MessageStore;
 import org.apache.giraph.partition.Partition;
-import org.apache.giraph.utils.ByteArrayVertexIdMessages;
+import org.apache.giraph.utils.VertexIdMessageIterator;
+import org.apache.giraph.utils.VertexIdMessages;
 import org.apache.giraph.utils.EmptyIterable;
 import org.apache.hadoop.io.DoubleWritable;
 import org.apache.hadoop.io.LongWritable;
@@ -94,7 +95,7 @@ public class LongDoubleMessageStore
 
   @Override
   public void addPartitionMessages(int partitionId,
-      ByteArrayVertexIdMessages<LongWritable, DoubleWritable> messages) throws
+      VertexIdMessages<LongWritable, DoubleWritable> messages) throws
       IOException {
     LongWritable reusableVertexId = new LongWritable();
     DoubleWritable reusableMessage = new DoubleWritable();
@@ -102,9 +103,8 @@ public class LongDoubleMessageStore
 
     Long2DoubleOpenHashMap partitionMap = map.get(partitionId);
     synchronized (partitionMap) {
-      ByteArrayVertexIdMessages<LongWritable,
-          DoubleWritable>.VertexIdMessageIterator
-          iterator = messages.getVertexIdMessageIterator();
+      VertexIdMessageIterator<LongWritable, DoubleWritable> iterator =
+        messages.getVertexIdMessageIterator();
       while (iterator.hasNext()) {
         iterator.next();
         long vertexId = iterator.getCurrentVertexId().get();

http://git-wip-us.apache.org/repos/asf/giraph/blob/535a333b/giraph-core/src/main/java/org/apache/giraph/comm/netty/InboundByteCounter.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/netty/InboundByteCounter.java b/giraph-core/src/main/java/org/apache/giraph/comm/netty/InboundByteCounter.java
index bcc888d..44b9c5d 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/netty/InboundByteCounter.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/netty/InboundByteCounter.java
@@ -70,7 +70,6 @@ public class InboundByteCounter extends ChannelInboundHandlerAdapter implements
             "size = " + receivedBytes + ", total bytes = " +
             getBytesReceived());
       }
-
     }
     ctx.fireChannelRead(msg);
   }

http://git-wip-us.apache.org/repos/asf/giraph/blob/535a333b/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyWorkerClientRequestProcessor.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyWorkerClientRequestProcessor.java b/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyWorkerClientRequestProcessor.java
index 43c01ce..ef3f824 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyWorkerClientRequestProcessor.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyWorkerClientRequestProcessor.java
@@ -48,10 +48,10 @@ import org.apache.giraph.metrics.MetricNames;
 import org.apache.giraph.metrics.SuperstepMetricsRegistry;
 import org.apache.giraph.partition.Partition;
 import org.apache.giraph.partition.PartitionOwner;
-import org.apache.giraph.utils.ByteArrayVertexIdEdges;
 import org.apache.giraph.utils.ByteArrayVertexIdMessages;
 import org.apache.giraph.utils.ExtendedDataOutput;
 import org.apache.giraph.utils.PairList;
+import org.apache.giraph.utils.VertexIdEdges;
 import org.apache.giraph.worker.WorkerInfo;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
@@ -302,7 +302,7 @@ public class NettyWorkerClientRequestProcessor<I extends WritableComparable,
     // Send a request if the cache of outgoing edges to the remote worker is
     // full
     if (workerEdgesSize >= maxEdgesSizePerWorker) {
-      PairList<Integer, ByteArrayVertexIdEdges<I, E>> workerEdges =
+      PairList<Integer, VertexIdEdges<I, E>> workerEdges =
           sendEdgeCache.removeWorkerEdges(workerInfo);
       WritableRequest writableRequest =
           new SendWorkerEdgesRequest<I, E>(workerEdges);
@@ -414,10 +414,10 @@ public class NettyWorkerClientRequestProcessor<I extends WritableComparable,
 
     // Execute the remaining sends edges (if any)
     PairList<WorkerInfo, PairList<Integer,
-        ByteArrayVertexIdEdges<I, E>>>
+        VertexIdEdges<I, E>>>
         remainingEdgeCache = sendEdgeCache.removeAllEdges();
     PairList<WorkerInfo,
-        PairList<Integer, ByteArrayVertexIdEdges<I, E>>>.Iterator
+        PairList<Integer, VertexIdEdges<I, E>>>.Iterator
         edgeIterator = remainingEdgeCache.getIterator();
     while (edgeIterator.hasNext()) {
       edgeIterator.next();

http://git-wip-us.apache.org/repos/asf/giraph/blob/535a333b/giraph-core/src/main/java/org/apache/giraph/comm/netty/handler/RequestDecoder.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/netty/handler/RequestDecoder.java b/giraph-core/src/main/java/org/apache/giraph/comm/netty/handler/RequestDecoder.java
index 98a61e6..8659e95 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/netty/handler/RequestDecoder.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/netty/handler/RequestDecoder.java
@@ -22,14 +22,14 @@ import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
 import org.apache.giraph.comm.netty.InboundByteCounter;
 import org.apache.giraph.comm.requests.RequestType;
 import org.apache.giraph.comm.requests.WritableRequest;
-import org.apache.giraph.utils.ReflectionUtils;
 import org.apache.giraph.time.SystemTime;
 import org.apache.giraph.time.Time;
 import org.apache.giraph.time.Times;
+import org.apache.giraph.utils.ReflectionUtils;
+import org.apache.giraph.utils.RequestUtils;
 import org.apache.log4j.Logger;
 
 import io.netty.buffer.ByteBuf;
-import io.netty.buffer.ByteBufInputStream;
 import io.netty.channel.ChannelHandlerContext;
 import io.netty.channel.ChannelInboundHandlerAdapter;
 import io.netty.util.ReferenceCountUtil;
@@ -56,7 +56,7 @@ public class RequestDecoder extends ChannelInboundHandlerAdapter {
    * @param byteCounter Keeps track of the decoded bytes
    */
   public RequestDecoder(ImmutableClassesGiraphConfiguration conf,
-                        InboundByteCounter byteCounter) {
+    InboundByteCounter byteCounter) {
     this.conf = conf;
     this.byteCounter = byteCounter;
   }
@@ -80,26 +80,23 @@ public class RequestDecoder extends ChannelInboundHandlerAdapter {
     }
 
     // Decode the request
-    ByteBuf buffer = (ByteBuf) msg;
-    ByteBufInputStream inputStream = new ByteBufInputStream(buffer);
-    int enumValue = inputStream.readByte();
+    ByteBuf buf = (ByteBuf) msg;
+    int enumValue = buf.readByte();
     RequestType type = RequestType.values()[enumValue];
-    Class<? extends WritableRequest> writableRequestClass =
-        type.getRequestClass();
+    Class<? extends WritableRequest> requestClass = type.getRequestClass();
+    WritableRequest request =
+        ReflectionUtils.newInstance(requestClass, conf);
+    request = RequestUtils.decodeWritableRequest(buf, request);
 
-    WritableRequest writableRequest =
-        ReflectionUtils.newInstance(writableRequestClass, conf);
-    writableRequest.readFields(inputStream);
     if (LOG.isDebugEnabled()) {
-      LOG.debug("decode: Client " + writableRequest.getClientId() +
-          ", requestId " + writableRequest.getRequestId() +
-          ", " +  writableRequest.getType() + ", with size " +
-          buffer.array().length + " took " +
+      LOG.debug("decode: Client " + request.getClientId() +
+          ", requestId " + request.getRequestId() +
+          ", " +  request.getType() + ", with size " +
+          buf.writerIndex() + " took " +
           Times.getNanosSince(TIME, startDecodingNanoseconds) + " ns");
     }
-    // release bytebuf
-    ReferenceCountUtil.release(buffer);
+    ReferenceCountUtil.release(buf);
     // fire writableRequest object to upstream handlers
-    ctx.fireChannelRead(writableRequest);
+    ctx.fireChannelRead(request);
   }
 }

http://git-wip-us.apache.org/repos/asf/giraph/blob/535a333b/giraph-core/src/main/java/org/apache/giraph/comm/netty/handler/RequestEncoder.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/netty/handler/RequestEncoder.java b/giraph-core/src/main/java/org/apache/giraph/comm/netty/handler/RequestEncoder.java
index d379eda..13fa82f 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/netty/handler/RequestEncoder.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/netty/handler/RequestEncoder.java
@@ -18,6 +18,7 @@
 
 package org.apache.giraph.comm.netty.handler;
 
+import io.netty.buffer.ByteBufOutputStream;
 import org.apache.giraph.comm.requests.WritableRequest;
 import org.apache.giraph.conf.GiraphConfiguration;
 import org.apache.giraph.conf.GiraphConstants;
@@ -27,25 +28,23 @@ import org.apache.giraph.time.Times;
 import org.apache.log4j.Logger;
 
 import io.netty.buffer.ByteBuf;
-import io.netty.buffer.ByteBufOutputStream;
 import io.netty.channel.ChannelHandlerContext;
 import io.netty.channel.ChannelOutboundHandlerAdapter;
 import io.netty.channel.ChannelPromise;
 
+import static org.apache.giraph.utils.ByteUtils.SIZE_OF_BYTE;
+import static org.apache.giraph.utils.ByteUtils.SIZE_OF_INT;
+
 /**
  * Requests have a request type and an encoded request.
  */
 public class RequestEncoder extends ChannelOutboundHandlerAdapter {
-  /** Time class to use */
-  private static final Time TIME = SystemTime.get();
   /** Class logger */
   private static final Logger LOG = Logger.getLogger(RequestEncoder.class);
-  /** Holds the place of the message length until known */
-  private static final byte[] LENGTH_PLACEHOLDER = new byte[4];
+  /** Time class to use */
+  private static final Time TIME = SystemTime.get();
   /** Buffer starting size */
   private final int bufferStartingSize;
-  /** Whether or not to use direct byte buffers */
-  private final boolean useDirectBuffers;
   /** Start nanoseconds for the encoding time */
   private long startEncodingNanoseconds = -1;
 
@@ -57,14 +56,11 @@ public class RequestEncoder extends ChannelOutboundHandlerAdapter {
   public RequestEncoder(GiraphConfiguration conf) {
     bufferStartingSize =
         GiraphConstants.NETTY_REQUEST_ENCODER_BUFFER_SIZE.get(conf);
-    useDirectBuffers =
-        GiraphConstants.NETTY_REQUEST_ENCODER_USE_DIRECT_BUFFERS.get(conf);
   }
 
   @Override
   public void write(ChannelHandlerContext ctx, Object msg,
-    ChannelPromise promise)
-    throws Exception {
+    ChannelPromise promise) throws Exception {
     if (!(msg instanceof WritableRequest)) {
       throw new IllegalArgumentException(
           "encode: Got a message of type " + msg.getClass());
@@ -74,40 +70,41 @@ public class RequestEncoder extends ChannelOutboundHandlerAdapter {
     if (LOG.isDebugEnabled()) {
       startEncodingNanoseconds = TIME.getNanoseconds();
     }
+
     ByteBuf buf;
-    WritableRequest writableRequest = (WritableRequest) msg;
-    int requestSize = writableRequest.getSerializedSize();
+    WritableRequest request = (WritableRequest) msg;
+    int requestSize = request.getSerializedSize();
     if (requestSize == WritableRequest.UNKNOWN_SIZE) {
       buf = ctx.alloc().buffer(bufferStartingSize);
     } else {
-      requestSize += LENGTH_PLACEHOLDER.length + 1;
-      buf = useDirectBuffers ? ctx.alloc().directBuffer(requestSize) :
-          ctx.alloc().buffer(requestSize);
+      requestSize +=  SIZE_OF_INT + SIZE_OF_BYTE;
+      buf = ctx.alloc().buffer(requestSize);
     }
-    ByteBufOutputStream outputStream =
-        new ByteBufOutputStream(buf);
-    outputStream.write(LENGTH_PLACEHOLDER);
-    outputStream.writeByte(writableRequest.getType().ordinal());
+    ByteBufOutputStream output = new ByteBufOutputStream(buf);
+
+    // This will later be filled with the correct size of serialized request
+    output.writeInt(0);
+    output.writeByte(request.getType().ordinal());
     try {
-      writableRequest.write(outputStream);
+      request.write(output);
     } catch (IndexOutOfBoundsException e) {
-      LOG.error("encode: Most likely the size of request was not properly " +
-          "specified (this buffer is too small) - see getSerializedSize() in " +
-          writableRequest.getType().getRequestClass());
+      LOG.error("write: Most likely the size of request was not properly " +
+          "specified (this buffer is too small) - see getSerializedSize() " +
+          "in " + request.getType().getRequestClass());
       throw new IllegalStateException(e);
     }
-    outputStream.flush();
-    outputStream.close();
+    output.flush();
+    output.close();
 
     // Set the correct size at the end
-    buf.setInt(0, buf.writerIndex() - 4);
+    buf.setInt(0, buf.writerIndex() - SIZE_OF_INT);
     if (LOG.isDebugEnabled()) {
-      LOG.debug("encode: Client " + writableRequest.getClientId() + ", " +
-          "requestId " + writableRequest.getRequestId() +
+      LOG.debug("write: Client " + request.getClientId() + ", " +
+          "requestId " + request.getRequestId() +
           ", size = " + buf.readableBytes() + ", " +
-          writableRequest.getType() + " took " +
+          request.getType() + " took " +
           Times.getNanosSince(TIME, startEncodingNanoseconds) + " ns");
     }
-    ctx.writeAndFlush(buf, promise);
+    ctx.write(buf, promise);
   }
 }

http://git-wip-us.apache.org/repos/asf/giraph/blob/535a333b/giraph-core/src/main/java/org/apache/giraph/comm/netty/handler/RequestServerHandler.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/netty/handler/RequestServerHandler.java b/giraph-core/src/main/java/org/apache/giraph/comm/netty/handler/RequestServerHandler.java
index 601cd2f..b6d0533 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/netty/handler/RequestServerHandler.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/netty/handler/RequestServerHandler.java
@@ -80,13 +80,13 @@ public abstract class RequestServerHandler<R> extends
       LOG.trace("messageReceived: Got " + msg.getClass());
     }
 
-    WritableRequest writableRequest = (WritableRequest) msg;
+    WritableRequest request = (WritableRequest) msg;
 
     // Simulate a closed connection on the first request (if desired)
     if (closeFirstRequest && !ALREADY_CLOSED_FIRST_REQUEST) {
       LOG.info("messageReceived: Simulating closing channel on first " +
-          "request " + writableRequest.getRequestId() + " from " +
-          writableRequest.getClientId());
+          "request " + request.getRequestId() + " from " +
+          request.getClientId());
       setAlreadyClosedFirstRequest();
       ctx.close();
       return;
@@ -95,24 +95,24 @@ public abstract class RequestServerHandler<R> extends
     // Only execute this request exactly once
     int alreadyDone = 1;
     if (workerRequestReservedMap.reserveRequest(
-        writableRequest.getClientId(),
-        writableRequest.getRequestId())) {
+        request.getClientId(),
+        request.getRequestId())) {
       if (LOG.isDebugEnabled()) {
         startProcessingNanoseconds = TIME.getNanoseconds();
       }
-      processRequest((R) writableRequest);
+      processRequest((R) request);
       if (LOG.isDebugEnabled()) {
         LOG.debug("messageReceived: Processing client " +
-            writableRequest.getClientId() + ", " +
-            "requestId " + writableRequest.getRequestId() +
-            ", " +  writableRequest.getType() + " took " +
+            request.getClientId() + ", " +
+            "requestId " + request.getRequestId() +
+            ", " +  request.getType() + " took " +
             Times.getNanosSince(TIME, startProcessingNanoseconds) + " ns");
       }
       alreadyDone = 0;
     } else {
       LOG.info("messageReceived: Request id " +
-          writableRequest.getRequestId() + " from client " +
-          writableRequest.getClientId() +
+          request.getRequestId() + " from client " +
+          request.getClientId() +
           " was already processed, " +
           "not processing again.");
     }
@@ -120,9 +120,10 @@ public abstract class RequestServerHandler<R> extends
     // Send the response with the request id
     ByteBuf buffer = ctx.alloc().buffer(RESPONSE_BYTES);
     buffer.writeInt(myTaskInfo.getTaskId());
-    buffer.writeLong(writableRequest.getRequestId());
+    buffer.writeLong(request.getRequestId());
     buffer.writeByte(alreadyDone);
-    ctx.writeAndFlush(buffer);
+
+    ctx.write(buffer);
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/giraph/blob/535a333b/giraph-core/src/main/java/org/apache/giraph/comm/netty/handler/ResponseEncoder.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/netty/handler/ResponseEncoder.java b/giraph-core/src/main/java/org/apache/giraph/comm/netty/handler/ResponseEncoder.java
index c0b45fc..4664fbe 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/netty/handler/ResponseEncoder.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/netty/handler/ResponseEncoder.java
@@ -18,15 +18,17 @@
 
 package org.apache.giraph.comm.netty.handler;
 
+import io.netty.buffer.ByteBufOutputStream;
 import org.apache.giraph.comm.requests.RequestType;
 import org.apache.giraph.comm.requests.WritableRequest;
 import org.apache.log4j.Logger;
 import io.netty.buffer.ByteBuf;
-import io.netty.buffer.ByteBufOutputStream;
 import io.netty.channel.ChannelPromise;
 import io.netty.channel.ChannelHandlerContext;
 import io.netty.channel.ChannelOutboundHandlerAdapter;
 
+import static org.apache.giraph.utils.ByteUtils.SIZE_OF_INT;
+
 /**
  * How a server should respond to a client. Currently only used for
  * responding to client's SASL messages, and removed after client
@@ -35,8 +37,6 @@ import io.netty.channel.ChannelOutboundHandlerAdapter;
 public class ResponseEncoder extends ChannelOutboundHandlerAdapter {
   /** Class logger. */
   private static final Logger LOG = Logger.getLogger(ResponseEncoder.class);
-  /** Holds the place of the message length until known. */
-  private static final byte[] LENGTH_PLACEHOLDER = new byte[4];
 
   @Override
   public void write(ChannelHandlerContext ctx, Object msg,
@@ -52,28 +52,27 @@ public class ResponseEncoder extends ChannelOutboundHandlerAdapter {
               " WritableRequest.");
     }
     @SuppressWarnings("unchecked")
-    WritableRequest writableRequest =
-        (WritableRequest) msg;
+    WritableRequest writableRequest = (WritableRequest) msg;
+
     ByteBuf buf = ctx.alloc().buffer(10);
-    ByteBufOutputStream outputStream =
-        new ByteBufOutputStream(buf);
+    ByteBufOutputStream output = new ByteBufOutputStream(buf);
 
     if (LOG.isDebugEnabled()) {
       LOG.debug("encode: Encoding a message of type " + msg.getClass());
     }
-    outputStream.write(LENGTH_PLACEHOLDER);
 
+    // Space is reserved now to be filled later by the serialized request size
+    output.writeInt(0);
     // write type of object.
-    outputStream.writeByte(writableRequest.getType().ordinal());
-
+    output.writeByte(writableRequest.getType().ordinal());
     // write the object itself.
-    writableRequest.write(outputStream);
+    writableRequest.write(output);
 
-    outputStream.flush();
-    outputStream.close();
+    output.flush();
+    output.close();
 
     // Set the correct size at the end.
-    buf.setInt(0, buf.writerIndex() - 4);
+    buf.setInt(0, buf.writerIndex() - SIZE_OF_INT);
 
     if (LOG.isDebugEnabled()) {
       LOG.debug("encode: Encoding a message of type " + msg.getClass());

http://git-wip-us.apache.org/repos/asf/giraph/blob/535a333b/giraph-core/src/main/java/org/apache/giraph/comm/requests/SendWorkerDataRequest.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/requests/SendWorkerDataRequest.java b/giraph-core/src/main/java/org/apache/giraph/comm/requests/SendWorkerDataRequest.java
index 4f80224..ceb5050 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/requests/SendWorkerDataRequest.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/requests/SendWorkerDataRequest.java
@@ -18,7 +18,7 @@
 
 package org.apache.giraph.comm.requests;
 
-import org.apache.giraph.utils.ByteArrayVertexIdData;
+import org.apache.giraph.utils.VertexIdData;
 import org.apache.giraph.utils.PairList;
 import org.apache.hadoop.io.WritableComparable;
 import org.apache.log4j.Logger;
@@ -33,10 +33,12 @@ import java.io.IOException;
  *
  * @param <I> Vertex id
  * @param <T> Data
- * @param <B> Specialization of {@link ByteArrayVertexIdData} for T
+ * @param <B> Specialization of
+ * {@link org.apache.giraph.utils.VertexIdData} for T
  */
+@SuppressWarnings("unchecked")
 public abstract class SendWorkerDataRequest<I extends WritableComparable, T,
-    B extends ByteArrayVertexIdData<I, T>>
+    B extends VertexIdData<I, T>>
     extends WritableRequest implements WorkerRequest {
   /** Class logger */
   private static final Logger LOG =
@@ -56,8 +58,7 @@ public abstract class SendWorkerDataRequest<I extends WritableComparable, T,
   /**
    * Constructor used to send request.
    *
-   * @param partVertData Map of remote partitions =>
-   *                     ByteArrayVertexIdData
+   * @param partVertData Map of remote partitions => VertexIdData
    */
   public SendWorkerDataRequest(
       PairList<Integer, B> partVertData) {
@@ -65,11 +66,13 @@ public abstract class SendWorkerDataRequest<I extends WritableComparable, T,
   }
 
   /**
-   * Create a new {@link ByteArrayVertexIdData} specialized for the use case.
+   * Create a new {@link org.apache.giraph.utils.VertexIdData}
+   * specialized for the use case.
    *
-   * @return A new instance of {@link ByteArrayVertexIdData}
+   * @return A new instance of
+   * {@link org.apache.giraph.utils.VertexIdData}
    */
-  public abstract B createByteArrayVertexIdData();
+  public abstract B createVertexIdData();
 
   @Override
   public void readFieldsRequest(DataInput input) throws IOException {
@@ -78,7 +81,7 @@ public abstract class SendWorkerDataRequest<I extends WritableComparable, T,
     partitionVertexData.initialize(numPartitions);
     while (numPartitions-- > 0) {
       final int partitionId = input.readInt();
-      B vertexIdData = createByteArrayVertexIdData();
+      B vertexIdData = createVertexIdData();
       vertexIdData.setConf(getConf());
       vertexIdData.readFields(input);
       partitionVertexData.add(partitionId, vertexIdData);

http://git-wip-us.apache.org/repos/asf/giraph/blob/535a333b/giraph-core/src/main/java/org/apache/giraph/comm/requests/SendWorkerEdgesRequest.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/requests/SendWorkerEdgesRequest.java b/giraph-core/src/main/java/org/apache/giraph/comm/requests/SendWorkerEdgesRequest.java
index 793768a..510743f 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/requests/SendWorkerEdgesRequest.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/requests/SendWorkerEdgesRequest.java
@@ -22,6 +22,7 @@ import org.apache.giraph.comm.ServerData;
 import org.apache.giraph.edge.Edge;
 import org.apache.giraph.utils.ByteArrayVertexIdEdges;
 import org.apache.giraph.utils.PairList;
+import org.apache.giraph.utils.VertexIdEdges;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
 
@@ -35,7 +36,7 @@ import org.apache.hadoop.io.WritableComparable;
 public class SendWorkerEdgesRequest<I extends WritableComparable,
     E extends Writable>
     extends SendWorkerDataRequest<I, Edge<I, E>,
-    ByteArrayVertexIdEdges<I, E>> {
+    VertexIdEdges<I, E>> {
   /**
    * Constructor used for reflection only
    */
@@ -48,13 +49,13 @@ public class SendWorkerEdgesRequest<I extends WritableComparable,
    *                     ByteArrayVertexIdEdges
    */
   public SendWorkerEdgesRequest(
-      PairList<Integer, ByteArrayVertexIdEdges<I, E>> partVertEdges) {
+      PairList<Integer, VertexIdEdges<I, E>> partVertEdges) {
     this.partitionVertexData = partVertEdges;
   }
 
   @Override
-  public ByteArrayVertexIdEdges<I, E> createByteArrayVertexIdData() {
-    return new ByteArrayVertexIdEdges<I, E>();
+  public VertexIdEdges<I, E> createVertexIdData() {
+    return new ByteArrayVertexIdEdges<>();
   }
 
   @Override
@@ -64,7 +65,7 @@ public class SendWorkerEdgesRequest<I extends WritableComparable,
 
   @Override
   public void doRequest(ServerData serverData) {
-    PairList<Integer, ByteArrayVertexIdEdges<I, E>>.Iterator
+    PairList<Integer, VertexIdEdges<I, E>>.Iterator
         iterator = partitionVertexData.getIterator();
     while (iterator.hasNext()) {
       iterator.next();

http://git-wip-us.apache.org/repos/asf/giraph/blob/535a333b/giraph-core/src/main/java/org/apache/giraph/comm/requests/SendWorkerMessagesRequest.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/requests/SendWorkerMessagesRequest.java b/giraph-core/src/main/java/org/apache/giraph/comm/requests/SendWorkerMessagesRequest.java
index 3ac0962..d525164 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/requests/SendWorkerMessagesRequest.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/requests/SendWorkerMessagesRequest.java
@@ -19,6 +19,7 @@
 package org.apache.giraph.comm.requests;
 
 import org.apache.giraph.comm.ServerData;
+import org.apache.giraph.utils.VertexIdMessages;
 import org.apache.giraph.utils.ByteArrayVertexIdMessages;
 import org.apache.giraph.utils.PairList;
 import org.apache.hadoop.io.Writable;
@@ -34,26 +35,26 @@ import java.io.IOException;
  */
 @SuppressWarnings("unchecked")
 public class SendWorkerMessagesRequest<I extends WritableComparable,
-    M extends Writable>
-    extends SendWorkerDataRequest<I, M, ByteArrayVertexIdMessages<I, M>> {
-  /**
-   * Constructor used for reflection only
-   */
-  public SendWorkerMessagesRequest() { }
+    M extends Writable> extends SendWorkerDataRequest<I, M,
+    VertexIdMessages<I, M>> {
+
+  /** Default constructor */
+  public SendWorkerMessagesRequest() {
+  }
 
   /**
    * Constructor used to send request.
    *
    * @param partVertMsgs Map of remote partitions =>
-   *                     ByteArrayVertexIdMessages
+   *                     VertexIdMessages
    */
   public SendWorkerMessagesRequest(
-      PairList<Integer, ByteArrayVertexIdMessages<I, M>> partVertMsgs) {
+      PairList<Integer, VertexIdMessages<I, M>> partVertMsgs) {
     this.partitionVertexData = partVertMsgs;
   }
 
   @Override
-  public ByteArrayVertexIdMessages<I, M> createByteArrayVertexIdData() {
+  public VertexIdMessages<I, M> createVertexIdData() {
     return new ByteArrayVertexIdMessages<I, M>(
         getConf().getOutgoingMessageValueFactory());
   }
@@ -65,7 +66,7 @@ public class SendWorkerMessagesRequest<I extends WritableComparable,
 
   @Override
   public void doRequest(ServerData serverData) {
-    PairList<Integer, ByteArrayVertexIdMessages<I, M>>.Iterator
+    PairList<Integer, VertexIdMessages<I, M>>.Iterator
         iterator = partitionVertexData.getIterator();
     while (iterator.hasNext()) {
       iterator.next();

http://git-wip-us.apache.org/repos/asf/giraph/blob/535a333b/giraph-core/src/main/java/org/apache/giraph/comm/requests/WritableRequest.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/requests/WritableRequest.java b/giraph-core/src/main/java/org/apache/giraph/comm/requests/WritableRequest.java
index 181e681..14c8c0d 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/requests/WritableRequest.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/requests/WritableRequest.java
@@ -21,6 +21,7 @@ package org.apache.giraph.comm.requests;
 import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.IOException;
+
 import org.apache.giraph.conf.ImmutableClassesGiraphConfigurable;
 import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
 import org.apache.hadoop.io.Writable;

http://git-wip-us.apache.org/repos/asf/giraph/blob/535a333b/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConfiguration.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConfiguration.java b/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConfiguration.java
index 2862c3e..a32c938 100644
--- a/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConfiguration.java
+++ b/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConfiguration.java
@@ -847,7 +847,7 @@ public class GiraphConfiguration extends Configuration
     if (nettyBufferAllocator == null) {
       if (NETTY_USE_POOLED_ALLOCATOR.get(this)) { // Use pooled allocator
         nettyBufferAllocator = new PooledByteBufAllocator(
-            NETTY_USE_DIRECT_MEMORY.get(this));
+          NETTY_USE_DIRECT_MEMORY.get(this));
       } else { // Use un-pooled allocator
         // Note: Current default settings create un-pooled heap allocator
         nettyBufferAllocator = new UnpooledByteBufAllocator(

http://git-wip-us.apache.org/repos/asf/giraph/blob/535a333b/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java b/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java
index 6b36418..611a0dc 100644
--- a/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java
+++ b/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java
@@ -791,7 +791,7 @@ public interface GiraphConstants {
           "Overrides default partition count calculation if not -1");
 
   /** Vertex key space size for
-   * {@link org.apache.giraph.partition.SimpleRangeWorkerPartitioner}
+   * {@link org.apache.giraph.partition.SimpleWorkerPartitioner}
    */
   String PARTITION_VERTEX_KEY_SPACE_SIZE = "giraph.vertexKeySpaceSize";
 

http://git-wip-us.apache.org/repos/asf/giraph/blob/535a333b/giraph-core/src/main/java/org/apache/giraph/conf/ImmutableClassesGiraphConfiguration.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/conf/ImmutableClassesGiraphConfiguration.java b/giraph-core/src/main/java/org/apache/giraph/conf/ImmutableClassesGiraphConfiguration.java
index 95e029d..e9f50f9 100644
--- a/giraph-core/src/main/java/org/apache/giraph/conf/ImmutableClassesGiraphConfiguration.java
+++ b/giraph-core/src/main/java/org/apache/giraph/conf/ImmutableClassesGiraphConfiguration.java
@@ -1023,6 +1023,18 @@ public class ImmutableClassesGiraphConfiguration<I extends WritableComparable,
   }
 
   /**
+   * Create extendedDataInput based on extendedDataOutput
+   *
+   * @param extendedDataOutput extendedDataOutput
+   * @return extendedDataInput
+   */
+  public ExtendedDataInput createExtendedDataInput(
+    ExtendedDataOutput extendedDataOutput) {
+    return createExtendedDataInput(extendedDataOutput.getByteArray(), 0,
+        extendedDataOutput.getPos());
+  }
+
+  /**
    * Update Computation and MessageCombiner class used
    *
    * @param superstepClasses SuperstepClasses

http://git-wip-us.apache.org/repos/asf/giraph/blob/535a333b/giraph-core/src/main/java/org/apache/giraph/edge/AbstractEdgeStore.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/edge/AbstractEdgeStore.java b/giraph-core/src/main/java/org/apache/giraph/edge/AbstractEdgeStore.java
index 80e909d..ee53718 100644
--- a/giraph-core/src/main/java/org/apache/giraph/edge/AbstractEdgeStore.java
+++ b/giraph-core/src/main/java/org/apache/giraph/edge/AbstractEdgeStore.java
@@ -24,9 +24,10 @@ import org.apache.giraph.conf.DefaultImmutableClassesGiraphConfigurable;
 import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
 import org.apache.giraph.graph.Vertex;
 import org.apache.giraph.partition.Partition;
-import org.apache.giraph.utils.ByteArrayVertexIdEdges;
 import org.apache.giraph.utils.CallableFactory;
 import org.apache.giraph.utils.ProgressableUtils;
+import org.apache.giraph.utils.VertexIdEdgeIterator;
+import org.apache.giraph.utils.VertexIdEdges;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
 import org.apache.hadoop.util.Progressable;
@@ -146,15 +147,15 @@ public abstract class AbstractEdgeStore<I extends WritableComparable,
    * @return out-edges for the vertex
    */
   protected abstract OutEdges<I, E> getVertexOutEdges(
-    ByteArrayVertexIdEdges<I, E>.VertexIdEdgeIterator vertexIdEdgeIterator,
+    VertexIdEdgeIterator<I, E> vertexIdEdgeIterator,
     Map<K, OutEdges<I, E>> partitionEdgesIn);
 
   @Override
   public void addPartitionEdges(
-    int partitionId, ByteArrayVertexIdEdges<I, E> edges) {
+    int partitionId, VertexIdEdges<I, E> edges) {
     Map<K, OutEdges<I, E>> partitionEdges = getPartitionEdges(partitionId);
 
-    ByteArrayVertexIdEdges<I, E>.VertexIdEdgeIterator vertexIdEdgeIterator =
+    VertexIdEdgeIterator<I, E> vertexIdEdgeIterator =
         edges.getVertexIdEdgeIterator();
     while (vertexIdEdgeIterator.hasNext()) {
       vertexIdEdgeIterator.next();

http://git-wip-us.apache.org/repos/asf/giraph/blob/535a333b/giraph-core/src/main/java/org/apache/giraph/edge/EdgeStore.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/edge/EdgeStore.java b/giraph-core/src/main/java/org/apache/giraph/edge/EdgeStore.java
index 1150eaf..912e25c 100644
--- a/giraph-core/src/main/java/org/apache/giraph/edge/EdgeStore.java
+++ b/giraph-core/src/main/java/org/apache/giraph/edge/EdgeStore.java
@@ -18,7 +18,7 @@
 
 package org.apache.giraph.edge;
 
-import org.apache.giraph.utils.ByteArrayVertexIdEdges;
+import org.apache.giraph.utils.VertexIdEdges;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
 
@@ -38,7 +38,7 @@ public interface EdgeStore<I extends WritableComparable,
    * @param partitionId Partition id for the incoming edges.
    * @param edges Incoming edges
    */
-  void addPartitionEdges(int partitionId, ByteArrayVertexIdEdges<I, E> edges);
+  void addPartitionEdges(int partitionId, VertexIdEdges<I, E> edges);
 
   /**
    * Move all edges from temporary storage to their source vertices.

http://git-wip-us.apache.org/repos/asf/giraph/blob/535a333b/giraph-core/src/main/java/org/apache/giraph/edge/SimpleEdgeStore.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/edge/SimpleEdgeStore.java b/giraph-core/src/main/java/org/apache/giraph/edge/SimpleEdgeStore.java
index 6e2a74f..a6a3356 100644
--- a/giraph-core/src/main/java/org/apache/giraph/edge/SimpleEdgeStore.java
+++ b/giraph-core/src/main/java/org/apache/giraph/edge/SimpleEdgeStore.java
@@ -20,7 +20,7 @@ package org.apache.giraph.edge;
 
 import org.apache.giraph.bsp.CentralizedServiceWorker;
 import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
-import org.apache.giraph.utils.ByteArrayVertexIdEdges;
+import org.apache.giraph.utils.VertexIdEdgeIterator;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
 import org.apache.hadoop.util.Progressable;
@@ -101,7 +101,7 @@ public class SimpleEdgeStore<I extends WritableComparable,
 
   @Override
   protected OutEdges<I, E> getVertexOutEdges(
-      ByteArrayVertexIdEdges<I, E>.VertexIdEdgeIterator vertexIdEdgeIterator,
+      VertexIdEdgeIterator<I, E> vertexIdEdgeIterator,
       Map<I, OutEdges<I, E>> partitionEdgesIn) {
     ConcurrentMap<I, OutEdges<I, E>> partitionEdges =
         (ConcurrentMap<I, OutEdges<I, E>>) partitionEdgesIn;

http://git-wip-us.apache.org/repos/asf/giraph/blob/535a333b/giraph-core/src/main/java/org/apache/giraph/edge/primitives/IntEdgeStore.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/edge/primitives/IntEdgeStore.java b/giraph-core/src/main/java/org/apache/giraph/edge/primitives/IntEdgeStore.java
index c6b5051..826c685 100644
--- a/giraph-core/src/main/java/org/apache/giraph/edge/primitives/IntEdgeStore.java
+++ b/giraph-core/src/main/java/org/apache/giraph/edge/primitives/IntEdgeStore.java
@@ -22,7 +22,7 @@ import org.apache.giraph.bsp.CentralizedServiceWorker;
 import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
 import org.apache.giraph.edge.AbstractEdgeStore;
 import org.apache.giraph.edge.OutEdges;
-import org.apache.giraph.utils.ByteArrayVertexIdEdges;
+import org.apache.giraph.utils.VertexIdEdgeIterator;
 import org.apache.hadoop.io.IntWritable;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.util.Progressable;
@@ -112,8 +112,7 @@ public class IntEdgeStore<V extends Writable, E extends Writable>
 
   @Override
   protected OutEdges<IntWritable, E> getVertexOutEdges(
-      ByteArrayVertexIdEdges<IntWritable, E>.VertexIdEdgeIterator
-          vertexIdEdgeIterator,
+      VertexIdEdgeIterator<IntWritable, E> vertexIdEdgeIterator,
       Map<Integer, OutEdges<IntWritable, E>> partitionEdgesIn) {
     Int2ObjectMap<OutEdges<IntWritable, E>> partitionEdges =
         (Int2ObjectMap<OutEdges<IntWritable, E>>) partitionEdgesIn;

http://git-wip-us.apache.org/repos/asf/giraph/blob/535a333b/giraph-core/src/main/java/org/apache/giraph/edge/primitives/LongEdgeStore.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/edge/primitives/LongEdgeStore.java b/giraph-core/src/main/java/org/apache/giraph/edge/primitives/LongEdgeStore.java
index d4c44c7..486410f 100644
--- a/giraph-core/src/main/java/org/apache/giraph/edge/primitives/LongEdgeStore.java
+++ b/giraph-core/src/main/java/org/apache/giraph/edge/primitives/LongEdgeStore.java
@@ -22,7 +22,7 @@ import org.apache.giraph.bsp.CentralizedServiceWorker;
 import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
 import org.apache.giraph.edge.AbstractEdgeStore;
 import org.apache.giraph.edge.OutEdges;
-import org.apache.giraph.utils.ByteArrayVertexIdEdges;
+import org.apache.giraph.utils.VertexIdEdgeIterator;
 import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.util.Progressable;
@@ -113,8 +113,7 @@ public class LongEdgeStore<V extends Writable, E extends Writable>
 
   @Override
   protected OutEdges<LongWritable, E> getVertexOutEdges(
-    ByteArrayVertexIdEdges<LongWritable, E>.VertexIdEdgeIterator
-      vertexIdEdgeIterator,
+    VertexIdEdgeIterator<LongWritable, E> vertexIdEdgeIterator,
     Map<Long, OutEdges<LongWritable, E>> partitionEdgesIn) {
     Long2ObjectMap<OutEdges<LongWritable, E>> partitionEdges =
         (Long2ObjectMap<OutEdges<LongWritable, E>>) partitionEdgesIn;

http://git-wip-us.apache.org/repos/asf/giraph/blob/535a333b/giraph-core/src/main/java/org/apache/giraph/utils/AbstractVertexIdData.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/utils/AbstractVertexIdData.java b/giraph-core/src/main/java/org/apache/giraph/utils/AbstractVertexIdData.java
new file mode 100644
index 0000000..d669be4
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/utils/AbstractVertexIdData.java
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.utils;
+
+import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
+import org.apache.hadoop.io.WritableComparable;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import static org.apache.giraph.utils.ByteUtils.SIZE_OF_BYTE;
+import static org.apache.giraph.utils.ByteUtils.SIZE_OF_INT;
+
+/**
+ * Partial {@link VertexIdData} implementation backed by an ExtendedDataOutput
+ *
+ * @param <I> vertexId type parameter
+ * @param <T> vertexData type parameter
+ */
+@SuppressWarnings("unchecked")
+public abstract class AbstractVertexIdData<I extends WritableComparable, T>
+  implements VertexIdData<I, T> {
+  /** Extended data output */
+  protected ExtendedDataOutput extendedDataOutput;
+  /** Configuration */
+  private ImmutableClassesGiraphConfiguration<I, ?, ?> configuration;
+
+  @Override
+  public void initialize() {
+    extendedDataOutput = getConf().createExtendedDataOutput();
+  }
+
+  @Override
+  public void initialize(int expectedSize) {
+    extendedDataOutput = getConf().createExtendedDataOutput(expectedSize);
+  }
+
+  @Override
+  public void add(I vertexId, T data) {
+    try {
+      vertexId.write(extendedDataOutput);
+      writeData(extendedDataOutput, data);
+    } catch (IOException e) {
+      throw new IllegalStateException("add: IOException", e);
+    }
+  }
+
+  @Override
+  public void add(byte[] serializedId, int idPos, T data) {
+    try {
+      extendedDataOutput.write(serializedId, 0, idPos);
+      writeData(extendedDataOutput, data);
+    } catch (IOException e) {
+      throw new IllegalStateException("add: IOException", e);
+    }
+  }
+
+  @Override
+  public int getSize() {
+    return extendedDataOutput.getPos();
+  }
+
+
+  @Override
+  public int getSerializedSize() {
+    return SIZE_OF_BYTE + SIZE_OF_INT + getSize();
+  }
+
+
+  @Override
+  public boolean isEmpty() {
+    return extendedDataOutput.getPos() == 0;
+  }
+
+
+  @Override
+  public void clear() {
+    extendedDataOutput.reset();
+  }
+
+  @Override
+  public void setConf(ImmutableClassesGiraphConfiguration configuration) {
+    this.configuration = configuration;
+  }
+
+  @Override
+  public ImmutableClassesGiraphConfiguration<I, ?, ?> getConf() {
+    return configuration;
+  }
+
+  @Override
+  public ByteStructVertexIdDataIterator<I, T> getVertexIdDataIterator() {
+    return new ByteStructVertexIdDataIterator<>(this);
+  }
+
+  @Override
+  public void write(DataOutput output) throws IOException {
+    throw new UnsupportedOperationException("not supported");
+  }
+
+  @Override
+  public void readFields(DataInput dataInput) throws IOException {
+    throw new UnsupportedOperationException("not supported");
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/535a333b/giraph-core/src/main/java/org/apache/giraph/utils/ByteArrayIterable.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/utils/ByteArrayIterable.java b/giraph-core/src/main/java/org/apache/giraph/utils/ByteArrayIterable.java
deleted file mode 100644
index d14172e..0000000
--- a/giraph-core/src/main/java/org/apache/giraph/utils/ByteArrayIterable.java
+++ /dev/null
@@ -1,76 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.giraph.utils;
-
-import java.util.Iterator;
-import org.apache.hadoop.io.Writable;
-
-/**
- * This iterable is designed to deserialize a byte array on the fly to
- * provide new copies of writable objects when desired.  It does not reuse
- * objects, and instead creates a new one for every next().
- *
- * @param <T> Type that extends Writable that will be iterated
- */
-public abstract class ByteArrayIterable<T extends Writable> implements
-    Iterable<T> {
-  /** Factory for data input */
-  protected final Factory<? extends ExtendedDataInput> dataInputFactory;
-
-  /**
-   * Constructor
-   *
-   * @param dataInputFactory Factory for data inputs
-   */
-  public ByteArrayIterable(
-      Factory<? extends ExtendedDataInput> dataInputFactory) {
-    this.dataInputFactory = dataInputFactory;
-  }
-
-  /**
-   * Must be able to create the writable object
-   *
-   * @return New writable
-   */
-  protected abstract T createWritable();
-
-  /**
-   * Iterator over the internal byte array
-   */
-  private class ByteArrayIterableIterator extends ByteArrayIterator<T> {
-    /**
-     * Constructor.
-     *
-     * @param dataInputFactory Factory for data input
-     */
-    private ByteArrayIterableIterator(
-        Factory<? extends ExtendedDataInput> dataInputFactory) {
-      super(dataInputFactory.create());
-    }
-
-    @Override
-    protected T createWritable() {
-      return ByteArrayIterable.this.createWritable();
-    }
-  }
-
-  @Override
-  public Iterator<T> iterator() {
-    return new ByteArrayIterableIterator(dataInputFactory);
-  }
-}

http://git-wip-us.apache.org/repos/asf/giraph/blob/535a333b/giraph-core/src/main/java/org/apache/giraph/utils/ByteArrayIterator.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/utils/ByteArrayIterator.java b/giraph-core/src/main/java/org/apache/giraph/utils/ByteArrayIterator.java
deleted file mode 100644
index 28b2dc8..0000000
--- a/giraph-core/src/main/java/org/apache/giraph/utils/ByteArrayIterator.java
+++ /dev/null
@@ -1,72 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.giraph.utils;
-
-import java.io.IOException;
-import java.util.Iterator;
-import org.apache.hadoop.io.Writable;
-
-/**
- * This iterator is designed to deserialize a byte array on the fly to
- * provide new copies of writable objects when desired.  It does not reuse
- * objects, and instead creates a new one for every next().
- *
- * @param <T> Type that extends Writable that will be iterated
- */
-public abstract class ByteArrayIterator<T extends Writable> implements
-    Iterator<T> {
-  /** Data input */
-  protected final ExtendedDataInput extendedDataInput;
-
-  /**
-   * Wrap ExtendedDataInput in ByteArrayIterator
-   *
-   * @param extendedDataInput ExtendedDataInput
-   */
-  public ByteArrayIterator(ExtendedDataInput extendedDataInput) {
-    this.extendedDataInput = extendedDataInput;
-  }
-
-  @Override
-  public boolean hasNext() {
-    return extendedDataInput.available() > 0;
-  }
-
-  @Override
-  public T next() {
-    T writable = createWritable();
-    try {
-      writable.readFields(extendedDataInput);
-    } catch (IOException e) {
-      throw new IllegalStateException("next: readFields got IOException", e);
-    }
-    return writable;
-  }
-
-  @Override
-  public void remove() {
-    throw new IllegalAccessError("remove: Not supported");
-  }
-
-  /**
-   * Must be able to create the writable object
-   *
-   * @return New writable
-   */
-  protected abstract T createWritable();
-}


[2/3] GIRAPH-907: refactor giraph code to support multiple implementations of vertexId data (pavanka)

Posted by pa...@apache.org.
http://git-wip-us.apache.org/repos/asf/giraph/blob/535a333b/giraph-core/src/main/java/org/apache/giraph/utils/ByteArrayVertexIdData.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/utils/ByteArrayVertexIdData.java b/giraph-core/src/main/java/org/apache/giraph/utils/ByteArrayVertexIdData.java
index 5c56038..f26a888 100644
--- a/giraph-core/src/main/java/org/apache/giraph/utils/ByteArrayVertexIdData.java
+++ b/giraph-core/src/main/java/org/apache/giraph/utils/ByteArrayVertexIdData.java
@@ -18,9 +18,6 @@
 
 package org.apache.giraph.utils;
 
-import org.apache.giraph.conf.ImmutableClassesGiraphConfigurable;
-import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
-import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
 
 import java.io.DataInput;
@@ -33,122 +30,8 @@ import java.io.IOException;
  * @param <I> Vertex id
  * @param <T> Data
  */
-public abstract class ByteArrayVertexIdData<I extends WritableComparable, T>
-  implements Writable, ImmutableClassesGiraphConfigurable {
-  /** Extended data output */
-  private ExtendedDataOutput extendedDataOutput;
-  /** Configuration */
-  private ImmutableClassesGiraphConfiguration<I, ?, ?> configuration;
-
-  /**
-   * Create a new data object.
-   *
-   * @return Newly-created data object.
-   */
-  public abstract T createData();
-
-  /**
-   * Write a data object to an {@link ExtendedDataOutput}.
-   *
-   * @param out {@link ExtendedDataOutput}
-   * @param data Data object to write
-   * @throws IOException
-   */
-  public abstract void writeData(ExtendedDataOutput out, T data)
-    throws IOException;
-
-  /**
-   * Read a data object's fields from an {@link ExtendedDataInput}.
-   *
-   * @param in {@link ExtendedDataInput}
-   * @param data Data object to fill in-place
-   * @throws IOException
-   */
-  public abstract void readData(ExtendedDataInput in, T data)
-    throws IOException;
-
-  /**
-   * Initialize the inner state. Must be called before {@code add()} is
-   * called.
-   */
-  public void initialize() {
-    extendedDataOutput = configuration.createExtendedDataOutput();
-  }
-
-  /**
-   * Initialize the inner state, with a known size. Must be called before
-   * {@code add()} is called.
-   *
-   * @param expectedSize Number of bytes to be expected
-   */
-  public void initialize(int expectedSize) {
-    extendedDataOutput = configuration.createExtendedDataOutput(expectedSize);
-  }
-
-  /**
-   * Add a vertex id and data pair to the collection.
-   *
-   * @param vertexId Vertex id
-   * @param data Data
-   */
-  public void add(I vertexId, T data) {
-    try {
-      vertexId.write(extendedDataOutput);
-      writeData(extendedDataOutput, data);
-    } catch (IOException e) {
-      throw new IllegalStateException("add: IOException", e);
-    }
-  }
-
-  /**
-   * Add a serialized vertex id and data.
-   *
-   * @param serializedId The bye array which holds the serialized id.
-   * @param idPos The end position of the serialized id in the byte array.
-   * @param data Data
-   */
-  public void add(byte[] serializedId, int idPos, T data) {
-    try {
-      extendedDataOutput.write(serializedId, 0, idPos);
-      writeData(extendedDataOutput, data);
-    } catch (IOException e) {
-      throw new IllegalStateException("add: IOException", e);
-    }
-  }
-
-  /**
-   * Get the number of bytes used.
-   *
-   * @return Bytes used
-   */
-  public int getSize() {
-    return extendedDataOutput.getPos();
-  }
-
-  /**
-   * Get the size of this object in serialized form.
-   *
-   * @return The size (in bytes) of the serialized object
-   */
-  public int getSerializedSize() {
-    return 1 + 4 + getSize();
-  }
-
-  /**
-   * Check if the list is empty.
-   *
-   * @return Whether the list is empty
-   */
-  public boolean isEmpty() {
-    return extendedDataOutput.getPos() == 0;
-  }
-
-  /**
-   * Clear the list.
-   */
-  public void clear() {
-    extendedDataOutput.reset();
-  }
+public abstract class ByteArrayVertexIdData<I extends WritableComparable,
+    T> extends AbstractVertexIdData<I, T> {
 
   /**
    * Get the underlying byte-array.
@@ -160,16 +43,6 @@ public abstract class ByteArrayVertexIdData<I extends WritableComparable, T>
   }
 
   @Override
-  public void setConf(ImmutableClassesGiraphConfiguration configuration) {
-    this.configuration = configuration;
-  }
-
-  @Override
-  public ImmutableClassesGiraphConfiguration<I, ?, ?> getConf() {
-    return configuration;
-  }
-
-  @Override
   public void write(DataOutput dataOutput) throws IOException {
     WritableUtils.writeExtendedDataOutput(extendedDataOutput, dataOutput);
   }
@@ -177,73 +50,6 @@ public abstract class ByteArrayVertexIdData<I extends WritableComparable, T>
   @Override
   public void readFields(DataInput dataInput) throws IOException {
     extendedDataOutput =
-        WritableUtils.readExtendedDataOutput(dataInput, configuration);
-  }
-
-  /**
-   * Get an iterator over the pairs.
-   *
-   * @return Iterator
-   */
-  public VertexIdDataIterator getVertexIdDataIterator() {
-    return new VertexIdDataIterator();
-  }
-
-  /**
-   * Special iterator that reuses vertex ids and data objects so that the
-   * lifetime of the object is only until next() is called.
-   *
-   * Vertex id ownership can be released if desired through
-   * releaseCurrentVertexId().  This optimization allows us to cut down
-   * on the number of objects instantiated and garbage collected.
-   *
-   * Not thread-safe.
-   */
-  public class VertexIdDataIterator extends VertexIdIterator<I> {
-    /** Current data. */
-    private T data;
-
-    /** Default constructor. */
-    public VertexIdDataIterator() {
-      super(extendedDataOutput, configuration);
-    }
-
-    @Override
-    public void next() {
-      if (vertexId == null) {
-        vertexId = configuration.createVertexId();
-      }
-      if (data == null) {
-        data = createData();
-      }
-      try {
-        vertexId.readFields(extendedDataInput);
-        readData(extendedDataInput, data);
-      } catch (IOException e) {
-        throw new IllegalStateException("next: IOException", e);
-      }
-    }
-
-    /**
-     * Get the current data.
-     *
-     * @return Current data
-     */
-    public T getCurrentData() {
-      return data;
-    }
-
-    /**
-     * Release the current data object.
-     *
-     * @return Released data object
-     */
-    public T releaseCurrentData() {
-      T releasedData = data;
-      data = null;
-      return releasedData;
-    }
+        WritableUtils.readExtendedDataOutput(dataInput, getConf());
   }
-
 }
-

http://git-wip-us.apache.org/repos/asf/giraph/blob/535a333b/giraph-core/src/main/java/org/apache/giraph/utils/ByteArrayVertexIdEdges.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/utils/ByteArrayVertexIdEdges.java b/giraph-core/src/main/java/org/apache/giraph/utils/ByteArrayVertexIdEdges.java
index 762802b..16370c7 100644
--- a/giraph-core/src/main/java/org/apache/giraph/utils/ByteArrayVertexIdEdges.java
+++ b/giraph-core/src/main/java/org/apache/giraph/utils/ByteArrayVertexIdEdges.java
@@ -33,7 +33,8 @@ import java.io.IOException;
  */
 @SuppressWarnings("unchecked")
 public class ByteArrayVertexIdEdges<I extends WritableComparable,
-    E extends Writable> extends ByteArrayVertexIdData<I, Edge<I, E>> {
+    E extends Writable> extends ByteArrayVertexIdData<I, Edge<I,
+    E>> implements VertexIdEdges<I, E> {
   /**
    * Cast the {@link ImmutableClassesGiraphConfiguration} so it can be used
    * to generate edge objects.
@@ -62,37 +63,9 @@ public class ByteArrayVertexIdEdges<I extends WritableComparable,
     WritableUtils.readEdge(in, edge);
   }
 
-  /**
-   * Get an iterator over the pairs.
-   *
-   * @return Iterator
-   */
-  public VertexIdEdgeIterator getVertexIdEdgeIterator() {
-    return new VertexIdEdgeIterator();
-  }
-
-  /**
-   * Special iterator that reuses vertex ids and edge objects so that the
-   * lifetime of the object is only until next() is called.
-   */
-  public class VertexIdEdgeIterator extends VertexIdDataIterator {
-    /**
-     * Get the current edge.
-     *
-     * @return Current edge
-     */
-    public Edge<I, E> getCurrentEdge() {
-      return getCurrentData();
-    }
-
-    /**
-     * Release the current edge.
-     *
-     * @return Released edge
-     */
-    public Edge<I, E> releaseCurrentEdge() {
-      return releaseCurrentData();
-    }
+  @Override
+  public ByteStructVertexIdEdgeIterator<I, E> getVertexIdEdgeIterator() {
+    return new ByteStructVertexIdEdgeIterator<>(this);
   }
 }
 

http://git-wip-us.apache.org/repos/asf/giraph/blob/535a333b/giraph-core/src/main/java/org/apache/giraph/utils/ByteArrayVertexIdMessages.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/utils/ByteArrayVertexIdMessages.java b/giraph-core/src/main/java/org/apache/giraph/utils/ByteArrayVertexIdMessages.java
index 0ac8fdf..b3eca3e 100644
--- a/giraph-core/src/main/java/org/apache/giraph/utils/ByteArrayVertexIdMessages.java
+++ b/giraph-core/src/main/java/org/apache/giraph/utils/ByteArrayVertexIdMessages.java
@@ -15,6 +15,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.giraph.utils;
 
 import org.apache.giraph.factories.MessageValueFactory;
@@ -33,7 +34,8 @@ import java.io.IOException;
  */
 @SuppressWarnings("unchecked")
 public class ByteArrayVertexIdMessages<I extends WritableComparable,
-    M extends Writable> extends ByteArrayVertexIdData<I, M> {
+  M extends Writable> extends ByteArrayVertexIdData<I, M>
+  implements VertexIdMessages<I, M> {
   /** Message value class */
   private MessageValueFactory<M> messageValueFactory;
   /** Add the message size to the stream? (Depends on the message store) */
@@ -52,7 +54,7 @@ public class ByteArrayVertexIdMessages<I extends WritableComparable,
   /**
    * Set whether message sizes should be encoded.  This should only be a
    * possibility when not combining.  When combining, all messages need to be
-   * deserializd right away, so this won't help.
+   * de-serialized right away, so this won't help.
    */
   private void setUseMessageSizeEncoding() {
     if (!getConf().useMessageCombiner()) {
@@ -89,45 +91,31 @@ public class ByteArrayVertexIdMessages<I extends WritableComparable,
     setUseMessageSizeEncoding();
   }
 
-  /**
-   * Get specialized iterator that will instiantiate the vertex id and
-   * message of this object.
-   *
-   * @return Special iterator that reuses vertex ids and messages unless
-   *         specified
-   */
-  public VertexIdMessageIterator getVertexIdMessageIterator() {
-    return new VertexIdMessageIterator();
-  }
-
-  /**
-   * Special iterator that reuses vertex ids and message objects so that the
-   * lifetime of the object is only until next() is called.
-   */
-  public class VertexIdMessageIterator extends VertexIdDataIterator {
-    /**
-     * Get the current message.
-     *
-     * @return Current message
-     */
-    public M getCurrentMessage() {
-      return getCurrentData();
-    }
+  @Override
+  public ByteStructVertexIdMessageIterator<I, M> getVertexIdMessageIterator() {
+    return new ByteStructVertexIdMessageIterator<>(this);
   }
 
-  /**
-   * Get specialized iterator that will instiantiate the vertex id and
-   * message of this object.  It will only produce message bytes, not actual
-   * messages and expects a different encoding.
-   *
-   * @return Special iterator that reuses vertex ids (unless released) and
-   *         copies message bytes
-   */
-  public VertexIdMessageBytesIterator getVertexIdMessageBytesIterator() {
+  @Override
+  public ByteStructVertexIdMessageBytesIterator<I, M>
+  getVertexIdMessageBytesIterator() {
     if (!useMessageSizeEncoding) {
       return null;
     }
-    return new VertexIdMessageBytesIterator();
+    return new ByteStructVertexIdMessageBytesIterator<I, M>(this) {
+      @Override
+      public void writeCurrentMessageBytes(DataOutput dataOutput) {
+        try {
+          dataOutput.write(extendedDataOutput.getByteArray(),
+            messageOffset, messageBytes);
+        } catch (NegativeArraySizeException e) {
+          VerboseByteStructMessageWrite.handleNegativeArraySize(vertexId);
+        } catch (IOException e) {
+          throw new IllegalStateException("writeCurrentMessageBytes: Got " +
+              "IOException", e);
+        }
+      }
+    };
   }
 
   @Override
@@ -141,66 +129,4 @@ public class ByteArrayVertexIdMessages<I extends WritableComparable,
     useMessageSizeEncoding = dataInput.readBoolean();
     super.readFields(dataInput);
   }
-
-  /**
-   * Special iterator that reuses vertex ids and messages bytes so that the
-   * lifetime of the object is only until next() is called.
-   *
-   * Vertex id ownership can be released if desired through
-   * releaseCurrentVertexId().  This optimization allows us to cut down
-   * on the number of objects instantiated and garbage collected.  Messages
-   * can only be copied to an ExtendedDataOutput object
-   *
-   * Not thread-safe.
-   */
-  public class VertexIdMessageBytesIterator extends VertexIdDataIterator {
-    /** Last message offset */
-    private int messageOffset = -1;
-    /** Number of bytes in the last message */
-    private int messageBytes = -1;
-
-    /**
-     * Moves to the next element in the iteration.
-     */
-    @Override
-    public void next() {
-      if (vertexId == null) {
-        vertexId = getConf().createVertexId();
-      }
-
-      try {
-        vertexId.readFields(extendedDataInput);
-        messageBytes = extendedDataInput.readInt();
-        messageOffset = extendedDataInput.getPos();
-        if (extendedDataInput.skipBytes(messageBytes) != messageBytes) {
-          throw new IllegalStateException("next: Failed to skip " +
-              messageBytes);
-        }
-      } catch (IOException e) {
-        throw new IllegalStateException("next: IOException", e);
-      }
-    }
-
-    /**
-     * Write the current message to an ExtendedDataOutput object
-     *
-     * @param dataOutput Where the current message will be written to
-     */
-    public void writeCurrentMessageBytes(DataOutput dataOutput) {
-      try {
-        dataOutput.write(getByteArray(), messageOffset, messageBytes);
-      } catch (NegativeArraySizeException e) {
-        throw new RuntimeException("The numbers of bytes sent to vertex " +
-            vertexId + " exceeded the max capacity of " +
-            "its ExtendedDataOutput. Please consider setting " +
-            "giraph.useBigDataIOForMessages=true. If there are super-vertices" +
-            " in the graph which receive a lot of messages (total serialized " +
-            "size of messages goes beyond the maximum size of a byte array), " +
-            "setting this option to true will remove that limit");
-      } catch (IOException e) {
-        throw new IllegalStateException("writeCurrentMessageBytes: Got " +
-            "IOException", e);
-      }
-    }
-  }
 }

http://git-wip-us.apache.org/repos/asf/giraph/blob/535a333b/giraph-core/src/main/java/org/apache/giraph/utils/ByteStructIterable.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/utils/ByteStructIterable.java b/giraph-core/src/main/java/org/apache/giraph/utils/ByteStructIterable.java
new file mode 100644
index 0000000..4e15c1b
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/utils/ByteStructIterable.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.utils;
+
+import java.util.Iterator;
+import org.apache.hadoop.io.Writable;
+
+/**
+ * Iterable over writables deserialized on the fly from an ExtendedDataInput.
+ * Every iterator() call wraps an input obtained from the factory in a
+ * ByteStructIterator, which creates a new object for every next().
+ *
+ * @param <T> Type that extends Writable that will be iterated
+ */
+public abstract class ByteStructIterable<T extends Writable> implements
+    Iterable<T> {
+  /** Factory that supplies the data input for each iterator() call */
+  protected final Factory<? extends ExtendedDataInput> dataInputFactory;
+
+  /**
+   * Constructor that stores the factory for later use by iterator()
+   *
+   * @param dataInputFactory Factory for data inputs
+   */
+  public ByteStructIterable(
+      Factory<? extends ExtendedDataInput> dataInputFactory) {
+    this.dataInputFactory = dataInputFactory;
+  }
+
+  /**
+   * Creates the writable that an iterator of this iterable reads into
+   *
+   * @return New writable
+   */
+  protected abstract T createWritable();
+
+  @Override
+  public Iterator<T> iterator() {
+    return new ByteStructIterator<T>(dataInputFactory.create()) {
+      @Override
+      protected T createWritable() {
+        return ByteStructIterable.this.createWritable();
+      }
+    };
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/535a333b/giraph-core/src/main/java/org/apache/giraph/utils/ByteStructIterator.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/utils/ByteStructIterator.java b/giraph-core/src/main/java/org/apache/giraph/utils/ByteStructIterator.java
new file mode 100644
index 0000000..322365c
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/utils/ByteStructIterator.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.utils;
+
+import java.io.IOException;
+import java.util.Iterator;
+import org.apache.hadoop.io.Writable;
+
+/**
+ * This iterator deserializes writables on the fly from an
+ * ExtendedDataInput.  It does not reuse objects, and instead creates a
+ * new one through createWritable() for every next().
+ *
+ * @param <T> Type that extends Writable that will be iterated
+ */
+public abstract class ByteStructIterator<T extends Writable> implements
+    Iterator<T> {
+  /** Data input that the writables are read from */
+  protected final ExtendedDataInput extendedDataInput;
+
+  /**
+   * Wrap ExtendedDataInput in ByteStructIterator
+   *
+   * @param extendedDataInput ExtendedDataInput
+   */
+  public ByteStructIterator(ExtendedDataInput extendedDataInput) {
+    this.extendedDataInput = extendedDataInput;
+  }
+
+  @Override
+  public boolean hasNext() {
+    return extendedDataInput.available() > 0;
+  }
+
+  @Override
+  public T next() {
+    T writable = createWritable();
+    try {
+      writable.readFields(extendedDataInput);
+    } catch (IOException e) {
+      throw new IllegalStateException("next: readFields got IOException", e);
+    }
+    return writable;
+  }
+
+  @Override
+  public void remove() {
+    throw new UnsupportedOperationException("remove: Not supported");
+  }
+
+  /**
+   * Creates the writable that next() fills through readFields()
+   *
+   * @return New writable
+   */
+  protected abstract T createWritable();
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/535a333b/giraph-core/src/main/java/org/apache/giraph/utils/ByteStructVertexIdDataIterator.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/utils/ByteStructVertexIdDataIterator.java b/giraph-core/src/main/java/org/apache/giraph/utils/ByteStructVertexIdDataIterator.java
new file mode 100644
index 0000000..cefec0e
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/utils/ByteStructVertexIdDataIterator.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.utils;
+
+import org.apache.hadoop.io.WritableComparable;
+
+import java.io.IOException;
+import javax.annotation.concurrent.NotThreadSafe;
+
+/**
+ * Special iterator that reuses vertex ids and data objects so that the
+ * lifetime of the object is only until next() is called.
+ *
+ * Vertex id or data ownership can be released if desired through
+ * releaseCurrentVertexId() or releaseCurrentData(); next() then creates a
+ * fresh object.  Reuse cuts down on objects instantiated and collected.
+ *
+ * @param <I> vertexId type parameter
+ * @param <T> vertexData type parameter
+ */
+@NotThreadSafe
+public class ByteStructVertexIdDataIterator<I extends WritableComparable, T>
+  extends ByteStructVertexIdIterator<I> implements VertexIdDataIterator<I, T> {
+  /** VertexIdData to iterate over */
+  protected AbstractVertexIdData<I, T> vertexIdData;
+  /** Current data, null until next() and after releaseCurrentData() */
+  private T data;
+
+  /**
+   * Constructor, reads from the extendedDataOutput of vertexIdData
+   *
+   * @param vertexIdData VertexIdData to iterate over
+   */
+  public ByteStructVertexIdDataIterator(
+    AbstractVertexIdData<I, T> vertexIdData) {
+    super(vertexIdData.extendedDataOutput, vertexIdData.getConf());
+    this.vertexIdData = vertexIdData;
+  }
+
+  @Override
+  public void next() {
+    if (vertexId == null) {
+      vertexId = vertexIdData.getConf().createVertexId();
+    }
+    if (data == null) {
+      data = vertexIdData.createData();
+    }
+    try {
+      vertexId.readFields(extendedDataInput);
+      vertexIdData.readData(extendedDataInput, data);
+    } catch (IOException e) {
+      throw new IllegalStateException("next: IOException", e);
+    }
+  }
+
+  @Override
+  public T getCurrentData() {
+    return data;
+  }
+
+  @Override
+  public T releaseCurrentData() {
+    T releasedData = data;
+    data = null;
+    return releasedData;
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/535a333b/giraph-core/src/main/java/org/apache/giraph/utils/ByteStructVertexIdEdgeIterator.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/utils/ByteStructVertexIdEdgeIterator.java b/giraph-core/src/main/java/org/apache/giraph/utils/ByteStructVertexIdEdgeIterator.java
new file mode 100644
index 0000000..7e06038
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/utils/ByteStructVertexIdEdgeIterator.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.utils;
+
+import org.apache.giraph.edge.Edge;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+
+/**
+ * Special iterator over vertex ids and edges that reuses both objects, so
+ * the lifetime of an object is only until next() is called.
+ *
+ * @param <I> vertexId type parameter
+ * @param <E> edge type parameter
+ */
+public class ByteStructVertexIdEdgeIterator<I extends WritableComparable,
+  E extends Writable> extends ByteStructVertexIdDataIterator<I, Edge<I, E>>
+  implements VertexIdEdgeIterator<I, E> {
+
+  /**
+   * Constructor, passes vertexIdData on to the data iterator
+   *
+   * @param vertexIdData Vertex ids and edges to iterate over
+   */
+  public ByteStructVertexIdEdgeIterator(
+      AbstractVertexIdData<I, Edge<I, E>> vertexIdData) {
+    super(vertexIdData);
+  }
+
+  @Override
+  public Edge<I, E> getCurrentEdge() {
+    return getCurrentData();
+  }
+
+
+  @Override
+  public Edge<I, E> releaseCurrentEdge() {
+    return releaseCurrentData();
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/535a333b/giraph-core/src/main/java/org/apache/giraph/utils/ByteStructVertexIdIterator.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/utils/ByteStructVertexIdIterator.java b/giraph-core/src/main/java/org/apache/giraph/utils/ByteStructVertexIdIterator.java
new file mode 100644
index 0000000..3d564cd
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/utils/ByteStructVertexIdIterator.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.utils;
+
+import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
+import org.apache.hadoop.io.WritableComparable;
+
+/**
+ * Common implementation for VertexIdEdgeIterator, VertexIdMessageIterator
+ * and VertexIdMessageBytesIterator.
+ *
+ * @param <I> Vertex id
+ */
+public abstract class ByteStructVertexIdIterator<I extends WritableComparable>
+  implements VertexIdIterator<I> {
+  /** Reader of the serialized vertex ids and their data */
+  protected final ExtendedDataInput extendedDataInput;
+
+  /** Current vertex id, null after releaseCurrentVertexId() */
+  protected I vertexId;
+
+  /**
+   * Constructor, fails with IllegalStateException on a null argument.
+   *
+   * @param extendedDataOutput Extended data output the input is created from
+   * @param conf Configuration used to create the data input
+   */
+  public ByteStructVertexIdIterator(
+    ExtendedDataOutput extendedDataOutput,
+    ImmutableClassesGiraphConfiguration<I, ?, ?> conf) {
+    if (extendedDataOutput != null && conf != null) {
+      extendedDataInput = conf.createExtendedDataInput(extendedDataOutput);
+    } else {
+      throw new IllegalStateException("Cannot instantiate vertexIdIterator " +
+        "with null arguments");
+    }
+  }
+
+  @Override
+  public boolean hasNext() {
+    return extendedDataInput.available() > 0;
+  }
+
+  @Override
+  public I getCurrentVertexId() {
+    return vertexId;
+  }
+
+  @Override
+  public I releaseCurrentVertexId() {
+    I releasedVertexId = vertexId;
+    vertexId = null;
+    return releasedVertexId;
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/535a333b/giraph-core/src/main/java/org/apache/giraph/utils/ByteStructVertexIdMessageBytesIterator.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/utils/ByteStructVertexIdMessageBytesIterator.java b/giraph-core/src/main/java/org/apache/giraph/utils/ByteStructVertexIdMessageBytesIterator.java
new file mode 100644
index 0000000..c0a86c9
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/utils/ByteStructVertexIdMessageBytesIterator.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.utils;
+
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+
+import java.io.IOException;
+import javax.annotation.concurrent.NotThreadSafe;
+
+/**
+ * Special iterator that reuses vertex ids and messages bytes so that the
+ * lifetime of the object is only until next() is called.
+ *
+ * Vertex id ownership can be released if desired through
+ * releaseCurrentVertexId().  This optimization allows us to cut down
+ * on the number of objects instantiated and garbage collected.  Messages
+ * can only be copied to an ExtendedDataOutput object
+ *
+ * @param <I> vertexId type parameter
+ * @param <M> message type parameter
+ */
+@NotThreadSafe
+public abstract class ByteStructVertexIdMessageBytesIterator<I
+  extends WritableComparable, M extends Writable>
+  extends ByteStructVertexIdDataIterator<I, M>
+  implements VertexIdMessageBytesIterator<I, M> {
+  /** Offset of the last message in the data input, -1 before next() */
+  protected int messageOffset = -1;
+  /** Number of bytes in the last message, -1 before next() */
+  protected int messageBytes = -1;
+
+  /**
+   * Constructor with vertexIdData
+   *
+   * @param vertexIdData Vertex ids and message bytes to iterate over
+   */
+  public ByteStructVertexIdMessageBytesIterator(
+    AbstractVertexIdData<I, M> vertexIdData) {
+    super(vertexIdData);
+  }
+
+  /**
+   * Reads the next vertex id, then records and skips its message bytes.
+   */
+  @Override
+  public void next() {
+    if (vertexId == null) {
+      vertexId = vertexIdData.getConf().createVertexId();
+    }
+
+    try {
+      vertexId.readFields(extendedDataInput);
+      messageBytes = extendedDataInput.readInt();
+      messageOffset = extendedDataInput.getPos();
+      if (extendedDataInput.skipBytes(messageBytes) != messageBytes) {
+        throw new IllegalStateException("next: Failed to skip " +
+            messageBytes);
+      }
+    } catch (IOException e) {
+      throw new IllegalStateException("next: IOException", e);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/535a333b/giraph-core/src/main/java/org/apache/giraph/utils/ByteStructVertexIdMessageIterator.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/utils/ByteStructVertexIdMessageIterator.java b/giraph-core/src/main/java/org/apache/giraph/utils/ByteStructVertexIdMessageIterator.java
new file mode 100644
index 0000000..b686211
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/utils/ByteStructVertexIdMessageIterator.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.utils;
+
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+
+/**
+ * Special iterator over vertex ids and messages that reuses both objects,
+ * so the lifetime of an object is only until next() is called.
+ *
+ * @param <I> vertexId type parameter
+ * @param <M> message type parameter
+ */
+public class ByteStructVertexIdMessageIterator<I extends WritableComparable,
+  M extends Writable> extends ByteStructVertexIdDataIterator<I, M>
+  implements VertexIdMessageIterator<I, M> {
+
+  /**
+   * Constructor, passes vertexIdData on to the data iterator
+   *
+   * @param vertexIdData Vertex ids and messages to iterate over
+   */
+  public ByteStructVertexIdMessageIterator(
+    AbstractVertexIdData<I, M> vertexIdData) {
+    super(vertexIdData);
+  }
+
+  @Override
+  public M getCurrentMessage() {
+    return getCurrentData();
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/535a333b/giraph-core/src/main/java/org/apache/giraph/utils/ByteUtils.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/utils/ByteUtils.java b/giraph-core/src/main/java/org/apache/giraph/utils/ByteUtils.java
new file mode 100644
index 0000000..52d959e
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/utils/ByteUtils.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.utils;
+
+/**
+ * Utility class holding the sizes, in bytes, of the Java primitive types
+ */
+public class ByteUtils {
+  /** Bytes used in a boolean */
+  public static final int SIZE_OF_BOOLEAN = 1;
+  /** Bytes used in a byte */
+  public static final int SIZE_OF_BYTE = 1;
+  /** Bytes used in a char */
+  public static final int SIZE_OF_CHAR = Character.SIZE / Byte.SIZE;
+  /** Bytes used in a short */
+  public static final int SIZE_OF_SHORT = Short.SIZE / Byte.SIZE;
+  /** Bytes used in an int */
+  public static final int SIZE_OF_INT = Integer.SIZE / Byte.SIZE;
+  /** Bytes used in a long */
+  public static final int SIZE_OF_LONG = Long.SIZE / Byte.SIZE;
+  /** Bytes used in a float */
+  public static final int SIZE_OF_FLOAT = Float.SIZE / Byte.SIZE;
+  /** Bytes used in a double */
+  public static final int SIZE_OF_DOUBLE = Double.SIZE / Byte.SIZE;
+
+  /**
+   * Private constructor, this class only holds static constants
+   */
+  private ByteUtils() {
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/535a333b/giraph-core/src/main/java/org/apache/giraph/utils/ExtendedByteArrayDataInput.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/utils/ExtendedByteArrayDataInput.java b/giraph-core/src/main/java/org/apache/giraph/utils/ExtendedByteArrayDataInput.java
index 0ecea77..3eae25b 100644
--- a/giraph-core/src/main/java/org/apache/giraph/utils/ExtendedByteArrayDataInput.java
+++ b/giraph-core/src/main/java/org/apache/giraph/utils/ExtendedByteArrayDataInput.java
@@ -26,7 +26,7 @@ import java.io.IOException;
  * Provides access to a internals of ByteArrayInputStream
  */
 public class ExtendedByteArrayDataInput extends ByteArrayInputStream
-    implements ExtendedDataInput {
+  implements ExtendedDataInput {
   /** Internal data input */
   private final DataInput dataInput;
   /**

http://git-wip-us.apache.org/repos/asf/giraph/blob/535a333b/giraph-core/src/main/java/org/apache/giraph/utils/ExtendedByteArrayDataOutput.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/utils/ExtendedByteArrayDataOutput.java b/giraph-core/src/main/java/org/apache/giraph/utils/ExtendedByteArrayDataOutput.java
index 0ff366d..9988b15 100644
--- a/giraph-core/src/main/java/org/apache/giraph/utils/ExtendedByteArrayDataOutput.java
+++ b/giraph-core/src/main/java/org/apache/giraph/utils/ExtendedByteArrayDataOutput.java
@@ -140,10 +140,15 @@ public class ExtendedByteArrayDataOutput extends ByteArrayOutputStream
   }
 
   @Override
-  public void skipBytes(int bytesToSkip) {
-    if ((count + bytesToSkip) > buf.length) {
-      buf = Arrays.copyOf(buf, Math.max(buf.length << 1, count + bytesToSkip));
+  public void ensureWritable(int minSize) {
+    if ((count + minSize) > buf.length) {
+      buf = Arrays.copyOf(buf, Math.max(buf.length << 1, count + minSize));
     }
+  }
+
+  @Override
+  public void skipBytes(int bytesToSkip) {
+    ensureWritable(bytesToSkip);
     count += bytesToSkip;
   }
 
@@ -161,6 +166,15 @@ public class ExtendedByteArrayDataOutput extends ByteArrayOutputStream
   }
 
   @Override
+  public byte[] toByteArray(int offset, int length) {
+    if (offset + length > count) {
+      throw new IndexOutOfBoundsException(String.format("Offset: %d + " +
+          "Length: %d exceeds the size of buf : %d", offset, length, count));
+    }
+    return Arrays.copyOfRange(buf, offset, offset + length); // exclusive end
+  }
+  }
+
+  @Override
   public byte[] getByteArray() {
     return buf;
   }

http://git-wip-us.apache.org/repos/asf/giraph/blob/535a333b/giraph-core/src/main/java/org/apache/giraph/utils/ExtendedDataOutput.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/utils/ExtendedDataOutput.java b/giraph-core/src/main/java/org/apache/giraph/utils/ExtendedDataOutput.java
index 54ef514..bc979af 100644
--- a/giraph-core/src/main/java/org/apache/giraph/utils/ExtendedDataOutput.java
+++ b/giraph-core/src/main/java/org/apache/giraph/utils/ExtendedDataOutput.java
@@ -24,6 +24,14 @@ import java.io.DataOutput;
  */
 public interface ExtendedDataOutput extends DataOutput {
   /**
+   * Ensure that backing byte structure has at least minSize
+   * additional bytes
+   *
+   * @param minSize additional size required
+   */
+  void ensureWritable(int minSize);
+
+  /**
    * Skip some number of bytes.
    *
    * @param  bytesToSkip Number of bytes to skip
@@ -61,7 +69,17 @@ public interface ExtendedDataOutput extends DataOutput {
   byte[] toByteArray();
 
   /**
+   * Return a copy of a slice of the written bytes
+   *
+   * @param offset offset of the first byte to copy
+   * @param length number of bytes to copy
+   * @return new byte array holding the requested slice
+   */
+  byte[] toByteArray(int offset, int length);
+
+  /**
    * Clears the buffer
    */
   void reset();
 }
+

http://git-wip-us.apache.org/repos/asf/giraph/blob/535a333b/giraph-core/src/main/java/org/apache/giraph/utils/RepresentativeByteArrayIterable.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/utils/RepresentativeByteArrayIterable.java b/giraph-core/src/main/java/org/apache/giraph/utils/RepresentativeByteArrayIterable.java
deleted file mode 100644
index 2c24e89..0000000
--- a/giraph-core/src/main/java/org/apache/giraph/utils/RepresentativeByteArrayIterable.java
+++ /dev/null
@@ -1,68 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.giraph.utils;
-
-import java.util.Iterator;
-import org.apache.hadoop.io.Writable;
-
-/**
- * The objects provided by the iterators generated from this object have
- * lifetimes only until next() is called.  In that sense, the object
- * provided is only a representative object.
- *
- * @param <T> Type that extends Writable that will be iterated
- */
-public abstract class RepresentativeByteArrayIterable<T extends Writable>
-    extends ByteArrayIterable<T> {
-  /**
-   * Constructor
-   *
-   * @param dataInputFactory Factory for data inputs
-   */
-  public RepresentativeByteArrayIterable(
-      Factory<? extends ExtendedDataInput> dataInputFactory) {
-    super(dataInputFactory);
-  }
-
-  /**
-   * Iterator over the internal byte array
-   */
-  private class RepresentativeByteArrayIterableIterator extends
-      RepresentativeByteArrayIterator<T> {
-    /**
-     * Constructor.
-     *
-     * @param extendedDataInput ExtendedDataInput
-     */
-    private RepresentativeByteArrayIterableIterator(
-        ExtendedDataInput extendedDataInput) {
-      super(extendedDataInput);
-    }
-
-    @Override
-    protected T createWritable() {
-      return RepresentativeByteArrayIterable.this.createWritable();
-    }
-  }
-
-  @Override
-  public Iterator<T> iterator() {
-    return
-        new RepresentativeByteArrayIterableIterator(dataInputFactory.create());
-  }
-}

http://git-wip-us.apache.org/repos/asf/giraph/blob/535a333b/giraph-core/src/main/java/org/apache/giraph/utils/RepresentativeByteArrayIterator.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/utils/RepresentativeByteArrayIterator.java b/giraph-core/src/main/java/org/apache/giraph/utils/RepresentativeByteArrayIterator.java
deleted file mode 100644
index d36c94f..0000000
--- a/giraph-core/src/main/java/org/apache/giraph/utils/RepresentativeByteArrayIterator.java
+++ /dev/null
@@ -1,52 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.giraph.utils;
-
-import java.io.IOException;
-import org.apache.hadoop.io.Writable;
-
-/**
- * The objects provided by this iterator have lifetimes only until next() is
- * called.  In that sense, the object provided is only a representative object.
- *
- * @param <T> Type that extends Writable that will be iterated
- */
-public abstract class RepresentativeByteArrayIterator<T extends
-    Writable> extends ByteArrayIterator<T> {
-  /** Representative writable */
-  private final T representativeWritable = createWritable();
-
-  /**
-   * Wrap ExtendedDataInput in ByteArrayIterator
-   *
-   * @param extendedDataInput ExtendedDataInput
-   */
-  public RepresentativeByteArrayIterator(ExtendedDataInput extendedDataInput) {
-    super(extendedDataInput);
-  }
-
-  @Override
-  public T next() {
-    try {
-      representativeWritable.readFields(extendedDataInput);
-    } catch (IOException e) {
-      throw new IllegalStateException("next: readFields got IOException", e);
-    }
-    return representativeWritable;
-  }
-}

http://git-wip-us.apache.org/repos/asf/giraph/blob/535a333b/giraph-core/src/main/java/org/apache/giraph/utils/RepresentativeByteStructIterable.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/utils/RepresentativeByteStructIterable.java b/giraph-core/src/main/java/org/apache/giraph/utils/RepresentativeByteStructIterable.java
new file mode 100644
index 0000000..0859010
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/utils/RepresentativeByteStructIterable.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.utils;
+
+import java.util.Iterator;
+import org.apache.hadoop.io.Writable;
+
+/**
+ * The objects provided by the iterators generated from this object have
+ * lifetimes only until next() is called.  In that sense, the object
+ * provided is only a representative object.
+ *
+ * @param <T> Type that extends Writable that will be iterated
+ */
+public abstract class RepresentativeByteStructIterable<T extends Writable>
+    extends ByteStructIterable<T> {
+  /**
+   * Constructor
+   *
+   * @param dataInputFactory Factory for data inputs
+   */
+  public RepresentativeByteStructIterable(
+      Factory<? extends ExtendedDataInput> dataInputFactory) {
+    super(dataInputFactory);
+  }
+
+  @Override
+  public Iterator<T> iterator() {
+    return new RepresentativeByteStructIterator<T>(dataInputFactory.create()) {
+      @Override
+      protected T createWritable() {
+        return RepresentativeByteStructIterable.this.createWritable();
+      }
+    };
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/535a333b/giraph-core/src/main/java/org/apache/giraph/utils/RepresentativeByteStructIterator.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/utils/RepresentativeByteStructIterator.java b/giraph-core/src/main/java/org/apache/giraph/utils/RepresentativeByteStructIterator.java
new file mode 100644
index 0000000..0bf98ad
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/utils/RepresentativeByteStructIterator.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.utils;
+
+import java.io.IOException;
+import org.apache.hadoop.io.Writable;
+
+/**
+ * The objects provided by this iterator have lifetimes only until next() is
+ * called.  In that sense, the object provided is only a representative object.
+ *
+ * @param <T> Type that extends Writable that will be iterated
+ */
+public abstract class RepresentativeByteStructIterator<T extends
+    Writable> extends ByteStructIterator<T> {
+  /** Representative writable */
+  private final T representativeWritable = createWritable();
+
+  /**
+   * Wrap ExtendedDataInput in ByteStructIterator
+   *
+   * @param extendedDataInput ExtendedDataInput
+   */
+  public RepresentativeByteStructIterator(ExtendedDataInput extendedDataInput) {
+    super(extendedDataInput);
+  }
+
+  @Override
+  public T next() {
+    try {
+      representativeWritable.readFields(extendedDataInput);
+    } catch (IOException e) {
+      throw new IllegalStateException("next: readFields got IOException", e);
+    }
+    return representativeWritable;
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/535a333b/giraph-core/src/main/java/org/apache/giraph/utils/RequestUtils.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/utils/RequestUtils.java b/giraph-core/src/main/java/org/apache/giraph/utils/RequestUtils.java
new file mode 100644
index 0000000..2c56cb8
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/utils/RequestUtils.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.utils;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.ByteBufInputStream;
+import org.apache.giraph.comm.requests.WritableRequest;
+
+import java.io.IOException;
+import org.apache.log4j.Logger;
+
+/**
+ * RequestUtils utility class
+ */
+public class RequestUtils {
+  /** Logger */
+  public static final Logger LOG = Logger.getLogger(RequestUtils.class);
+
+  /**
+   * Private Constructor
+   */
+  private RequestUtils() {
+  }
+
+  /**
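+   * Populate the given request by reading its fields from the buffer
+   *
+   * @param buf ByteBuf holding the serialized request
+   * @param request WritableRequest instance to fill in
+   * @return the same request instance, after readFields() has been called
+   * @throws IOException if reading the fields from the buffer fails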
+   */
+  public static WritableRequest decodeWritableRequest(ByteBuf buf,
+    WritableRequest request) throws IOException {
+    ByteBufInputStream input = new ByteBufInputStream(buf);
+    request.readFields(input);
+    return request;
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/535a333b/giraph-core/src/main/java/org/apache/giraph/utils/UnsafeArrayReads.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/utils/UnsafeArrayReads.java b/giraph-core/src/main/java/org/apache/giraph/utils/UnsafeArrayReads.java
new file mode 100644
index 0000000..db19fda
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/utils/UnsafeArrayReads.java
@@ -0,0 +1,200 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.utils;
+
+import java.io.IOException;
+import java.lang.reflect.Field;
+
+import static org.apache.giraph.utils.ByteUtils.SIZE_OF_BOOLEAN;
+import static org.apache.giraph.utils.ByteUtils.SIZE_OF_BYTE;
+import static org.apache.giraph.utils.ByteUtils.SIZE_OF_CHAR;
+import static org.apache.giraph.utils.ByteUtils.SIZE_OF_SHORT;
+import static org.apache.giraph.utils.ByteUtils.SIZE_OF_INT;
+import static org.apache.giraph.utils.ByteUtils.SIZE_OF_LONG;
+import static org.apache.giraph.utils.ByteUtils.SIZE_OF_FLOAT;
+import static org.apache.giraph.utils.ByteUtils.SIZE_OF_DOUBLE;
+
+/**
+ * Byte array input stream that uses Unsafe methods to deserialize
+ * much faster
+ */
+public class UnsafeArrayReads extends UnsafeReads {
+  /** Access to the unsafe class */
+  private static final sun.misc.Unsafe UNSAFE;
+  static {
+    try {
+      Field field = sun.misc.Unsafe.class.getDeclaredField("theUnsafe");
+      field.setAccessible(true);
+      UNSAFE = (sun.misc.Unsafe) field.get(null);
+      // Checkstyle exception due to needing to check if unsafe is allowed
+      // CHECKSTYLE: stop IllegalCatch
+    } catch (Exception e) {
+      // CHECKSTYLE: resume IllegalCatch
+      throw new RuntimeException("UnsafeArrayReads: Failed to " +
+          "get unsafe", e);
+    }
+  }
+  /** Offset of a byte array */
+  private static final long BYTE_ARRAY_OFFSET  =
+      UNSAFE.arrayBaseOffset(byte[].class);
+
+  /** Byte buffer */
+  private final byte[] buf;
+
+  /**
+   * Constructor
+   *
+   * @param buf Buffer to read from
+   */
+  public UnsafeArrayReads(byte[] buf) {
+    super(buf.length);
+    this.buf = buf;
+  }
+
+  /**
+   * Constructor.
+   *
+   * @param buf Buffer to read from
+   * @param offset Offset in the buffer to start reading from
+   * @param length Max length of the buffer to read
+   */
+  public UnsafeArrayReads(byte[] buf, int offset, int length) {
+    super(offset, length);
+    this.buf = buf;
+  }
+
+  @Override
+  public int available() {
+    return (int) (bufLength - pos);
+  }
+
+
+  @Override
+  public int getPos() {
+    return (int) pos;
+  }
+
+  @Override
+  public void readFully(byte[] b) throws IOException {
+    ensureRemaining(b.length);
+    System.arraycopy(buf, (int) pos, b, 0, b.length);
+    pos += b.length;
+  }
+
+  @Override
+  public void readFully(byte[] b, int off, int len) throws IOException {
+    ensureRemaining(len);
+    System.arraycopy(buf, (int) pos, b, off, len);
+    pos += len;
+  }
+
+  @Override
+  public boolean readBoolean() throws IOException {
+    ensureRemaining(SIZE_OF_BOOLEAN);
+    boolean value = UNSAFE.getBoolean(buf,
+        BYTE_ARRAY_OFFSET + pos);
+    pos += SIZE_OF_BOOLEAN;
+    return value;
+  }
+
+  @Override
+  public byte readByte() throws IOException {
+    ensureRemaining(SIZE_OF_BYTE);
+    byte value = UNSAFE.getByte(buf,
+        BYTE_ARRAY_OFFSET + pos);
+    pos += SIZE_OF_BYTE;
+    return value;
+  }
+
+  @Override
+  public int readUnsignedByte() throws IOException {
+    return (short) (readByte() & 0xFF);
+  }
+
+  @Override
+  public short readShort() throws IOException {
+    ensureRemaining(SIZE_OF_SHORT);
+    short value = UNSAFE.getShort(buf,
+        BYTE_ARRAY_OFFSET + pos);
+    pos += SIZE_OF_SHORT;
+    return value;
+  }
+
+  @Override
+  public int readUnsignedShort() throws IOException {
+    return readShort() & 0xFFFF;
+  }
+
+  @Override
+  public char readChar() throws IOException {
+    ensureRemaining(SIZE_OF_CHAR);
+    char value = UNSAFE.getChar(buf,
+        BYTE_ARRAY_OFFSET + pos);
+    pos += SIZE_OF_CHAR;
+    return value;
+  }
+
+  @Override
+  public int readInt() throws IOException {
+    ensureRemaining(SIZE_OF_INT);
+    int value = UNSAFE.getInt(buf,
+        BYTE_ARRAY_OFFSET + pos);
+    pos += SIZE_OF_INT;
+    return value;
+  }
+
+  @Override
+  public long readLong() throws IOException {
+    ensureRemaining(SIZE_OF_LONG);
+    long value = UNSAFE.getLong(buf,
+        BYTE_ARRAY_OFFSET + pos);
+    pos += SIZE_OF_LONG;
+    return value;
+  }
+
+  @Override
+  public float readFloat() throws IOException {
+    ensureRemaining(SIZE_OF_FLOAT);
+    float value = UNSAFE.getFloat(buf,
+        BYTE_ARRAY_OFFSET + pos);
+    pos += SIZE_OF_FLOAT;
+    return value;
+  }
+
+  @Override
+  public double readDouble() throws IOException {
+    ensureRemaining(SIZE_OF_DOUBLE);
+    double value = UNSAFE.getDouble(buf,
+        BYTE_ARRAY_OFFSET + pos);
+    pos += SIZE_OF_DOUBLE;
+    return value;
+  }
+
+  /**
+   * Get an int at an arbitrary position in a byte[]
+   *
+   * @param buf Buffer to get the int from
+   * @param pos Position in the buffer to get the int from
+   * @return Int at the buffer position
+   */
+  public static int getInt(byte[] buf, int pos) {
+    return UNSAFE.getInt(buf,
+        BYTE_ARRAY_OFFSET + pos);
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/535a333b/giraph-core/src/main/java/org/apache/giraph/utils/UnsafeByteArrayInputStream.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/utils/UnsafeByteArrayInputStream.java b/giraph-core/src/main/java/org/apache/giraph/utils/UnsafeByteArrayInputStream.java
index 20ed92b..c8a8cac 100644
--- a/giraph-core/src/main/java/org/apache/giraph/utils/UnsafeByteArrayInputStream.java
+++ b/giraph-core/src/main/java/org/apache/giraph/utils/UnsafeByteArrayInputStream.java
@@ -15,52 +15,13 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.giraph.utils;
 
-import java.io.IOException;
-import java.io.UTFDataFormatException;
-import java.lang.reflect.Field;
-import org.apache.log4j.Logger;
+package org.apache.giraph.utils;
 
 /**
- * Byte array output stream that uses Unsafe methods to serialize/deserialize
- * much faster
+ * Byte array input stream; all reads are inherited from UnsafeArrayReads
  */
-public class UnsafeByteArrayInputStream implements ExtendedDataInput {
-  /** Class logger */
-  private static final Logger LOG = Logger.getLogger(
-      UnsafeByteArrayInputStream.class);
-  /** Access to the unsafe class */
-  private static final sun.misc.Unsafe UNSAFE;
-  static {
-    try {
-      Field field = sun.misc.Unsafe.class.getDeclaredField("theUnsafe");
-      field.setAccessible(true);
-      UNSAFE = (sun.misc.Unsafe) field.get(null);
-      // Checkstyle exception due to needing to check if unsafe is allowed
-      // CHECKSTYLE: stop IllegalCatch
-    } catch (Exception e) {
-      // CHECKSTYLE: resume IllegalCatch
-      throw new RuntimeException("UnsafeByteArrayOutputStream: Failed to " +
-          "get unsafe", e);
-    }
-  }
-  /** Offset of a byte array */
-  private static final long BYTE_ARRAY_OFFSET  =
-      UNSAFE.arrayBaseOffset(byte[].class);
-  /** Offset of a long array */
-  private static final long LONG_ARRAY_OFFSET =
-      UNSAFE.arrayBaseOffset(long[].class);
-  /** Offset of a double array */
-  private static final long DOUBLE_ARRAY_OFFSET =
-      UNSAFE.arrayBaseOffset(double[].class);
-
-  /** Byte buffer */
-  private final byte[] buf;
-  /** Buffer length */
-  private final int bufLength;
-  /** Position in the buffer */
-  private int pos = 0;
+public class UnsafeByteArrayInputStream extends UnsafeArrayReads {
 
   /**
    * Constructor
@@ -68,8 +29,7 @@ public class UnsafeByteArrayInputStream implements ExtendedDataInput {
    * @param buf Buffer to read from
    */
   public UnsafeByteArrayInputStream(byte[] buf) {
-    this.buf = buf;
-    this.bufLength = buf.length;
+    super(buf);
   }
 
   /**
@@ -80,275 +40,6 @@ public class UnsafeByteArrayInputStream implements ExtendedDataInput {
    * @param length Max length of the buffer to read
    */
   public UnsafeByteArrayInputStream(byte[] buf, int offset, int length) {
-    this.buf = buf;
-    this.pos = offset;
-    this.bufLength = length;
-  }
-
-  /**
-   * How many bytes are still available?
-   *
-   * @return Number of bytes available
-   */
-  public int available() {
-    return bufLength - pos;
-  }
-
-  /**
-   * What position in the stream?
-   *
-   * @return Position
-   */
-  public int getPos() {
-    return pos;
-  }
-
-  /**
-   * Check whether there are enough remaining bytes for an operation
-   *
-   * @param requiredBytes Bytes required to read
-   * @throws IOException When there are not enough bytes to read
-   */
-  private void ensureRemaining(int requiredBytes) throws IOException {
-    if (bufLength - pos < requiredBytes) {
-      throw new IOException("ensureRemaining: Only " + (bufLength - pos) +
-          " bytes remaining, trying to read " + requiredBytes);
-    }
-  }
-
-  @Override
-  public void readFully(byte[] b) throws IOException {
-    ensureRemaining(b.length);
-    System.arraycopy(buf, pos, b, 0, b.length);
-    pos += b.length;
-  }
-
-  @Override
-  public void readFully(byte[] b, int off, int len) throws IOException {
-    ensureRemaining(len);
-    System.arraycopy(buf, pos, b, off, len);
-    pos += len;
-  }
-
-  @Override
-  public int skipBytes(int n) throws IOException {
-    ensureRemaining(n);
-    pos += n;
-    return n;
-  }
-
-  @Override
-  public boolean readBoolean() throws IOException {
-    ensureRemaining(UnsafeByteArrayOutputStream.SIZE_OF_BOOLEAN);
-    boolean value = UNSAFE.getBoolean(buf,
-        BYTE_ARRAY_OFFSET + pos);
-    pos += UnsafeByteArrayOutputStream.SIZE_OF_BOOLEAN;
-    return value;
-  }
-
-  @Override
-  public byte readByte() throws IOException {
-    ensureRemaining(UnsafeByteArrayOutputStream.SIZE_OF_BYTE);
-    byte value = UNSAFE.getByte(buf,
-        BYTE_ARRAY_OFFSET + pos);
-    pos += UnsafeByteArrayOutputStream.SIZE_OF_BYTE;
-    return value;
-  }
-
-  @Override
-  public int readUnsignedByte() throws IOException {
-    return (short) (readByte() & 0xFF);
-  }
-
-  @Override
-  public short readShort() throws IOException {
-    ensureRemaining(UnsafeByteArrayOutputStream.SIZE_OF_SHORT);
-    short value = UNSAFE.getShort(buf,
-        BYTE_ARRAY_OFFSET + pos);
-    pos += UnsafeByteArrayOutputStream.SIZE_OF_SHORT;
-    return value;
-  }
-
-  @Override
-  public int readUnsignedShort() throws IOException {
-    return readShort() & 0xFFFF;
-  }
-
-  @Override
-  public char readChar() throws IOException {
-    ensureRemaining(UnsafeByteArrayOutputStream.SIZE_OF_CHAR);
-    char value = UNSAFE.getChar(buf,
-        BYTE_ARRAY_OFFSET + pos);
-    pos += UnsafeByteArrayOutputStream.SIZE_OF_CHAR;
-    return value;
-  }
-
-  @Override
-  public int readInt() throws IOException {
-    ensureRemaining(UnsafeByteArrayOutputStream.SIZE_OF_INT);
-    int value = UNSAFE.getInt(buf,
-        BYTE_ARRAY_OFFSET + pos);
-    pos += UnsafeByteArrayOutputStream.SIZE_OF_INT;
-    return value;
-  }
-
-  @Override
-  public long readLong() throws IOException {
-    ensureRemaining(UnsafeByteArrayOutputStream.SIZE_OF_LONG);
-    long value = UNSAFE.getLong(buf,
-        BYTE_ARRAY_OFFSET + pos);
-    pos += UnsafeByteArrayOutputStream.SIZE_OF_LONG;
-    return value;
-  }
-
-  @Override
-  public float readFloat() throws IOException {
-    ensureRemaining(UnsafeByteArrayOutputStream.SIZE_OF_FLOAT);
-    float value = UNSAFE.getFloat(buf,
-        BYTE_ARRAY_OFFSET + pos);
-    pos += UnsafeByteArrayOutputStream.SIZE_OF_FLOAT;
-    return value;
-  }
-
-  @Override
-  public double readDouble() throws IOException {
-    ensureRemaining(UnsafeByteArrayOutputStream.SIZE_OF_DOUBLE);
-    double value = UNSAFE.getDouble(buf,
-        BYTE_ARRAY_OFFSET + pos);
-    pos += UnsafeByteArrayOutputStream.SIZE_OF_DOUBLE;
-    return value;
-  }
-
-  @Override
-  public String readLine() throws IOException {
-    // Note that this code is mostly copied from DataInputStream
-    char[] tmpBuf = new char[128];
-
-    int room = tmpBuf.length;
-    int offset = 0;
-    int c;
-
-  loop:
-    while (true) {
-      c = readByte();
-      switch (c) {
-      case -1:
-      case '\n':
-        break loop;
-      case '\r':
-        int c2 = readByte();
-        if ((c2 != '\n') && (c2 != -1)) {
-          pos -= 1;
-        }
-        break loop;
-      default:
-        if (--room < 0) {
-          char[] replacebuf = new char[offset + 128];
-          room = replacebuf.length - offset - 1;
-          System.arraycopy(tmpBuf, 0, replacebuf, 0, offset);
-          tmpBuf = replacebuf;
-        }
-        tmpBuf[offset++] = (char) c;
-        break;
-      }
-    }
-    if ((c == -1) && (offset == 0)) {
-      return null;
-    }
-    return String.copyValueOf(tmpBuf, 0, offset);
-  }
-
-  @Override
-  public String readUTF() throws IOException {
-    // Note that this code is mostly copied from DataInputStream
-    int utflen = readUnsignedShort();
-
-    byte[] bytearr = new byte[utflen];
-    char[] chararr = new char[utflen];
-
-    int c;
-    int char2;
-    int char3;
-    int count = 0;
-    int chararrCount = 0;
-
-    readFully(bytearr, 0, utflen);
-
-    while (count < utflen) {
-      c = (int) bytearr[count] & 0xff;
-      if (c > 127) {
-        break;
-      }
-      count++;
-      chararr[chararrCount++] = (char) c;
-    }
-
-    while (count < utflen) {
-      c = (int) bytearr[count] & 0xff;
-      switch (c >> 4) {
-      case 0:
-      case 1:
-      case 2:
-      case 3:
-      case 4:
-      case 5:
-      case 6:
-      case 7:
-        /* 0xxxxxxx */
-        count++;
-        chararr[chararrCount++] = (char) c;
-        break;
-      case 12:
-      case 13:
-        /* 110x xxxx   10xx xxxx*/
-        count += 2;
-        if (count > utflen) {
-          throw new UTFDataFormatException(
-              "malformed input: partial character at end");
-        }
-        char2 = (int) bytearr[count - 1];
-        if ((char2 & 0xC0) != 0x80) {
-          throw new UTFDataFormatException(
-              "malformed input around byte " + count);
-        }
-        chararr[chararrCount++] = (char) (((c & 0x1F) << 6) |
-            (char2 & 0x3F));
-        break;
-      case 14:
-        /* 1110 xxxx  10xx xxxx  10xx xxxx */
-        count += 3;
-        if (count > utflen) {
-          throw new UTFDataFormatException(
-              "malformed input: partial character at end");
-        }
-        char2 = (int) bytearr[count - 2];
-        char3 = (int) bytearr[count - 1];
-        if (((char2 & 0xC0) != 0x80) || ((char3 & 0xC0) != 0x80)) {
-          throw new UTFDataFormatException(
-              "malformed input around byte " + (count - 1));
-        }
-        chararr[chararrCount++] = (char) (((c & 0x0F) << 12) |
-            ((char2 & 0x3F) << 6) | ((char3 & 0x3F) << 0));
-        break;
-      default:
-        /* 10xx xxxx,  1111 xxxx */
-        throw new UTFDataFormatException(
-            "malformed input around byte " + count);
-      }
-    }
-    // The number of chars produced may be less than utflen
-    return new String(chararr, 0, chararrCount);
-  }
-
-  /**
-   * Get an int at an arbitrary position in a byte[]
-   *
-   * @param buf Buffer to get the int from
-   * @param pos Position in the buffer to get the int from
-   * @return Int at the buffer position
-   */
-  public static int getInt(byte[] buf, int pos) {
-    return UNSAFE.getInt(buf,
-        BYTE_ARRAY_OFFSET + pos);
+    super(buf, offset, length);
   }
 }

http://git-wip-us.apache.org/repos/asf/giraph/blob/535a333b/giraph-core/src/main/java/org/apache/giraph/utils/UnsafeByteArrayOutputStream.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/utils/UnsafeByteArrayOutputStream.java b/giraph-core/src/main/java/org/apache/giraph/utils/UnsafeByteArrayOutputStream.java
index 4b413da..8736590 100644
--- a/giraph-core/src/main/java/org/apache/giraph/utils/UnsafeByteArrayOutputStream.java
+++ b/giraph-core/src/main/java/org/apache/giraph/utils/UnsafeByteArrayOutputStream.java
@@ -22,12 +22,21 @@ import java.io.OutputStream;
 import java.lang.reflect.Field;
 import java.util.Arrays;
 
+import static org.apache.giraph.utils.ByteUtils.SIZE_OF_BOOLEAN;
+import static org.apache.giraph.utils.ByteUtils.SIZE_OF_BYTE;
+import static org.apache.giraph.utils.ByteUtils.SIZE_OF_CHAR;
+import static org.apache.giraph.utils.ByteUtils.SIZE_OF_SHORT;
+import static org.apache.giraph.utils.ByteUtils.SIZE_OF_INT;
+import static org.apache.giraph.utils.ByteUtils.SIZE_OF_LONG;
+import static org.apache.giraph.utils.ByteUtils.SIZE_OF_FLOAT;
+import static org.apache.giraph.utils.ByteUtils.SIZE_OF_DOUBLE;
+
 /**
  * Byte array output stream that uses Unsafe methods to serialize/deserialize
  * much faster
  */
 public class UnsafeByteArrayOutputStream extends OutputStream
-    implements ExtendedDataOutput {
+  implements ExtendedDataOutput {
   static {
     try {
       Field field = sun.misc.Unsafe.class.getDeclaredField("theUnsafe");
@@ -42,24 +51,6 @@ public class UnsafeByteArrayOutputStream extends OutputStream
     }
   }
 
-  /** Bytes used in a boolean */
-  public static final int SIZE_OF_BOOLEAN = 1;
-  /** Bytes used in a byte */
-  public static final int SIZE_OF_BYTE = 1;
-  /** Bytes used in a char */
-  public static final int SIZE_OF_CHAR = 2;
-  /** Bytes used in a short */
-  public static final int SIZE_OF_SHORT = 2;
-  /** Bytes used in a medium */
-  public static final int SIZE_OF_MEDIUM = 3;
-  /** Bytes used in an int */
-  public static final int SIZE_OF_INT = 4;
-  /** Bytes used in a float */
-  public static final int SIZE_OF_FLOAT = 4;
-  /** Bytes used in a long */
-  public static final int SIZE_OF_LONG = 8;
-  /** Bytes used in a double */
-  public static final int SIZE_OF_DOUBLE = 8;
   /** Default number of bytes */
   private static final int DEFAULT_BYTES = 32;
   /** Access to the unsafe class */
@@ -68,12 +59,6 @@ public class UnsafeByteArrayOutputStream extends OutputStream
   /** Offset of a byte array */
   private static final long BYTE_ARRAY_OFFSET  =
       UNSAFE.arrayBaseOffset(byte[].class);
-  /** Offset of a long array */
-  private static final long LONG_ARRAY_OFFSET =
-      UNSAFE.arrayBaseOffset(long[].class);
-  /** Offset of a double array */
-  private static final long DOUBLE_ARRAY_OFFSET =
-      UNSAFE.arrayBaseOffset(double[].class);
 
   /** Byte buffer */
   private byte[] buf;
@@ -142,7 +127,15 @@ public class UnsafeByteArrayOutputStream extends OutputStream
   @Override
   public byte[] toByteArray() {
     return Arrays.copyOf(buf, pos);
+  }
 
+  @Override
+  public byte[] toByteArray(int offset, int length) {
+    if (offset + length > pos) {
+      throw new IndexOutOfBoundsException(String.format("Offset: %d + " +
+          "Length: %d exceeds the size of buf : %d", offset, length, pos));
+    }
+    return Arrays.copyOfRange(buf, offset, offset + length);
   }
 
   @Override
@@ -212,10 +205,15 @@ public class UnsafeByteArrayOutputStream extends OutputStream
   }
 
   @Override
-  public void skipBytes(int bytesToSkip) {
-    if ((pos + bytesToSkip) > buf.length) {
-      buf = Arrays.copyOf(buf, Math.max(buf.length << 1, pos + bytesToSkip));
+  public void ensureWritable(int minSize) {
+    if ((pos + minSize) > buf.length) {
+      buf = Arrays.copyOf(buf, Math.max(buf.length << 1, pos + minSize));
     }
+  }
+
+  @Override
+  public void skipBytes(int bytesToSkip) {
+    ensureWritable(bytesToSkip);
     pos += bytesToSkip;
   }
 

http://git-wip-us.apache.org/repos/asf/giraph/blob/535a333b/giraph-core/src/main/java/org/apache/giraph/utils/UnsafeReads.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/utils/UnsafeReads.java b/giraph-core/src/main/java/org/apache/giraph/utils/UnsafeReads.java
new file mode 100644
index 0000000..5f99846
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/utils/UnsafeReads.java
@@ -0,0 +1,209 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.utils;
+
+import java.io.IOException;
+import java.io.UTFDataFormatException;
+
+/**
+ * Base for ExtendedDataInput implementations that read via Unsafe; holds the
+ * position/length bookkeeping plus the shared readLine and readUTF logic
+ */
+public abstract class UnsafeReads implements ExtendedDataInput {
+
+  /** Buffer length */
+  protected final int bufLength;
+  /** Position in the buffer */
+  protected long pos = 0;
+
+  /**
+   * Constructor
+   *
+   * @param length buf length
+   */
+  public UnsafeReads(int length) {
+    bufLength = length;
+  }
+
+  /**
+   * Constructor with offset
+   *
+   * @param offset offset in memory
+   * @param length buf length
+   */
+  public UnsafeReads(long offset, int length) {
+    pos = offset;
+    bufLength = length;
+  }
+
+  /**
+   * How many bytes are still available?
+   *
+   * @return Number of bytes available
+   */
+  public abstract int available();
+
+  /**
+   * What position in the stream?
+   *
+   * @return Position
+   */
+  public abstract int getPos();
+
+  /**
+   * Check whether there are enough remaining bytes for an operation
+   *
+   * @param requiredBytes Bytes required to read
+   * @throws IOException When there are not enough bytes to read
+   */
+  protected void ensureRemaining(int requiredBytes) throws IOException {
+    if (available() < requiredBytes) {
+      throw new IOException("ensureRemaining: Only " + available() +
+          " bytes remaining, trying to read " + requiredBytes);
+    }
+  }
+
+  @Override
+  public int skipBytes(int n) throws IOException {
+    ensureRemaining(n);
+    pos += n;
+    return n;
+  }
+
+  @Override
+  public String readLine() throws IOException {
+    // Note that this code is mostly copied from DataInputStream
+    char[] tmpBuf = new char[128];
+
+    int room = tmpBuf.length;
+    int offset = 0;
+    int c;
+    // c is -1 only at end of buffer, otherwise an unsigned byte (0-255)
+  loop:
+    while (true) {
+      c = available() > 0 ? readUnsignedByte() : -1;
+      switch (c) {
+      case -1:
+      case '\n':
+        break loop;
+      case '\r':
+        int c2 = available() > 0 ? readUnsignedByte() : -1;
+        if ((c2 != '\n') && (c2 != -1)) {
+          pos -= 1;
+        }
+        break loop;
+      default:
+        if (--room < 0) {
+          char[] replacebuf = new char[offset + 128];
+          room = replacebuf.length - offset - 1;
+          System.arraycopy(tmpBuf, 0, replacebuf, 0, offset);
+          tmpBuf = replacebuf;
+        }
+        tmpBuf[offset++] = (char) c;
+        break;
+      }
+    }
+    if ((c == -1) && (offset == 0)) {
+      return null;
+    }
+    return String.copyValueOf(tmpBuf, 0, offset);
+  }
+
+  @Override
+  public String readUTF() throws IOException {
+    // Note that this code is mostly copied from DataInputStream
+    int utflen = readUnsignedShort();
+
+    byte[] bytearr = new byte[utflen];
+    char[] chararr = new char[utflen];
+
+    int c;
+    int char2;
+    int char3;
+    int count = 0;
+    int chararrCount = 0;
+
+    readFully(bytearr, 0, utflen);
+
+    while (count < utflen) {
+      c = (int) bytearr[count] & 0xff;
+      if (c > 127) {
+        break;
+      }
+      count++;
+      chararr[chararrCount++] = (char) c;
+    }
+
+    while (count < utflen) {
+      c = (int) bytearr[count] & 0xff;
+      switch (c >> 4) {
+      case 0:
+      case 1:
+      case 2:
+      case 3:
+      case 4:
+      case 5:
+      case 6:
+      case 7:
+      /* 0xxxxxxx */
+        count++;
+        chararr[chararrCount++] = (char) c;
+        break;
+      case 12:
+      case 13:
+      /* 110x xxxx   10xx xxxx*/
+        count += 2;
+        if (count > utflen) {
+          throw new UTFDataFormatException(
+              "malformed input: partial character at end");
+        }
+        char2 = (int) bytearr[count - 1];
+        if ((char2 & 0xC0) != 0x80) {
+          throw new UTFDataFormatException(
+              "malformed input around byte " + count);
+        }
+        chararr[chararrCount++] = (char) (((c & 0x1F) << 6) |
+            (char2 & 0x3F));
+        break;
+      case 14:
+      /* 1110 xxxx  10xx xxxx  10xx xxxx */
+        count += 3;
+        if (count > utflen) {
+          throw new UTFDataFormatException(
+              "malformed input: partial character at end");
+        }
+        char2 = (int) bytearr[count - 2];
+        char3 = (int) bytearr[count - 1];
+        if (((char2 & 0xC0) != 0x80) || ((char3 & 0xC0) != 0x80)) {
+          throw new UTFDataFormatException(
+              "malformed input around byte " + (count - 1));
+        }
+        chararr[chararrCount++] = (char) (((c & 0x0F) << 12) |
+            ((char2 & 0x3F) << 6) | ((char3 & 0x3F) << 0));
+        break;
+      default:
+      /* 10xx xxxx,  1111 xxxx */
+        throw new UTFDataFormatException(
+            "malformed input around byte " + count);
+      }
+    }
+    // The number of chars produced may be less than utflen
+    return new String(chararr, 0, chararrCount);
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/535a333b/giraph-core/src/main/java/org/apache/giraph/utils/VerboseByteArrayMessageWrite.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/utils/VerboseByteArrayMessageWrite.java b/giraph-core/src/main/java/org/apache/giraph/utils/VerboseByteArrayMessageWrite.java
deleted file mode 100644
index 8673732..0000000
--- a/giraph-core/src/main/java/org/apache/giraph/utils/VerboseByteArrayMessageWrite.java
+++ /dev/null
@@ -1,60 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.giraph.utils;
-
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.WritableComparable;
-
-import java.io.DataOutput;
-import java.io.IOException;
-
-/** Verbose Error mesage for ByteArray based messages */
-public class VerboseByteArrayMessageWrite {
-  /** Do not construct */
-  protected VerboseByteArrayMessageWrite() {
-  }
-
-  /**
-   * verboseWriteCurrentMessage
-   * de-serialize, then write messages
-   *
-   * @param iterator iterator
-   * @param out DataOutput
-   * @param <I> vertexId
-   * @param <M> message
-   * @throws IOException
-   * @throws RuntimeException
-   */
-  public static <I extends WritableComparable, M extends Writable> void
-  verboseWriteCurrentMessage(
-    ByteArrayVertexIdMessages<I, M>.VertexIdMessageIterator
-    iterator, DataOutput out) throws IOException {
-    try {
-      iterator.getCurrentMessage().write(out);
-    } catch (NegativeArraySizeException e) {
-      throw new RuntimeException("The numbers of bytes sent to vertex " +
-          iterator.getCurrentVertexId() + " exceeded the max capacity of " +
-          "its ExtendedDataOutput. Please consider setting " +
-          "giraph.useBigDataIOForMessages=true. If there are super-vertices" +
-          " in the graph which receive a lot of messages (total serialized " +
-          "size of messages goes beyond the maximum size of a byte array), " +
-          "setting this option to true will remove that limit");
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/giraph/blob/535a333b/giraph-core/src/main/java/org/apache/giraph/utils/VerboseByteStructMessageWrite.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/utils/VerboseByteStructMessageWrite.java b/giraph-core/src/main/java/org/apache/giraph/utils/VerboseByteStructMessageWrite.java
new file mode 100644
index 0000000..aa25490
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/utils/VerboseByteStructMessageWrite.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.utils;
+
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+
+import java.io.DataOutput;
+import java.io.IOException;
+
+/** Writes messages, reporting array-size overflow with a verbose error */
+public class VerboseByteStructMessageWrite {
+  /**
+   * Private constructor: this class only exposes static methods
+   */
+  private VerboseByteStructMessageWrite() {
+  }
+
+  /**
+   * Gets the iterator's current message and writes it to the output; a
+   * NegativeArraySizeException is reported via handleNegativeArraySize.
+   *
+   * @param iterator iterator whose current message is written
+   * @param out DataOutput the current message is written to
+   * @param <I> vertexId type
+   * @param <M> message type
+   * @throws IOException declared because the message write may throw it
+   * @throws RuntimeException if a NegativeArraySizeException is caught
+   */
+  public static <I extends WritableComparable, M extends Writable> void
+  verboseWriteCurrentMessage(VertexIdMessageIterator<I, M> iterator,
+    DataOutput out) throws IOException {
+    try {
+      iterator.getCurrentMessage().write(out);
+    } catch (NegativeArraySizeException e) {
+      handleNegativeArraySize(iterator.getCurrentVertexId());
+    }
+  }
+
+  /**
+   * Always throws a RuntimeException explaining the array-size overflow
+   *
+   * @param vertexId vertexId named in the error message
+   * @param <I> vertexId type
+   */
+  public static <I extends WritableComparable> void handleNegativeArraySize(
+      I vertexId) {
+    throw new RuntimeException("The numbers of bytes sent to vertex " +
+        vertexId + " exceeded the max capacity of " +
+        "its ExtendedDataOutput. Please consider setting " +
+        "giraph.useBigDataIOForMessages=true. If there are super-vertices" +
+        " in the graph which receive a lot of messages (total serialized " +
+        "size of messages goes beyond the maximum size of a byte array), " +
+        "setting this option to true will remove that limit");
+  }
+}