You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@giraph.apache.org by ac...@apache.org on 2012/11/15 21:17:44 UTC

svn commit: r1409973 [2/3] - in /giraph/trunk: ./ giraph/src/main/java/org/apache/giraph/ giraph/src/main/java/org/apache/giraph/benchmark/ giraph/src/main/java/org/apache/giraph/comm/ giraph/src/main/java/org/apache/giraph/comm/aggregators/ giraph/src...

Copied: giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/partition/SimplePartition.java (from r1408926, giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/partition/Partition.java)
URL: http://svn.apache.org/viewvc/giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/partition/SimplePartition.java?p2=giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/partition/SimplePartition.java&p1=giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/partition/Partition.java&r1=1408926&r2=1409973&rev=1409973&view=diff
==============================================================================
--- giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/partition/Partition.java (original)
+++ giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/partition/SimplePartition.java Thu Nov 15 20:17:38 2012
@@ -18,25 +18,23 @@
 
 package org.apache.giraph.graph.partition;
 
-import org.apache.giraph.GiraphConfiguration;
-import org.apache.giraph.ImmutableClassesGiraphConfiguration;
-import org.apache.giraph.graph.Vertex;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.WritableComparable;
-import org.apache.hadoop.mapreduce.Mapper;
-
 import com.google.common.collect.Maps;
-
 import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.IOException;
-import java.util.Collection;
+import java.util.Iterator;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.ConcurrentSkipListMap;
+import org.apache.giraph.GiraphConfiguration;
+import org.apache.giraph.ImmutableClassesGiraphConfiguration;
+import org.apache.giraph.graph.Vertex;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.util.Progressable;
 
 /**
- * A generic container that stores vertices.  Vertex ids will map to exactly
- * one partition.
+ * A simple map-based container that stores vertices.  Vertex ids will map to
+ * exactly one partition.
  *
  * @param <I> Vertex id
  * @param <V> Vertex data
@@ -44,31 +42,27 @@ import java.util.concurrent.ConcurrentSk
  * @param <M> Message data
  */
 @SuppressWarnings("rawtypes")
-public class Partition<I extends WritableComparable,
+public class SimplePartition<I extends WritableComparable,
     V extends Writable, E extends Writable, M extends Writable>
-    implements Writable {
+    implements Partition<I, V, E, M> {
   /** Configuration from the worker */
-  private final ImmutableClassesGiraphConfiguration<I, V, E, M> conf;
+  private ImmutableClassesGiraphConfiguration<I, V, E, M> conf;
   /** Partition id */
-  private final int id;
+  private int id;
   /** Vertex map for this range (keyed by index) */
-  private final ConcurrentMap<I, Vertex<I, V, E, M>> vertexMap;
+  private ConcurrentMap<I, Vertex<I, V, E, M>> vertexMap;
   /** Context used to report progress */
-  private final Mapper<?, ?, ?, ?>.Context context;
+  private Progressable progressable;
 
   /**
-   * Constructor.
-   *
-   * @param conf Configuration.
-   * @param id Partition id.
-   * @param context Mapper context
+   * Constructor for reflection.
    */
-  public Partition(ImmutableClassesGiraphConfiguration<I, V, E, M> conf,
-                   int id,
-                   Mapper<?, ?, ?, ?>.Context context) {
-    this.conf = conf;
-    this.id = id;
-    this.context = context;
+  public SimplePartition() { }
+
+  @Override
+  public void initialize(int partitionId, Progressable progressable) {
+    setId(partitionId);
+    setProgressable(progressable);
     if (conf.getBoolean(GiraphConfiguration.USE_OUT_OF_CORE_MESSAGES,
         GiraphConfiguration.USE_OUT_OF_CORE_MESSAGES_DEFAULT)) {
       vertexMap = new ConcurrentSkipListMap<I, Vertex<I, V, E, M>>();
@@ -77,61 +71,34 @@ public class Partition<I extends Writabl
     }
   }
 
-  /**
-   * Get the vertex for this vertex index.
-   *
-   * @param vertexIndex Vertex index to search for
-   * @return Vertex if it exists, null otherwise
-   */
+  @Override
   public Vertex<I, V, E, M> getVertex(I vertexIndex) {
     return vertexMap.get(vertexIndex);
   }
 
-  /**
-   * Put a vertex into the Partition
-   *
-   * @param vertex Vertex to put in the Partition
-   * @return old vertex value (i.e. null if none existed prior)
-   */
+  @Override
   public Vertex<I, V, E, M> putVertex(Vertex<I, V, E, M> vertex) {
     return vertexMap.put(vertex.getId(), vertex);
   }
 
-  /**
-   * Remove a vertex from the Partition
-   *
-   * @param vertexIndex Vertex index to remove
-   * @return The removed vertex.
-   */
+  @Override
   public Vertex<I, V, E, M> removeVertex(I vertexIndex) {
     return vertexMap.remove(vertexIndex);
   }
 
-  /**
-   * Get a collection of the vertices.
-   *
-   * @return Collection of the vertices
-   */
-  public Collection<Vertex<I, V, E , M>> getVertices() {
-    return vertexMap.values();
-  }
-
-  /**
-   * Put several vertices in the partition.
-   *
-   * @param vertices Vertices to add
-   */
-  public void putVertices(Collection<Vertex<I, V, E , M>> vertices) {
-    for (Vertex<I, V, E , M> vertex : vertices) {
+  @Override
+  public void addPartition(Partition<I, V, E, M> partition) {
+    for (Vertex<I, V, E , M> vertex : partition) {
       vertexMap.put(vertex.getId(), vertex);
     }
   }
 
-  /**
-   * Get the number of edges in this partition.  Computed on the fly.
-   *
-   * @return Number of edges.
-   */
+  @Override
+  public long getVertexCount() {
+    return vertexMap.size();
+  }
+
+  @Override
   public long getEdgeCount() {
     long edges = 0;
     for (Vertex<I, V, E, M> vertex : vertexMap.values()) {
@@ -140,27 +107,46 @@ public class Partition<I extends Writabl
     return edges;
   }
 
-  /**
-   * Get the partition id.
-   *
-   * @return Id of this partition.
-   */
+  @Override
   public int getId() {
     return id;
   }
 
   @Override
+  public void setId(int id) {
+    this.id = id;
+  }
+
+  @Override
+  public void setProgressable(Progressable progressable) {
+    this.progressable = progressable;
+  }
+
+  @Override
+  public void saveVertex(Vertex<I, V, E, M> vertex) {
+    // No-op, vertices are stored as Java objects in this partition
+  }
+
+  @Override
   public String toString() {
-    return "(id=" + getId() + ",V=" + vertexMap.size() +
-        ",E=" + getEdgeCount() + ")";
+    return "(id=" + getId() + ",V=" + vertexMap.size() + ")";
   }
 
   @Override
   public void readFields(DataInput input) throws IOException {
+    if (conf.getBoolean(GiraphConfiguration.USE_OUT_OF_CORE_MESSAGES,
+        GiraphConfiguration.USE_OUT_OF_CORE_MESSAGES_DEFAULT)) {
+      vertexMap = new ConcurrentSkipListMap<I, Vertex<I, V, E, M>>();
+    } else {
+      vertexMap = Maps.newConcurrentMap();
+    }
+    id = input.readInt();
     int vertices = input.readInt();
     for (int i = 0; i < vertices; ++i) {
       Vertex<I, V, E, M> vertex = conf.createVertex();
-      context.progress();
+      if (progressable != null) {
+        progressable.progress();
+      }
       vertex.readFields(input);
       if (vertexMap.put(vertex.getId(), vertex) != null) {
         throw new IllegalStateException(
@@ -172,10 +158,29 @@ public class Partition<I extends Writabl
 
   @Override
   public void write(DataOutput output) throws IOException {
+    output.writeInt(id);
     output.writeInt(vertexMap.size());
     for (Vertex vertex : vertexMap.values()) {
-      context.progress();
+      if (progressable != null) {
+        progressable.progress();
+      }
       vertex.write(output);
     }
   }
+
+  @Override
+  public void setConf(
+      ImmutableClassesGiraphConfiguration<I, V, E, M> configuration) {
+    this.conf = configuration;
+  }
+
+  @Override
+  public ImmutableClassesGiraphConfiguration<I, V, E, M> getConf() {
+    return conf;
+  }
+
+  @Override
+  public Iterator<Vertex<I, V, E, M>> iterator() {
+    return vertexMap.values().iterator();
+  }
 }

Modified: giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/partition/SimplePartitionStore.java
URL: http://svn.apache.org/viewvc/giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/partition/SimplePartitionStore.java?rev=1409973&r1=1409972&r2=1409973&view=diff
==============================================================================
--- giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/partition/SimplePartitionStore.java (original)
+++ giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/partition/SimplePartitionStore.java Thu Nov 15 20:17:38 2012
@@ -19,14 +19,12 @@
 package org.apache.giraph.graph.partition;
 
 import org.apache.giraph.ImmutableClassesGiraphConfiguration;
-import org.apache.giraph.graph.Vertex;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
 import org.apache.hadoop.mapreduce.Mapper;
 
 import com.google.common.collect.Maps;
 
-import java.util.Collection;
 import java.util.concurrent.ConcurrentMap;
 
 /**
@@ -63,25 +61,14 @@ public class SimplePartitionStore<I exte
 
   @Override
   public void addPartition(Partition<I, V, E, M> partition) {
-    if (partitions.putIfAbsent(partition.getId(), partition) != null) {
-      throw new IllegalStateException("addPartition: partition " +
-          partition.getId() + " already exists");
-    }
-  }
-
-  @Override
-  public void addPartitionVertices(Integer partitionId,
-                                   Collection<Vertex<I, V, E, M>> vertices) {
-    Partition<I, V, E, M> partition = partitions.get(partitionId);
-    if (partition == null) {
-      Partition<I, V, E, M> newPartition = new Partition<I, V, E, M>(conf,
-          partitionId, context);
-      partition = partitions.putIfAbsent(partitionId, newPartition);
-      if (partition == null) {
-        partition = newPartition;
+    Partition<I, V, E, M> oldPartition = partitions.get(partition.getId());
+    if (oldPartition == null) {
+      oldPartition = partitions.putIfAbsent(partition.getId(), partition);
+      if (oldPartition == null) {
+        return;
       }
     }
-    partition.putVertices(vertices);
+    oldPartition.addPartition(partition);
   }
 
   @Override

Added: giraph/trunk/giraph/src/main/java/org/apache/giraph/utils/ByteArrayVertexIdMessageCollection.java
URL: http://svn.apache.org/viewvc/giraph/trunk/giraph/src/main/java/org/apache/giraph/utils/ByteArrayVertexIdMessageCollection.java?rev=1409973&view=auto
==============================================================================
--- giraph/trunk/giraph/src/main/java/org/apache/giraph/utils/ByteArrayVertexIdMessageCollection.java (added)
+++ giraph/trunk/giraph/src/main/java/org/apache/giraph/utils/ByteArrayVertexIdMessageCollection.java Thu Nov 15 20:17:38 2012
@@ -0,0 +1,195 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.utils;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import org.apache.giraph.ImmutableClassesGiraphConfigurable;
+import org.apache.giraph.ImmutableClassesGiraphConfiguration;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+
+/**
+ * Vertex id message collection that stores everything in a single byte array
+ *
+ * @param <I> Vertex id
+ * @param <M> Message data
+ */
+public class ByteArrayVertexIdMessageCollection<I extends WritableComparable,
+    M extends Writable> implements Writable,
+    ImmutableClassesGiraphConfigurable {
+  /** Extended data output */
+  private ExtendedDataOutput extendedDataOutput;
+  /** Configuration */
+  private ImmutableClassesGiraphConfiguration<I, ?, ?, M> configuration;
+
+  /**
+   * Constructor for reflection
+   */
+  public ByteArrayVertexIdMessageCollection() { }
+
+  /**
+   * Initialize the inner state. Must be called before {@code add()} is
+   * called.
+   */
+  public void initialize() {
+    extendedDataOutput = configuration.createExtendedDataOutput();
+  }
+
+  /**
+   * Initialize the inner state, with a known size. Must be called before
+   * {@code add()} is called.
+   *
+   * @param expectedSize Number of bytes to be expected
+   */
+  public void initialize(int expectedSize) {
+    extendedDataOutput = configuration.createExtendedDataOutput(expectedSize);
+  }
+
+  /**
+   * Add a vertex id and message pair to the collection.
+   *
+   * @param vertexId Vertex id
+   * @param message Message
+   */
+  public void add(I vertexId, M message) {
+    try {
+      vertexId.write(extendedDataOutput);
+      message.write(extendedDataOutput);
+    } catch (IOException e) {
+      throw new IllegalStateException("add: IOException", e);
+    }
+  }
+
+  /**
+   * Get the number of bytes used
+   *
+   * @return Number of bytes used
+   */
+  public int getSize() {
+    return extendedDataOutput.getPos();
+  }
+
+  /**
+   * Check if the list is empty.
+   *
+   * @return True iff there are no pairs in the list
+   */
+  public boolean isEmpty() {
+    return extendedDataOutput.getPos() == 0;
+  }
+
+  /**
+   * Get iterator through elements of this object.
+   *
+   * @return {@link Iterator} iterator
+   */
+  public Iterator getIterator() {
+    return new Iterator();
+  }
+
+  @Override
+  public void setConf(ImmutableClassesGiraphConfiguration configuration) {
+    this.configuration = configuration;
+  }
+
+  @Override
+  public ImmutableClassesGiraphConfiguration getConf() {
+    return configuration;
+  }
+
+  @Override
+  public void write(DataOutput dataOutput) throws IOException {
+    dataOutput.writeInt(extendedDataOutput.getPos());
+    dataOutput.write(extendedDataOutput.getByteArray(), 0,
+        extendedDataOutput.getPos());
+  }
+
+  @Override
+  public void readFields(DataInput dataInput) throws IOException {
+    int size = dataInput.readInt();
+    byte[] buf = new byte[size];
+    dataInput.readFully(buf);
+    extendedDataOutput = configuration.createExtendedDataOutput(buf, size);
+  }
+
+  /**
+   * Special iterator class which we'll use to iterate through elements of
+   * {@link PairList}, without having to create new object as wrapper for
+   * each pair.
+   *
+   * Protocol is somewhat similar to the protocol of {@link java.util.Iterator}
+   * only here next() doesn't return the next object, it just moves along in
+   * the collection. Values related to current pair can be retrieved by calling
+   * getCurrentFirst() and getCurrentSecond() methods.
+   *
+   * Not thread-safe.
+   */
+  public class Iterator {
+    /** Reader of the serialized messages */
+    private ExtendedDataInput extendedDataInput =
+        configuration.createExtendedDataInput(
+            extendedDataOutput.getByteArray(), 0, extendedDataOutput.getPos());
+    /** Current vertex id */
+    private I vertexId;
+    /** Current message */
+    private M message;
+
+    /**
+     * Returns true if the iteration has more elements.
+     *
+     * @return True if the iteration has more elements.
+     */
+    public boolean hasNext() {
+      return extendedDataInput.available() > 0;
+    }
+
+    /**
+     * Moves to the next element in the iteration.
+     */
+    public void next() {
+      vertexId = configuration.createVertexId();
+      message = configuration.createMessageValue();
+      try {
+        vertexId.readFields(extendedDataInput);
+        message.readFields(extendedDataInput);
+      } catch (IOException e) {
+        throw new IllegalStateException("next: IOException", e);
+      }
+    }
+
+    /**
+     * Get the current vertex id
+     *
+     * @return Current vertex id
+     */
+    public I getCurrentVertexId() {
+      return vertexId;
+    }
+
+    /**
+     * Get the current message
+     *
+     * @return Current message
+     */
+    public M getCurrentMessage() {
+      return message;
+    }
+  }
+}

Added: giraph/trunk/giraph/src/main/java/org/apache/giraph/utils/DynamicChannelBufferInputStream.java
URL: http://svn.apache.org/viewvc/giraph/trunk/giraph/src/main/java/org/apache/giraph/utils/DynamicChannelBufferInputStream.java?rev=1409973&view=auto
==============================================================================
--- giraph/trunk/giraph/src/main/java/org/apache/giraph/utils/DynamicChannelBufferInputStream.java (added)
+++ giraph/trunk/giraph/src/main/java/org/apache/giraph/utils/DynamicChannelBufferInputStream.java Thu Nov 15 20:17:38 2012
@@ -0,0 +1,231 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.utils;
+
+import java.io.DataInput;
+import java.io.IOException;
+import java.io.UTFDataFormatException;
+import org.jboss.netty.buffer.DynamicChannelBuffer;
+
+/**
+ * Special input that reads from a DynamicChannelBuffer.
+ */
+public class DynamicChannelBufferInputStream implements DataInput {
+  /** Internal dynamic channel buffer */
+  private DynamicChannelBuffer buffer;
+
+  /**
+   * Constructor.
+   *
+   * @param buffer Buffer to read from
+   */
+  public DynamicChannelBufferInputStream(DynamicChannelBuffer buffer) {
+    this.buffer = buffer;
+  }
+
+  @Override
+  public void readFully(byte[] b) throws IOException {
+    buffer.readBytes(b);
+  }
+
+  @Override
+  public void readFully(byte[] b, int off, int len) throws IOException {
+    buffer.readBytes(b, off, len);
+  }
+
+  @Override
+  public int skipBytes(int n) throws IOException {
+    buffer.skipBytes(n);
+    return n;
+  }
+
+  @Override
+  public boolean readBoolean() throws IOException {
+    int ch = buffer.readByte();
+    if (ch < 0) {
+      throw new IllegalStateException("readBoolean: Got " + ch);
+    }
+    return ch != 0;
+  }
+
+  @Override
+  public byte readByte() throws IOException {
+    return buffer.readByte();
+  }
+
+  @Override
+  public int readUnsignedByte() throws IOException {
+    return buffer.readUnsignedByte();
+  }
+
+  @Override
+  public short readShort() throws IOException {
+    return buffer.readShort();
+  }
+
+  @Override
+  public int readUnsignedShort() throws IOException {
+    return buffer.readUnsignedShort();
+  }
+
+  @Override
+  public char readChar() throws IOException {
+    return buffer.readChar();
+  }
+
+  @Override
+  public int readInt() throws IOException {
+    return buffer.readInt();
+  }
+
+  @Override
+  public long readLong() throws IOException {
+    return buffer.readLong();
+  }
+
+  @Override
+  public float readFloat() throws IOException {
+    return buffer.readFloat();
+  }
+
+  @Override
+  public double readDouble() throws IOException {
+    return buffer.readDouble();
+  }
+
+  @Override
+  public String readLine() throws IOException {
+    // Note that this code is mostly copied from DataInputStream
+    char[] buf = new char[128];
+
+    int room = buf.length;
+    int offset = 0;
+    int c;
+
+  loop:
+    while (true) {
+      c = buffer.readByte();
+      switch (c) {
+      case -1:
+      case '\n':
+        break loop;
+      case '\r':
+        int c2 = buffer.readByte();
+        if ((c2 != '\n') && (c2 != -1)) {
+          buffer.readerIndex(buffer.readerIndex() - 1);
+        }
+        break loop;
+      default:
+        if (--room < 0) {
+          char[] replacebuf = new char[offset + 128];
+          room = replacebuf.length - offset - 1;
+          System.arraycopy(buf, 0, replacebuf, 0, offset);
+          buf = replacebuf;
+        }
+        buf[offset++] = (char) c;
+        break;
+      }
+    }
+    if ((c == -1) && (offset == 0)) {
+      return null;
+    }
+    return String.copyValueOf(buf, 0, offset);
+  }
+
+  @Override
+  public String readUTF() throws IOException {
+    // Note that this code is mostly copied from DataInputStream
+    int utflen = buffer.readUnsignedShort();
+
+    byte[] bytearr = new byte[utflen];
+    char[] chararr = new char[utflen];
+
+    int c;
+    int char2;
+    int char3;
+    int count = 0;
+    int chararrCount = 0;
+
+    buffer.readBytes(bytearr, 0, utflen);
+
+    while (count < utflen) {
+      c = (int) bytearr[count] & 0xff;
+      if (c > 127) {
+        break;
+      }
+      count++;
+      chararr[chararrCount++] = (char) c;
+    }
+
+    while (count < utflen) {
+      c = (int) bytearr[count] & 0xff;
+      switch (c >> 4) {
+      case 0:
+      case 1:
+      case 2:
+      case 3:
+      case 4:
+      case 5:
+      case 6:
+      case 7:
+        /* 0xxxxxxx */
+        count++;
+        chararr[chararrCount++] = (char) c;
+        break;
+      case 12:
+      case 13:
+        /* 110x xxxx   10xx xxxx*/
+        count += 2;
+        if (count > utflen) {
+          throw new UTFDataFormatException(
+              "malformed input: partial character at end");
+        }
+        char2 = (int) bytearr[count - 1];
+        if ((char2 & 0xC0) != 0x80) {
+          throw new UTFDataFormatException(
+                "malformed input around byte " + count);
+        }
+        chararr[chararrCount++] = (char) (((c & 0x1F) << 6) |
+            (char2 & 0x3F));
+        break;
+      case 14:
+        /* 1110 xxxx  10xx xxxx  10xx xxxx */
+        count += 3;
+        if (count > utflen) {
+          throw new UTFDataFormatException(
+              "malformed input: partial character at end");
+        }
+        char2 = (int) bytearr[count - 2];
+        char3 = (int) bytearr[count - 1];
+        if (((char2 & 0xC0) != 0x80) || ((char3 & 0xC0) != 0x80)) {
+          throw new UTFDataFormatException(
+              "malformed input around byte " + (count - 1));
+        }
+        chararr[chararrCount++] = (char) (((c & 0x0F) << 12) |
+            ((char2 & 0x3F) << 6) | ((char3 & 0x3F) << 0));
+        break;
+      default:
+        /* 10xx xxxx,  1111 xxxx */
+        throw new UTFDataFormatException(
+            "malformed input around byte " + count);
+      }
+    }
+    // The number of chars produced may be less than utflen
+    return new String(chararr, 0, chararrCount);
+  }
+}

Added: giraph/trunk/giraph/src/main/java/org/apache/giraph/utils/DynamicChannelBufferOutputStream.java
URL: http://svn.apache.org/viewvc/giraph/trunk/giraph/src/main/java/org/apache/giraph/utils/DynamicChannelBufferOutputStream.java?rev=1409973&view=auto
==============================================================================
--- giraph/trunk/giraph/src/main/java/org/apache/giraph/utils/DynamicChannelBufferOutputStream.java (added)
+++ giraph/trunk/giraph/src/main/java/org/apache/giraph/utils/DynamicChannelBufferOutputStream.java Thu Nov 15 20:17:38 2012
@@ -0,0 +1,187 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.utils;
+
+import java.io.DataOutput;
+import java.io.IOException;
+import java.nio.ByteOrder;
+import org.jboss.netty.buffer.ChannelBuffers;
+import org.jboss.netty.buffer.DirectChannelBufferFactory;
+import org.jboss.netty.buffer.DynamicChannelBuffer;
+
+/**
+ * Special output stream that can grow as needed and dumps to a
+ * DynamicChannelBuffer.
+ */
+public class DynamicChannelBufferOutputStream implements DataOutput {
+  /** Internal dynamic channel buffer */
+  private DynamicChannelBuffer buffer;
+
+  /**
+   * Constructor
+   *
+   * @param estimatedLength Estimated length of the buffer
+   */
+  public DynamicChannelBufferOutputStream(int estimatedLength) {
+    buffer = (DynamicChannelBuffer)
+        ChannelBuffers.dynamicBuffer(ByteOrder.LITTLE_ENDIAN,
+            estimatedLength, DirectChannelBufferFactory.getInstance());
+  }
+
+  /**
+   * Constructor with the buffer to use
+   *
+   * @param buffer Buffer to be written to (cleared before use)
+   */
+  public DynamicChannelBufferOutputStream(DynamicChannelBuffer buffer) {
+    this.buffer = buffer;
+    buffer.clear();
+  }
+
+  /**
+   * Get the dynamic channel buffer
+   *
+   * @return dynamic channel buffer (not a copy)
+   */
+  public DynamicChannelBuffer getDynamicChannelBuffer() {
+    return buffer;
+  }
+
+  @Override
+  public void write(int b) throws IOException {
+    buffer.writeByte(b);
+  }
+
+  @Override
+  public void write(byte[] b) throws IOException {
+    buffer.writeBytes(b);
+  }
+
+  @Override
+  public void write(byte[] b, int off, int len) throws IOException {
+    buffer.writeBytes(b, off, len);
+  }
+
+  @Override
+  public void writeBoolean(boolean v) throws IOException {
+    buffer.writeByte(v ? 1 : 0);
+  }
+
+  @Override
+  public void writeByte(int v) throws IOException {
+    buffer.writeByte(v);
+  }
+
+  @Override
+  public void writeShort(int v) throws IOException {
+    buffer.writeShort(v);
+  }
+
+  @Override
+  public void writeChar(int v) throws IOException {
+    buffer.writeChar(v);
+  }
+
+  @Override
+  public void writeInt(int v) throws IOException {
+    buffer.writeInt(v);
+  }
+
+  @Override
+  public void writeLong(long v) throws IOException {
+    buffer.writeLong(v);
+  }
+
+  @Override
+  public void writeFloat(float v) throws IOException {
+    buffer.writeFloat(v);
+  }
+
+  @Override
+  public void writeDouble(double v) throws IOException {
+    buffer.writeDouble(v);
+  }
+
+  @Override
+  public void writeBytes(String s) throws IOException {
+    // Note that this code is mostly copied from DataOutputStream
+    int len = s.length();
+    for (int i = 0; i < len; i++) {
+      buffer.writeByte((byte) s.charAt(i));
+    }
+  }
+
+  @Override
+  public void writeChars(String s) throws IOException {
+    // Note that this code is mostly copied from DataOutputStream
+    int len = s.length();
+    for (int i = 0; i < len; i++) {
+      int v = s.charAt(i);
+      buffer.writeByte((v >>> 8) & 0xFF);
+      buffer.writeByte((v >>> 0) & 0xFF);
+    }
+  }
+
+  @Override
+  public void writeUTF(String s) throws IOException {
+    // Note that this code is mostly copied from DataOutputStream
+    int strlen = s.length();
+    int utflen = 0;
+    int c;
+
+    /* use charAt instead of copying String to char array */
+    for (int i = 0; i < strlen; i++) {
+      c = s.charAt(i);
+      if ((c >= 0x0001) && (c <= 0x007F)) {
+        utflen++;
+      } else if (c > 0x07FF) {
+        utflen += 3;
+      } else {
+        utflen += 2;
+      }
+    }
+
+    buffer.writeByte((byte) ((utflen >>> 8) & 0xFF));
+    buffer.writeByte((byte) ((utflen >>> 0) & 0xFF));
+
+    int i = 0;
+    for (i = 0; i < strlen; i++) {
+      c = s.charAt(i);
+      if (!((c >= 0x0001) && (c <= 0x007F))) {
+        break;
+      }
+      buffer.writeByte((byte) c);
+    }
+
+    for (; i < strlen; i++) {
+      c = s.charAt(i);
+      if ((c >= 0x0001) && (c <= 0x007F)) {
+        buffer.writeByte((byte) c);
+
+      } else if (c > 0x07FF) {
+        buffer.writeByte((byte) (0xE0 | ((c >> 12) & 0x0F)));
+        buffer.writeByte((byte) (0x80 | ((c >>  6) & 0x3F)));
+        buffer.writeByte((byte) (0x80 | ((c >>  0) & 0x3F)));
+      } else {
+        buffer.writeByte((byte) (0xC0 | ((c >>  6) & 0x1F)));
+        buffer.writeByte((byte) (0x80 | ((c >>  0) & 0x3F)));
+      }
+    }
+  }
+}
+

Added: giraph/trunk/giraph/src/main/java/org/apache/giraph/utils/ExtendedByteArrayDataInput.java
URL: http://svn.apache.org/viewvc/giraph/trunk/giraph/src/main/java/org/apache/giraph/utils/ExtendedByteArrayDataInput.java?rev=1409973&view=auto
==============================================================================
--- giraph/trunk/giraph/src/main/java/org/apache/giraph/utils/ExtendedByteArrayDataInput.java (added)
+++ giraph/trunk/giraph/src/main/java/org/apache/giraph/utils/ExtendedByteArrayDataInput.java Thu Nov 15 20:17:38 2012
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.utils;
+
+import java.io.ByteArrayInputStream;
+import java.io.DataInput;
+import java.io.DataInputStream;
+import java.io.IOException;
+
+/**
+ * Provides access to a internals of ByteArrayInputStream
+ */
+public class ExtendedByteArrayDataInput extends ByteArrayInputStream
+    implements ExtendedDataInput {
+  /** Internal data input */
+  private final DataInput dataInput;
+  /**
+   * Constructor
+   *
+   * @param buf Buffer to read
+   */
+  public ExtendedByteArrayDataInput(byte[] buf) {
+    super(buf);
+    dataInput = new DataInputStream(this);
+  }
+
+  /**
+   * Get access to portion of a byte array
+   *
+   * @param buf Byte array to access
+   * @param offset Offset into the byte array
+   * @param length Length to read
+   */
+  public ExtendedByteArrayDataInput(byte[] buf, int offset, int length) {
+    super(buf, offset, length);
+    dataInput = new DataInputStream(this);
+  }
+
+  @Override
+  public int getPos() {
+    return pos;
+  }
+
+  @Override
+  public void readFully(byte[] b) throws IOException {
+    dataInput.readFully(b);
+  }
+
+  @Override
+  public void readFully(byte[] b, int off, int len) throws IOException {
+    dataInput.readFully(b, off, len);
+  }
+
+  @Override
+  public int skipBytes(int n) throws IOException {
+    return dataInput.skipBytes(n);
+  }
+
+  @Override
+  public boolean readBoolean() throws IOException {
+    return dataInput.readBoolean();
+  }
+
+  @Override
+  public byte readByte() throws IOException {
+    return dataInput.readByte();
+  }
+
+  @Override
+  public int readUnsignedByte() throws IOException {
+    return dataInput.readUnsignedByte();
+  }
+
+  @Override
+  public short readShort() throws IOException {
+    return dataInput.readShort();
+  }
+
+  @Override
+  public int readUnsignedShort() throws IOException {
+    return dataInput.readUnsignedShort();
+  }
+
+  @Override
+  public char readChar() throws IOException {
+    return dataInput.readChar();
+  }
+
+  @Override
+  public int readInt() throws IOException {
+    return dataInput.readInt();
+  }
+
+  @Override
+  public long readLong() throws IOException {
+    return dataInput.readLong();
+  }
+
+  @Override
+  public float readFloat() throws IOException {
+    return dataInput.readFloat();
+  }
+
+  @Override
+  public double readDouble() throws IOException {
+    return dataInput.readDouble();
+  }
+
+  @Override
+  public String readLine() throws IOException {
+    return dataInput.readLine();
+  }
+
+  @Override
+  public String readUTF() throws IOException {
+    return dataInput.readUTF();
+  }
+}

Added: giraph/trunk/giraph/src/main/java/org/apache/giraph/utils/ExtendedByteArrayDataOutput.java
URL: http://svn.apache.org/viewvc/giraph/trunk/giraph/src/main/java/org/apache/giraph/utils/ExtendedByteArrayDataOutput.java?rev=1409973&view=auto
==============================================================================
--- giraph/trunk/giraph/src/main/java/org/apache/giraph/utils/ExtendedByteArrayDataOutput.java (added)
+++ giraph/trunk/giraph/src/main/java/org/apache/giraph/utils/ExtendedByteArrayDataOutput.java Thu Nov 15 20:17:38 2012
@@ -0,0 +1,164 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.utils;
+
+import java.io.ByteArrayOutputStream;
+import java.io.DataOutput;
+import java.io.DataOutputStream;
+import java.io.IOException;
+
+/**
+ * Adds some functionality to ByteArrayOutputStream,
+ * such as an option to write int value over previously written data
+ * and directly get the byte array.
+ */
+public class ExtendedByteArrayDataOutput extends ByteArrayOutputStream
+    implements ExtendedDataOutput {
+  /** Default number of bytes */
+  private static final int DEFAULT_BYTES = 32;
+  /** Internal data output */
+  private final DataOutput dataOutput;
+
+  /**
+   * Uses the byte array provided or if null, use a default size
+   *
+   * @param buf Buffer to use
+   */
+  public ExtendedByteArrayDataOutput(byte[] buf) {
+    if (buf == null) {
+      this.buf = new byte[DEFAULT_BYTES];
+    } else {
+      this.buf = buf;
+    }
+    dataOutput = new DataOutputStream(this);
+  }
+
+  /**
+   * Uses the byte array provided at the given pos
+   *
+   * @param buf Buffer to use
+   * @param pos Position in the buffer to start writing from
+   */
+  public ExtendedByteArrayDataOutput(byte[] buf, int pos) {
+    this.buf = buf;
+    this.count = pos;
+    dataOutput = new DataOutputStream(this);
+  }
+
+  /**
+   * Creates a new byte array output stream. The buffer capacity is
+   * initially 32 bytes, though its size increases if necessary.
+   */
+  public ExtendedByteArrayDataOutput() {
+    this(DEFAULT_BYTES);
+  }
+
+  /**
+   * Creates a new byte array output stream, with a buffer capacity of
+   * the specified size, in bytes.
+   *
+   * @param size the initial size.
+   * @exception  IllegalArgumentException if size is negative.
+   */
+  public ExtendedByteArrayDataOutput(int size) {
+    if (size < 0) {
+      throw new IllegalArgumentException("Negative initial size: " +
+          size);
+    }
+    buf = new byte[size];
+    dataOutput = new DataOutputStream(this);
+  }
+
+  @Override
+  public void writeBoolean(boolean v) throws IOException {
+    dataOutput.writeBoolean(v);
+  }
+
+  @Override
+  public void writeByte(int v) throws IOException {
+    dataOutput.writeByte(v);
+  }
+
+  @Override
+  public void writeShort(int v) throws IOException {
+    dataOutput.writeShort(v);
+  }
+
+  @Override
+  public void writeChar(int v) throws IOException {
+    dataOutput.writeChar(v);
+  }
+
+  @Override
+  public void writeInt(int v) throws IOException {
+    dataOutput.writeInt(v);
+  }
+
+  @Override
+  public void writeLong(long v) throws IOException {
+    dataOutput.writeLong(v);
+  }
+
+  @Override
+  public void writeFloat(float v) throws IOException {
+    dataOutput.writeFloat(v);
+  }
+
+  @Override
+  public void writeDouble(double v) throws IOException {
+    dataOutput.writeDouble(v);
+  }
+
+  @Override
+  public void writeBytes(String s) throws IOException {
+    dataOutput.writeBytes(s);
+  }
+
+  @Override
+  public void writeChars(String s) throws IOException {
+    dataOutput.writeChars(s);
+  }
+
+  @Override
+  public void writeUTF(String s) throws IOException {
+    dataOutput.writeUTF(s);
+  }
+
+  @Override
+  public void writeInt(int position, int value) {
+    if (position + 4 > count) {
+      throw new IndexOutOfBoundsException(
+          "writeIntOnPosition: Tried to write int to position " + position +
+              " but current length is " + count);
+    }
+    buf[position] = (byte) ((value >>> 24) & 0xFF);
+    buf[position + 1] = (byte) ((value >>> 16) & 0xFF);
+    buf[position + 2] = (byte) ((value >>> 8) & 0xFF);
+    buf[position + 3] = (byte) ((value >>> 0) & 0xFF);
+  }
+
+  @Override
+  public byte[] getByteArray() {
+    return buf;
+  }
+
+  @Override
+  public int getPos() {
+    return count;
+  }
+}

Added: giraph/trunk/giraph/src/main/java/org/apache/giraph/utils/ExtendedDataInput.java
URL: http://svn.apache.org/viewvc/giraph/trunk/giraph/src/main/java/org/apache/giraph/utils/ExtendedDataInput.java?rev=1409973&view=auto
==============================================================================
--- giraph/trunk/giraph/src/main/java/org/apache/giraph/utils/ExtendedDataInput.java (added)
+++ giraph/trunk/giraph/src/main/java/org/apache/giraph/utils/ExtendedDataInput.java Thu Nov 15 20:17:38 2012
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.utils;
+
+import java.io.DataInput;
+
+/**
+ * Add some functionality to data input
+ */
+public interface ExtendedDataInput extends DataInput {
+  /**
+   * Get the position of what has been read
+   *
+   * @return How many bytes have been read?
+   */
+  int getPos();
+
+  /**
+   * How many bytes are available?
+   *
+   * @return Bytes available
+   */
+  int available();
+}

Added: giraph/trunk/giraph/src/main/java/org/apache/giraph/utils/ExtendedDataOutput.java
URL: http://svn.apache.org/viewvc/giraph/trunk/giraph/src/main/java/org/apache/giraph/utils/ExtendedDataOutput.java?rev=1409973&view=auto
==============================================================================
--- giraph/trunk/giraph/src/main/java/org/apache/giraph/utils/ExtendedDataOutput.java (added)
+++ giraph/trunk/giraph/src/main/java/org/apache/giraph/utils/ExtendedDataOutput.java Thu Nov 15 20:17:38 2012
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.utils;
+
+import java.io.DataOutput;
+
+/**
+ * Add a few features to data output
+ */
+public interface ExtendedDataOutput extends DataOutput {
+  /**
+   * In order to write a size as a first part of an data output, it is
+   * useful to be able to write an int at an arbitrary location in the stream
+   *
+   * @param pos Byte position in the output stream
+   * @param value Value to write
+   */
+  void writeInt(int pos, int value);
+
+  /**
+   * Get the position in the output stream
+   *
+   * @return Position in the output stream
+   */
+  int getPos();
+
+  /**
+   * Get the internal byte array (if possible), read-only
+   *
+   * @return Internal byte array (do not modify)
+   */
+  byte[] getByteArray();
+
+  /**
+   * Copies the internal byte array
+   *
+   * @return Copied byte array
+   */
+  byte[] toByteArray();
+
+  /**
+   * Clears the buffer
+   */
+  void reset();
+}

Added: giraph/trunk/giraph/src/main/java/org/apache/giraph/utils/UnsafeByteArrayInputStream.java
URL: http://svn.apache.org/viewvc/giraph/trunk/giraph/src/main/java/org/apache/giraph/utils/UnsafeByteArrayInputStream.java?rev=1409973&view=auto
==============================================================================
--- giraph/trunk/giraph/src/main/java/org/apache/giraph/utils/UnsafeByteArrayInputStream.java (added)
+++ giraph/trunk/giraph/src/main/java/org/apache/giraph/utils/UnsafeByteArrayInputStream.java Thu Nov 15 20:17:38 2012
@@ -0,0 +1,354 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.utils;
+
+import java.io.IOException;
+import java.io.UTFDataFormatException;
+import java.lang.reflect.Field;
+import org.apache.log4j.Logger;
+
+/**
+ * Byte array output stream that uses Unsafe methods to serialize/deserialize
+ * much faster
+ */
+public class UnsafeByteArrayInputStream implements ExtendedDataInput {
+  /** Class logger */
+  private static final Logger LOG = Logger.getLogger(
+      UnsafeByteArrayInputStream.class);
+  /** Access to the unsafe class */
+  private static final sun.misc.Unsafe UNSAFE;
+  static {
+    try {
+      Field field = sun.misc.Unsafe.class.getDeclaredField("theUnsafe");
+      field.setAccessible(true);
+      UNSAFE = (sun.misc.Unsafe) field.get(null);
+      // Checkstyle exception due to needing to check if unsafe is allowed
+      // CHECKSTYLE: stop IllegalCatch
+    } catch (Exception e) {
+      // CHECKSTYLE: resume IllegalCatch
+      throw new RuntimeException("UnsafeByteArrayOutputStream: Failed to " +
+          "get unsafe", e);
+    }
+  }
+  /** Offset of a byte array */
+  private static final long BYTE_ARRAY_OFFSET  =
+      UNSAFE.arrayBaseOffset(byte[].class);
+  /** Offset of a long array */
+  private static final long LONG_ARRAY_OFFSET =
+      UNSAFE.arrayBaseOffset(long[].class);
+  /** Offset of a double array */
+  private static final long DOUBLE_ARRAY_OFFSET =
+      UNSAFE.arrayBaseOffset(double[].class);
+
+  /** Byte buffer */
+  private final byte[] buf;
+  /** Buffer length */
+  private final int bufLength;
+  /** Position in the buffer */
+  private int pos = 0;
+
+  /**
+   * Constructor
+   *
+   * @param buf Buffer to read from
+   */
+  public UnsafeByteArrayInputStream(byte[] buf) {
+    this.buf = buf;
+    this.bufLength = buf.length;
+  }
+
+  /**
+   * Constructor.
+   *
+   * @param buf Buffer to read from
+   * @param offset Offsetin the buffer to start reading from
+   * @param length Max length of the buffer to read
+   */
+  public UnsafeByteArrayInputStream(byte[] buf, int offset, int length) {
+    this.buf = buf;
+    this.pos = offset;
+    this.bufLength = length;
+  }
+
+  /**
+   * How many bytes are still available?
+   *
+   * @return Number of bytes available
+   */
+  public int available() {
+    return bufLength - pos;
+  }
+
+  /**
+   * What position in the stream?
+   *
+   * @return Position
+   */
+  public int getPos() {
+    return pos;
+  }
+
+  /**
+   * Check whether there are enough remaining bytes for an operation
+   *
+   * @param requiredBytes Bytes required to read
+   * @throws IOException When there are not enough bytes to read
+   */
+  private void ensureRemaining(int requiredBytes) throws IOException {
+    if (bufLength - pos < requiredBytes) {
+      throw new IOException("ensureRemaining: Only " + (bufLength - pos) +
+          " bytes remaining, trying to read " + requiredBytes);
+    }
+  }
+
+  @Override
+  public void readFully(byte[] b) throws IOException {
+    ensureRemaining(b.length);
+    System.arraycopy(buf, pos, b, 0, b.length);
+    pos += b.length;
+  }
+
+  @Override
+  public void readFully(byte[] b, int off, int len) throws IOException {
+    ensureRemaining(len);
+    System.arraycopy(buf, pos, b, off, len);
+    pos += len;
+  }
+
+  @Override
+  public int skipBytes(int n) throws IOException {
+    ensureRemaining(n);
+    pos += n;
+    return n;
+  }
+
+  @Override
+  public boolean readBoolean() throws IOException {
+    ensureRemaining(UnsafeByteArrayOutputStream.SIZE_OF_BOOLEAN);
+    boolean value = UNSAFE.getBoolean(buf,
+        BYTE_ARRAY_OFFSET + pos);
+    pos += UnsafeByteArrayOutputStream.SIZE_OF_BOOLEAN;
+    return value;
+  }
+
+  @Override
+  public byte readByte() throws IOException {
+    ensureRemaining(UnsafeByteArrayOutputStream.SIZE_OF_BYTE);
+    byte value = UNSAFE.getByte(buf,
+        BYTE_ARRAY_OFFSET + pos);
+    pos += UnsafeByteArrayOutputStream.SIZE_OF_BYTE;
+    return value;
+  }
+
+  @Override
+  public int readUnsignedByte() throws IOException {
+    return (short) (readByte() & 0xFF);
+  }
+
+  @Override
+  public short readShort() throws IOException {
+    ensureRemaining(UnsafeByteArrayOutputStream.SIZE_OF_SHORT);
+    short value = UNSAFE.getShort(buf,
+        BYTE_ARRAY_OFFSET + pos);
+    pos += UnsafeByteArrayOutputStream.SIZE_OF_SHORT;
+    return value;
+  }
+
+  @Override
+  public int readUnsignedShort() throws IOException {
+    return readShort() & 0xFFFF;
+  }
+
+  @Override
+  public char readChar() throws IOException {
+    ensureRemaining(UnsafeByteArrayOutputStream.SIZE_OF_CHAR);
+    char value = UNSAFE.getChar(buf,
+        BYTE_ARRAY_OFFSET + pos);
+    pos += UnsafeByteArrayOutputStream.SIZE_OF_CHAR;
+    return value;
+  }
+
+  @Override
+  public int readInt() throws IOException {
+    ensureRemaining(UnsafeByteArrayOutputStream.SIZE_OF_INT);
+    int value = UNSAFE.getInt(buf,
+        BYTE_ARRAY_OFFSET + pos);
+    pos += UnsafeByteArrayOutputStream.SIZE_OF_INT;
+    return value;
+  }
+
+  @Override
+  public long readLong() throws IOException {
+    ensureRemaining(UnsafeByteArrayOutputStream.SIZE_OF_LONG);
+    long value = UNSAFE.getLong(buf,
+        BYTE_ARRAY_OFFSET + pos);
+    pos += UnsafeByteArrayOutputStream.SIZE_OF_LONG;
+    return value;
+  }
+
+  @Override
+  public float readFloat() throws IOException {
+    ensureRemaining(UnsafeByteArrayOutputStream.SIZE_OF_FLOAT);
+    float value = UNSAFE.getFloat(buf,
+        BYTE_ARRAY_OFFSET + pos);
+    pos += UnsafeByteArrayOutputStream.SIZE_OF_FLOAT;
+    return value;
+  }
+
+  @Override
+  public double readDouble() throws IOException {
+    ensureRemaining(UnsafeByteArrayOutputStream.SIZE_OF_DOUBLE);
+    double value = UNSAFE.getDouble(buf,
+        BYTE_ARRAY_OFFSET + pos);
+    pos += UnsafeByteArrayOutputStream.SIZE_OF_DOUBLE;
+    return value;
+  }
+
+  @Override
+  public String readLine() throws IOException {
+    // Note that this code is mostly copied from DataInputStream
+    char[] tmpBuf = new char[128];
+
+    int room = tmpBuf.length;
+    int offset = 0;
+    int c;
+
+  loop:
+    while (true) {
+      c = readByte();
+      switch (c) {
+      case -1:
+      case '\n':
+        break loop;
+      case '\r':
+        int c2 = readByte();
+        if ((c2 != '\n') && (c2 != -1)) {
+          pos -= 1;
+        }
+        break loop;
+      default:
+        if (--room < 0) {
+          char[] replacebuf = new char[offset + 128];
+          room = replacebuf.length - offset - 1;
+          System.arraycopy(tmpBuf, 0, replacebuf, 0, offset);
+          tmpBuf = replacebuf;
+        }
+        tmpBuf[offset++] = (char) c;
+        break;
+      }
+    }
+    if ((c == -1) && (offset == 0)) {
+      return null;
+    }
+    return String.copyValueOf(tmpBuf, 0, offset);
+  }
+
+  @Override
+  public String readUTF() throws IOException {
+    // Note that this code is mostly copied from DataInputStream
+    int utflen = readUnsignedShort();
+
+    byte[] bytearr = new byte[utflen];
+    char[] chararr = new char[utflen];
+
+    int c;
+    int char2;
+    int char3;
+    int count = 0;
+    int chararrCount = 0;
+
+    readFully(bytearr, 0, utflen);
+
+    while (count < utflen) {
+      c = (int) bytearr[count] & 0xff;
+      if (c > 127) {
+        break;
+      }
+      count++;
+      chararr[chararrCount++] = (char) c;
+    }
+
+    while (count < utflen) {
+      c = (int) bytearr[count] & 0xff;
+      switch (c >> 4) {
+      case 0:
+      case 1:
+      case 2:
+      case 3:
+      case 4:
+      case 5:
+      case 6:
+      case 7:
+        /* 0xxxxxxx */
+        count++;
+        chararr[chararrCount++] = (char) c;
+        break;
+      case 12:
+      case 13:
+        /* 110x xxxx   10xx xxxx*/
+        count += 2;
+        if (count > utflen) {
+          throw new UTFDataFormatException(
+              "malformed input: partial character at end");
+        }
+        char2 = (int) bytearr[count - 1];
+        if ((char2 & 0xC0) != 0x80) {
+          throw new UTFDataFormatException(
+              "malformed input around byte " + count);
+        }
+        chararr[chararrCount++] = (char) (((c & 0x1F) << 6) |
+            (char2 & 0x3F));
+        break;
+      case 14:
+        /* 1110 xxxx  10xx xxxx  10xx xxxx */
+        count += 3;
+        if (count > utflen) {
+          throw new UTFDataFormatException(
+              "malformed input: partial character at end");
+        }
+        char2 = (int) bytearr[count - 2];
+        char3 = (int) bytearr[count - 1];
+        if (((char2 & 0xC0) != 0x80) || ((char3 & 0xC0) != 0x80)) {
+          throw new UTFDataFormatException(
+              "malformed input around byte " + (count - 1));
+        }
+        chararr[chararrCount++] = (char) (((c & 0x0F) << 12) |
+            ((char2 & 0x3F) << 6) | ((char3 & 0x3F) << 0));
+        break;
+      default:
+        /* 10xx xxxx,  1111 xxxx */
+        throw new UTFDataFormatException(
+            "malformed input around byte " + count);
+      }
+    }
+    // The number of chars produced may be less than utflen
+    return new String(chararr, 0, chararrCount);
+  }
+
+  /**
+   * Get an int at an arbitrary position in a byte[]
+   *
+   * @param buf Buffer to get the int from
+   * @param pos Position in the buffer to get the int from
+   * @return Int at the buffer position
+   */
+  public static int getInt(byte[] buf, int pos) {
+    return UNSAFE.getInt(buf,
+        BYTE_ARRAY_OFFSET + pos);
+  }
+}

Added: giraph/trunk/giraph/src/main/java/org/apache/giraph/utils/UnsafeByteArrayOutputStream.java
URL: http://svn.apache.org/viewvc/giraph/trunk/giraph/src/main/java/org/apache/giraph/utils/UnsafeByteArrayOutputStream.java?rev=1409973&view=auto
==============================================================================
--- giraph/trunk/giraph/src/main/java/org/apache/giraph/utils/UnsafeByteArrayOutputStream.java (added)
+++ giraph/trunk/giraph/src/main/java/org/apache/giraph/utils/UnsafeByteArrayOutputStream.java Thu Nov 15 20:17:38 2012
@@ -0,0 +1,313 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.utils;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.lang.reflect.Field;
+import java.util.Arrays;
+
+/**
+ * Byte array output stream that uses Unsafe methods to serialize/deserialize
+ * much faster
+ */
+public class UnsafeByteArrayOutputStream extends OutputStream
+    implements ExtendedDataOutput {
+  static {
+    try {
+      Field field = sun.misc.Unsafe.class.getDeclaredField("theUnsafe");
+      field.setAccessible(true);
+      UNSAFE = (sun.misc.Unsafe) field.get(null);
+      // Checkstyle exception due to needing to check if unsafe is allowed
+      // CHECKSTYLE: stop IllegalCatch
+    } catch (Exception e) {
+      // CHECKSTYLE: resume IllegalCatch
+      throw new RuntimeException("UnsafeByteArrayOutputStream: Failed to " +
+          "get unsafe", e);
+    }
+  }
+
+  /** Bytes used in a boolean */
+  public static final int SIZE_OF_BOOLEAN = 1;
+  /** Bytes used in a byte */
+  public static final int SIZE_OF_BYTE = 1;
+  /** Bytes used in a char */
+  public static final int SIZE_OF_CHAR = 2;
+  /** Bytes used in a short */
+  public static final int SIZE_OF_SHORT = 2;
+  /** Bytes used in a medium */
+  public static final int SIZE_OF_MEDIUM = 3;
+  /** Bytes used in an int */
+  public static final int SIZE_OF_INT = 4;
+  /** Bytes used in a float */
+  public static final int SIZE_OF_FLOAT = 4;
+  /** Bytes used in a long */
+  public static final int SIZE_OF_LONG = 8;
+  /** Bytes used in a double */
+  public static final int SIZE_OF_DOUBLE = 8;
+  /** Default number of bytes */
+  private static final int DEFAULT_BYTES = 32;
+  /** Access to the unsafe class */
+  private static final sun.misc.Unsafe UNSAFE;
+
+  /** Offset of a byte array */
+  private static final long BYTE_ARRAY_OFFSET  =
+      UNSAFE.arrayBaseOffset(byte[].class);
+  /** Offset of a long array */
+  private static final long LONG_ARRAY_OFFSET =
+      UNSAFE.arrayBaseOffset(long[].class);
+  /** Offset of a double array */
+  private static final long DOUBLE_ARRAY_OFFSET =
+      UNSAFE.arrayBaseOffset(double[].class);
+
+  /** Byte buffer */
+  private byte[] buf;
+  /** Position in the buffer */
+  private int pos = 0;
+
+  /**
+   * Constructor
+   */
+  public UnsafeByteArrayOutputStream() {
+    this(DEFAULT_BYTES);
+  }
+
+  /**
+   * Constructor
+   *
+   * @param size Initial size of the underlying byte array
+   */
+  public UnsafeByteArrayOutputStream(int size) {
+    buf = new byte[size];
+  }
+
+  /**
+   * Constructor to take in a buffer
+   *
+   * @param buf Buffer to start with, or if null, create own buffer
+   */
+  public UnsafeByteArrayOutputStream(byte[] buf) {
+    if (buf == null) {
+      this.buf = new byte[DEFAULT_BYTES];
+    } else {
+      this.buf = buf;
+    }
+  }
+
+  /**
+   * Constructor to take in a buffer with a given position into that buffer
+   *
+   * @param buf Buffer to start with
+   * @param pos Position to write at the buffer
+   */
+  public UnsafeByteArrayOutputStream(byte[] buf, int pos) {
+    this.buf = buf;
+    this.pos = pos;
+  }
+
+  /**
+   * Ensure that this buffer has enough remaining space to add the size.
+   * Creates and copies to a new buffer if necessary
+   *
+   * @param size Size to add
+   */
+  private void ensureSize(int size) {
+    if (pos + size > buf.length) {
+      byte[] newBuf = new byte[(buf.length + size) << 1];
+      System.arraycopy(buf, 0, newBuf, 0, pos);
+      buf = newBuf;
+    }
+  }
+
+  @Override
+  public byte[] getByteArray() {
+    return buf;
+  }
+
+  @Override
+  public byte[] toByteArray() {
+    return Arrays.copyOf(buf, pos);
+
+  }
+
+  @Override
+  public void reset() {
+    pos = 0;
+  }
+
+  @Override
+  public int getPos() {
+    return pos;
+  }
+
+  @Override
+  public void write(int b) throws IOException {
+    ensureSize(SIZE_OF_BYTE);
+    buf[pos] = (byte) b;
+    pos += SIZE_OF_BYTE;
+  }
+
+  @Override
+  public void write(byte[] b) throws IOException {
+    ensureSize(b.length);
+    System.arraycopy(b, 0, buf, pos, b.length);
+    pos += b.length;
+  }
+
+  @Override
+  public void write(byte[] b, int off, int len) throws IOException {
+    ensureSize(len);
+    System.arraycopy(b, off, buf, pos, len);
+    pos += len;
+  }
+
+  @Override
+  public void writeBoolean(boolean v) throws IOException {
+    ensureSize(SIZE_OF_BOOLEAN);
+    UNSAFE.putBoolean(buf, BYTE_ARRAY_OFFSET + pos, v);
+    pos += SIZE_OF_BOOLEAN;
+  }
+
+  @Override
+  public void writeByte(int v) throws IOException {
+    ensureSize(SIZE_OF_BYTE);
+    UNSAFE.putByte(buf, BYTE_ARRAY_OFFSET + pos, (byte) v);
+    pos += SIZE_OF_BYTE;
+  }
+
+  @Override
+  public void writeShort(int v) throws IOException {
+    ensureSize(SIZE_OF_SHORT);
+    UNSAFE.putShort(buf, BYTE_ARRAY_OFFSET + pos, (short) v);
+    pos += SIZE_OF_SHORT;
+  }
+
+  @Override
+  public void writeChar(int v) throws IOException {
+    ensureSize(SIZE_OF_CHAR);
+    UNSAFE.putChar(buf, BYTE_ARRAY_OFFSET + pos, (char) v);
+    pos += SIZE_OF_CHAR;
+  }
+
+  @Override
+  public void writeInt(int v) throws IOException {
+    ensureSize(SIZE_OF_INT);
+    UNSAFE.putInt(buf, BYTE_ARRAY_OFFSET + pos, v);
+    pos += SIZE_OF_INT;
+  }
+
+  @Override
+  public void writeInt(int pos, int value) {
+    if (pos + SIZE_OF_INT > this.pos) {
+      throw new IndexOutOfBoundsException(
+          "writeInt: Tried to write int to position " + pos +
+              " but current length is " + this.pos);
+    }
+    UNSAFE.putInt(buf, BYTE_ARRAY_OFFSET + pos, value);
+  }
+
+  @Override
+  public void writeLong(long v) throws IOException {
+    ensureSize(SIZE_OF_LONG);
+    UNSAFE.putLong(buf, BYTE_ARRAY_OFFSET + pos, v);
+    pos += SIZE_OF_LONG;
+  }
+
+  @Override
+  public void writeFloat(float v) throws IOException {
+    ensureSize(SIZE_OF_FLOAT);
+    UNSAFE.putFloat(buf, BYTE_ARRAY_OFFSET + pos, v);
+    pos += SIZE_OF_FLOAT;
+  }
+
+  @Override
+  public void writeDouble(double v) throws IOException {
+    ensureSize(SIZE_OF_DOUBLE);
+    UNSAFE.putDouble(buf, BYTE_ARRAY_OFFSET + pos, v);
+    pos += SIZE_OF_DOUBLE;
+  }
+
+  @Override
+  public void writeBytes(String s) throws IOException {
+    // Note that this code is mostly copied from DataOutputStream
+    int len = s.length();
+    ensureSize(len);
+    for (int i = 0; i < len; i++) {
+      buf[pos++] = (byte) s.charAt(i);
+    }
+  }
+
+  @Override
+  public void writeChars(String s) throws IOException {
+    // Note that this code is mostly copied from DataOutputStream
+    int len = s.length();
+    ensureSize(len);
+    for (int i = 0; i < len; i++) {
+      int v = s.charAt(i);
+      buf[pos++] = (byte) ((v >>> 8) & 0xFF);
+      buf[pos++] = (byte) ((v >>> 0) & 0xFF);
+    }
+  }
+
+  @Override
+  public void writeUTF(String s) throws IOException {
+    // Note that this code is mostly copied from DataOutputStream
+    int strlen = s.length();
+    int utflen = 0;
+    int c;
+
+    /* use charAt instead of copying String to char array */
+    for (int i = 0; i < strlen; i++) {
+      c = s.charAt(i);
+      if ((c >= 0x0001) && (c <= 0x007F)) {
+        utflen++;
+      } else if (c > 0x07FF) {
+        utflen += 3;
+      } else {
+        utflen += 2;
+      }
+    }
+
+    buf[pos++] = (byte) ((utflen >>> 8) & 0xFF);
+    buf[pos++] = (byte) ((utflen >>> 0) & 0xFF);
+
+    int i = 0;
+    for (i = 0; i < strlen; i++) {
+      c = s.charAt(i);
+      if (!((c >= 0x0001) && (c <= 0x007F))) {
+        break;
+      }
+      buf[pos++] = (byte) c;
+    }
+
+    for (; i < strlen; i++) {
+      c = s.charAt(i);
+      if ((c >= 0x0001) && (c <= 0x007F)) {
+        buf[pos++] = (byte) c;
+
+      } else if (c > 0x07FF) {
+        buf[pos++] = (byte) (0xE0 | ((c >> 12) & 0x0F));
+        buf[pos++] = (byte) (0x80 | ((c >>  6) & 0x3F));
+        buf[pos++] = (byte) (0x80 | ((c >>  0) & 0x3F));
+      } else {
+        buf[pos++] = (byte) (0xC0 | ((c >>  6) & 0x1F));
+        buf[pos++] = (byte) (0x80 | ((c >>  0) & 0x3F));
+      }
+    }
+  }
+}

Modified: giraph/trunk/giraph/src/main/java/org/apache/giraph/utils/WritableUtils.java
URL: http://svn.apache.org/viewvc/giraph/trunk/giraph/src/main/java/org/apache/giraph/utils/WritableUtils.java?rev=1409973&r1=1409972&r2=1409973&view=diff
==============================================================================
--- giraph/trunk/giraph/src/main/java/org/apache/giraph/utils/WritableUtils.java (original)
+++ giraph/trunk/giraph/src/main/java/org/apache/giraph/utils/WritableUtils.java Thu Nov 15 20:17:38 2012
@@ -26,7 +26,6 @@ import java.io.DataOutputStream;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
-
 import org.apache.giraph.zk.ZooKeeperExt;
 import org.apache.giraph.zk.ZooKeeperExt.PathStat;
 import org.apache.hadoop.conf.Configuration;
@@ -110,6 +109,74 @@ public class WritableUtils {
   }
 
   /**
+   * Read fields from byteArray to a Writeable object, skipping the size.
+   * Serialization method is choosable
+   *
+   * @param byteArray Byte array to find the fields in.
+   * @param writableObject Object to fill in the fields.
+   * @param unsafe Use unsafe deserialization
+   */
+  public static void readFieldsFromByteArrayWithSize(
+      byte[] byteArray, Writable writableObject, boolean unsafe) {
+    ExtendedDataInput extendedDataInput;
+    if (unsafe) {
+      extendedDataInput = new UnsafeByteArrayInputStream(byteArray);
+    } else {
+      extendedDataInput = new ExtendedByteArrayDataInput(byteArray);
+    }
+    try {
+      extendedDataInput.readInt();
+      writableObject.readFields(extendedDataInput);
+    } catch (IOException e) {
+      throw new IllegalStateException(
+          "readFieldsFromByteArrayWithSize: IOException", e);
+    }
+  }
+
+  /**
+   * Write object to a byte array with the first 4 bytes as the size of the
+   * entire buffer (including the size).
+   *
+   * @param writableObject Object to write from.
+   * @param unsafe Use unsafe serialization?
+   * @return Byte array with serialized object.
+   */
+  public static byte[] writeToByteArrayWithSize(Writable writableObject,
+                                                boolean unsafe) {
+    return writeToByteArrayWithSize(writableObject, null, unsafe);
+  }
+
+  /**
+   * Write object to a byte array with the first 4 bytes as the size of the
+   * entire buffer (including the size).
+   *
+   * @param writableObject Object to write from.
+   * @param buffer Use this buffer instead
+   * @param unsafe Use unsafe serialization?
+   * @return Byte array with serialized object.
+   */
+  public static byte[] writeToByteArrayWithSize(Writable writableObject,
+                                                byte[] buffer,
+                                                boolean unsafe) {
+    ExtendedDataOutput extendedDataOutput;
+    if (unsafe) {
+      extendedDataOutput = new UnsafeByteArrayOutputStream(buffer);
+    } else {
+      extendedDataOutput = new ExtendedByteArrayDataOutput(buffer);
+    }
+    try {
+      extendedDataOutput.writeInt(-1);
+      writableObject.write(extendedDataOutput);
+      extendedDataOutput.writeInt(0, extendedDataOutput.getPos());
+    } catch (IOException e) {
+      throw new IllegalStateException("writeToByteArrayWithSize: " +
+          "IOException", e);
+    }
+
+    return extendedDataOutput.getByteArray();
+  }
+
+  /**
    * Write object to a ZooKeeper znode.
    *
    * @param zkExt ZooKeeper instance.

Modified: giraph/trunk/giraph/src/test/java/org/apache/giraph/comm/RequestFailureTest.java
URL: http://svn.apache.org/viewvc/giraph/trunk/giraph/src/test/java/org/apache/giraph/comm/RequestFailureTest.java?rev=1409973&r1=1409972&r2=1409973&view=diff
==============================================================================
--- giraph/trunk/giraph/src/test/java/org/apache/giraph/comm/RequestFailureTest.java (original)
+++ giraph/trunk/giraph/src/test/java/org/apache/giraph/comm/RequestFailureTest.java Thu Nov 15 20:17:38 2012
@@ -18,6 +18,9 @@
 
 package org.apache.giraph.comm;
 
+import com.google.common.collect.Lists;
+import java.io.IOException;
+import java.util.Collection;
 import org.apache.giraph.GiraphConfiguration;
 import org.apache.giraph.ImmutableClassesGiraphConfiguration;
 import org.apache.giraph.comm.netty.NettyClient;
@@ -27,6 +30,7 @@ import org.apache.giraph.comm.requests.S
 import org.apache.giraph.comm.requests.WritableRequest;
 import org.apache.giraph.graph.EdgeListVertex;
 import org.apache.giraph.graph.WorkerInfo;
+import org.apache.giraph.utils.ByteArrayVertexIdMessageCollection;
 import org.apache.giraph.utils.MockUtils;
 import org.apache.giraph.utils.PairList;
 import org.apache.hadoop.io.IntWritable;
@@ -38,11 +42,6 @@ import static org.junit.Assert.assertEqu
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
-import com.google.common.collect.Lists;
-
-import java.io.IOException;
-import java.util.Collection;
-
 /**
  * Test all the netty failure scenarios
  */
@@ -83,12 +82,15 @@ public class RequestFailureTest {
   private WritableRequest getRequest() {
     // Data to send
     final int partitionId = 0;
-    PairList<Integer, VertexIdMessageCollection<IntWritable, IntWritable>>
+    PairList<Integer, ByteArrayVertexIdMessageCollection<IntWritable,
+            IntWritable>>
         dataToSend = new PairList<Integer,
-        VertexIdMessageCollection<IntWritable, IntWritable>>();
+        ByteArrayVertexIdMessageCollection<IntWritable, IntWritable>>();
     dataToSend.initialize();
-    VertexIdMessageCollection<IntWritable, IntWritable> vertexIdMessages =
-        new VertexIdMessageCollection<IntWritable, IntWritable>(conf);
+    ByteArrayVertexIdMessageCollection<IntWritable,
+        IntWritable> vertexIdMessages =
+        new ByteArrayVertexIdMessageCollection<IntWritable, IntWritable>();
+    vertexIdMessages.setConf(conf);
     vertexIdMessages.initialize();
     dataToSend.add(partitionId, vertexIdMessages);
     for (int i = 1; i < 7; ++i) {

Modified: giraph/trunk/giraph/src/test/java/org/apache/giraph/comm/RequestTest.java
URL: http://svn.apache.org/viewvc/giraph/trunk/giraph/src/test/java/org/apache/giraph/comm/RequestTest.java?rev=1409973&r1=1409972&r2=1409973&view=diff
==============================================================================
--- giraph/trunk/giraph/src/test/java/org/apache/giraph/comm/RequestTest.java (original)
+++ giraph/trunk/giraph/src/test/java/org/apache/giraph/comm/RequestTest.java Thu Nov 15 20:17:38 2012
@@ -18,20 +18,29 @@
 
 package org.apache.giraph.comm;
 
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.concurrent.ConcurrentHashMap;
 import org.apache.giraph.GiraphConfiguration;
 import org.apache.giraph.ImmutableClassesGiraphConfiguration;
 import org.apache.giraph.comm.netty.NettyClient;
 import org.apache.giraph.comm.netty.NettyServer;
 import org.apache.giraph.comm.netty.handler.WorkerRequestServerHandler;
-import org.apache.giraph.comm.requests.SendWorkerMessagesRequest;
 import org.apache.giraph.comm.requests.SendPartitionMutationsRequest;
 import org.apache.giraph.comm.requests.SendVertexRequest;
+import org.apache.giraph.comm.requests.SendWorkerMessagesRequest;
 import org.apache.giraph.graph.Edge;
 import org.apache.giraph.graph.EdgeListVertex;
 import org.apache.giraph.graph.Vertex;
 import org.apache.giraph.graph.VertexMutations;
 import org.apache.giraph.graph.WorkerInfo;
+import org.apache.giraph.graph.partition.Partition;
 import org.apache.giraph.graph.partition.PartitionStore;
+import org.apache.giraph.utils.ByteArrayVertexIdMessageCollection;
 import org.apache.giraph.utils.MockUtils;
 import org.apache.giraph.utils.PairList;
 import org.apache.hadoop.io.IntWritable;
@@ -44,16 +53,6 @@ import static org.junit.Assert.assertTru
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.concurrent.ConcurrentHashMap;
-
 /**
  * Test all the different netty requests.
  */
@@ -108,21 +107,19 @@ public class RequestTest {
   public void sendVertexPartition() throws IOException {
     // Data to send
     int partitionId = 13;
-    Collection<Vertex<IntWritable, IntWritable, IntWritable,
-        IntWritable>> vertices =
-        new ArrayList<Vertex<IntWritable, IntWritable,
-        IntWritable, IntWritable>>();
+    Partition<IntWritable, IntWritable, IntWritable, IntWritable> partition =
+        conf.createPartition(partitionId, null);
     for (int i = 0; i < 10; ++i) {
       TestVertex vertex = new TestVertex();
       vertex.initialize(new IntWritable(i), new IntWritable(i));
-      vertices.add(vertex);
+      partition.putVertex(vertex);
     }
 
     // Send the request
     SendVertexRequest<IntWritable, IntWritable, IntWritable,
     IntWritable> request =
       new SendVertexRequest<IntWritable, IntWritable,
-      IntWritable, IntWritable>(partitionId, vertices);
+      IntWritable, IntWritable>(partition);
     client.sendWritableRequest(workerInfo.getTaskId(), request);
     client.waitAllRequests();
 
@@ -138,7 +135,7 @@ public class RequestTest {
     int total = 0;
     for (Vertex<IntWritable, IntWritable,
         IntWritable, IntWritable> vertex :
-        partitionStore.getPartition(partitionId).getVertices()) {
+        partitionStore.getPartition(partitionId)) {
       total += vertex.getId().get();
     }
     assertEquals(total, 45);
@@ -147,13 +144,16 @@ public class RequestTest {
   @Test
   public void sendWorkerMessagesRequest() throws IOException {
     // Data to send
-    PairList<Integer, VertexIdMessageCollection<IntWritable, IntWritable>>
+    PairList<Integer, ByteArrayVertexIdMessageCollection<IntWritable,
+        IntWritable>>
         dataToSend = new PairList<Integer,
-        VertexIdMessageCollection<IntWritable, IntWritable>>();
+        ByteArrayVertexIdMessageCollection<IntWritable, IntWritable>>();
     dataToSend.initialize();
     int partitionId = 0;
-    VertexIdMessageCollection<IntWritable, IntWritable> vertexIdMessages =
-        new VertexIdMessageCollection<IntWritable, IntWritable>(conf);
+    ByteArrayVertexIdMessageCollection<IntWritable,
+        IntWritable> vertexIdMessages =
+        new ByteArrayVertexIdMessageCollection<IntWritable, IntWritable>();
+    vertexIdMessages.setConf(conf);
     vertexIdMessages.initialize();
     dataToSend.add(partitionId, vertexIdMessages);
     for (int i = 1; i < 7; ++i) {