You are viewing a plain-text version of this content; the link to the canonical version was not preserved in this copy.
Posted to commits@giraph.apache.org by ac...@apache.org on 2012/11/15 21:17:44 UTC

svn commit: r1409973 [1/3] - in /giraph/trunk: ./ giraph/src/main/java/org/apache/giraph/ giraph/src/main/java/org/apache/giraph/benchmark/ giraph/src/main/java/org/apache/giraph/comm/ giraph/src/main/java/org/apache/giraph/comm/aggregators/ giraph/src...

Author: aching
Date: Thu Nov 15 20:17:38 2012
New Revision: 1409973

URL: http://svn.apache.org/viewvc?rev=1409973&view=rev
Log:
GIRAPH-417: Serialize the graph/message cache into byte[] for
improving memory usage and compute speed. (aching)

Added:
    giraph/trunk/giraph/src/main/java/org/apache/giraph/benchmark/RepresentativeVertexPageRankBenchmark.java
    giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/RepresentativeVertex.java
    giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/partition/ByteArrayPartition.java
    giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/partition/SimplePartition.java
      - copied, changed from r1408926, giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/partition/Partition.java
    giraph/trunk/giraph/src/main/java/org/apache/giraph/utils/ByteArrayVertexIdMessageCollection.java
    giraph/trunk/giraph/src/main/java/org/apache/giraph/utils/DynamicChannelBufferInputStream.java
    giraph/trunk/giraph/src/main/java/org/apache/giraph/utils/DynamicChannelBufferOutputStream.java
    giraph/trunk/giraph/src/main/java/org/apache/giraph/utils/ExtendedByteArrayDataInput.java
    giraph/trunk/giraph/src/main/java/org/apache/giraph/utils/ExtendedByteArrayDataOutput.java
    giraph/trunk/giraph/src/main/java/org/apache/giraph/utils/ExtendedDataInput.java
    giraph/trunk/giraph/src/main/java/org/apache/giraph/utils/ExtendedDataOutput.java
    giraph/trunk/giraph/src/main/java/org/apache/giraph/utils/UnsafeByteArrayInputStream.java
    giraph/trunk/giraph/src/main/java/org/apache/giraph/utils/UnsafeByteArrayOutputStream.java
    giraph/trunk/giraph/src/test/java/org/apache/giraph/graph/TestMutableVertex.java
    giraph/trunk/giraph/src/test/java/org/apache/giraph/graph/partition/TestGiraphTransferRegulator.java
Removed:
    giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/VertexIdMessageCollection.java
    giraph/trunk/giraph/src/test/java/org/apache/giraph/graph/TestEdgeListVertex.java
Modified:
    giraph/trunk/CHANGELOG
    giraph/trunk/giraph/src/main/java/org/apache/giraph/GiraphConfiguration.java
    giraph/trunk/giraph/src/main/java/org/apache/giraph/ImmutableClassesGiraphConfiguration.java
    giraph/trunk/giraph/src/main/java/org/apache/giraph/benchmark/PageRankBenchmark.java
    giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/SendMessageCache.java
    giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/SendMutationsCache.java
    giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/SendPartitionCache.java
    giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/aggregators/CountingOutputStream.java
    giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/messages/DiskBackedMessageStoreByPartition.java
    giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/messages/MessageStoreByPartition.java
    giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/messages/SimpleMessageStore.java
    giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/netty/NettyWorkerClientRequestProcessor.java
    giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/requests/SendVertexRequest.java
    giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/requests/SendWorkerMessagesRequest.java
    giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/BspServiceWorker.java
    giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/ComputeCallable.java
    giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/EdgeListVertex.java
    giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/LongDoubleFloatDoubleVertex.java
    giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/partition/DiskBackedPartitionStore.java
    giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/partition/Partition.java
    giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/partition/PartitionStore.java
    giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/partition/SimplePartitionStore.java
    giraph/trunk/giraph/src/main/java/org/apache/giraph/utils/WritableUtils.java
    giraph/trunk/giraph/src/test/java/org/apache/giraph/comm/RequestFailureTest.java
    giraph/trunk/giraph/src/test/java/org/apache/giraph/comm/RequestTest.java
    giraph/trunk/giraph/src/test/java/org/apache/giraph/graph/partition/TestPartitionStores.java

Modified: giraph/trunk/CHANGELOG
URL: http://svn.apache.org/viewvc/giraph/trunk/CHANGELOG?rev=1409973&r1=1409972&r2=1409973&view=diff
==============================================================================
--- giraph/trunk/CHANGELOG (original)
+++ giraph/trunk/CHANGELOG Thu Nov 15 20:17:38 2012
@@ -1,6 +1,9 @@
 Giraph Change Log
 
 Release 0.2.0 - unreleased
+  GIRAPH-417: Serialize the graph/message cache into byte[] for
+  improving memory usage and compute speed. (aching)
+
   GIRAPH-386: ClassCastException when giraph.SplitMasterWorker=false
   (majakabiljo)
 

Modified: giraph/trunk/giraph/src/main/java/org/apache/giraph/GiraphConfiguration.java
URL: http://svn.apache.org/viewvc/giraph/trunk/giraph/src/main/java/org/apache/giraph/GiraphConfiguration.java?rev=1409973&r1=1409972&r2=1409973&view=diff
==============================================================================
--- giraph/trunk/giraph/src/main/java/org/apache/giraph/GiraphConfiguration.java (original)
+++ giraph/trunk/giraph/src/main/java/org/apache/giraph/GiraphConfiguration.java Thu Nov 15 20:17:38 2012
@@ -28,6 +28,9 @@ import org.apache.giraph.graph.VertexOut
 import org.apache.giraph.graph.VertexResolver;
 import org.apache.giraph.graph.WorkerContext;
 import org.apache.giraph.graph.partition.GraphPartitionerFactory;
+
+import org.apache.giraph.graph.partition.Partition;
+
 import org.apache.hadoop.conf.Configuration;
 
 /**
@@ -78,6 +81,9 @@ public class GiraphConfiguration extends
   public static final String AGGREGATOR_WRITER_CLASS =
       "giraph.aggregatorWriterClass";
 
+  /** Partition class - optional */
+  public static final String PARTITION_CLASS = "giraph.partitionClass";
+
   /**
    * Minimum number of simultaneous workers before this job can run (int)
    */
@@ -620,6 +626,15 @@ public class GiraphConfiguration extends
   /** Default is not to do authenticate and authorization with Netty. */
   public static final boolean DEFAULT_AUTHENTICATE = false;
 
+  /** Use unsafe serialization? */
+  public static final String USE_UNSAFE_SERIALIZATION =
+      "giraph.useUnsafeSerialization";
+  /**
+   * Default for unsafe serialization is true (use it if you can,
+   * it's much faster)!
+   */
+  public static final boolean USE_UNSAFE_SERIALIZATION_DEFAULT = true;
+
   /**
    * Constructor that creates the configuration
    */
@@ -754,6 +769,18 @@ public class GiraphConfiguration extends
   }
 
   /**
+   * Set the partition class (optional)
+   *
+   * @param partitionClass Determines the way partitions are stored
+   */
+  public final void setPartitionClass(
+      Class<? extends Partition> partitionClass) {
+    setClass(PARTITION_CLASS,
+        partitionClass,
+        Partition.class);
+  }
+
+  /**
    * Set worker configuration for determining what is required for
    * a superstep.
    *
@@ -987,4 +1014,13 @@ public class GiraphConfiguration extends
   public long getInputSplitMaxEdges() {
     return getLong(INPUT_SPLIT_MAX_EDGES, INPUT_SPLIT_MAX_EDGES_DEFAULT);
   }
+
+  /**
+   * Set whether to use unsafe serialization
+   *
+   * @param useUnsafeSerialization If true, use unsafe serialization
+   */
+  public void useUnsafeSerialization(boolean useUnsafeSerialization) {
+    setBoolean(USE_UNSAFE_SERIALIZATION, useUnsafeSerialization);
+  }
 }

Modified: giraph/trunk/giraph/src/main/java/org/apache/giraph/ImmutableClassesGiraphConfiguration.java
URL: http://svn.apache.org/viewvc/giraph/trunk/giraph/src/main/java/org/apache/giraph/ImmutableClassesGiraphConfiguration.java?rev=1409973&r1=1409972&r2=1409973&view=diff
==============================================================================
--- giraph/trunk/giraph/src/main/java/org/apache/giraph/ImmutableClassesGiraphConfiguration.java (original)
+++ giraph/trunk/giraph/src/main/java/org/apache/giraph/ImmutableClassesGiraphConfiguration.java Thu Nov 15 20:17:38 2012
@@ -18,6 +18,7 @@
 
 package org.apache.giraph;
 
+import java.util.List;
 import org.apache.giraph.graph.AggregatorWriter;
 import org.apache.giraph.graph.Combiner;
 import org.apache.giraph.graph.DefaultMasterCompute;
@@ -34,14 +35,21 @@ import org.apache.giraph.graph.WorkerCon
 import org.apache.giraph.graph.partition.GraphPartitionerFactory;
 import org.apache.giraph.graph.partition.HashPartitionerFactory;
 import org.apache.giraph.graph.partition.MasterGraphPartitioner;
+import org.apache.giraph.graph.partition.Partition;
 import org.apache.giraph.graph.partition.PartitionStats;
+import org.apache.giraph.graph.partition.SimplePartition;
+import org.apache.giraph.utils.ExtendedByteArrayDataInput;
+import org.apache.giraph.utils.ExtendedByteArrayDataOutput;
+import org.apache.giraph.utils.ExtendedDataInput;
+import org.apache.giraph.utils.ExtendedDataOutput;
 import org.apache.giraph.utils.ReflectionUtils;
+import org.apache.giraph.utils.UnsafeByteArrayInputStream;
+import org.apache.giraph.utils.UnsafeByteArrayOutputStream;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
-
-import java.util.List;
+import org.apache.hadoop.util.Progressable;
 
 /**
  * The classes set here are immutable, the remaining configuration is mutable.
@@ -97,6 +105,15 @@ public class ImmutableClassesGiraphConfi
   /** Master compute class - cached for fast access */
   private final Class<? extends MasterCompute> masterComputeClass;
 
+  /** Partition class - cached for fast access */
+  private final Class<? extends Partition<I, V, E, M>> partitionClass;
+
+  /**
+   * Use unsafe serialization? Cached for fast access to instantiate the
+   * extended data input/output classes
+   */
+  private final boolean useUnsafeSerialization;
+
   /**
    * Constructor.  Takes the configuration and then gets the classes out of
    * them for Giraph
@@ -146,6 +163,12 @@ public class ImmutableClassesGiraphConfi
         DefaultWorkerContext.class, WorkerContext.class);
     masterComputeClass =  conf.getClass(MASTER_COMPUTE_CLASS,
         DefaultMasterCompute.class, MasterCompute.class);
+
+    partitionClass = (Class<? extends Partition<I, V, E, M>>)
+        conf.getClass(PARTITION_CLASS, SimplePartition.class);
+
+    useUnsafeSerialization = getBoolean(USE_UNSAFE_SERIALIZATION,
+        USE_UNSAFE_SERIALIZATION_DEFAULT);
   }
 
   /**
@@ -493,4 +516,88 @@ public class ImmutableClassesGiraphConfi
       }
     }
   }
+
+  /**
+   * Create a partition
+   *
+   * @param id Partition id
+   * @param progressable Progressable for reporting progress
+   * @return Instantiated partition
+   */
+  public Partition<I, V, E, M> createPartition(
+      int id, Progressable progressable) {
+    Partition<I, V, E, M> partition =
+        ReflectionUtils.newInstance(partitionClass, this);
+    partition.initialize(id, progressable);
+    return partition;
+  }
+
+  /**
+   * Use unsafe serialization?
+   *
+   * @return True if using unsafe serialization, false otherwise.
+   */
+  public boolean useUnsafeSerialization() {
+    return useUnsafeSerialization;
+  }
+
+  /**
+   * Create an extended data output (can be subclassed)
+   *
+   * @return ExtendedDataOutput object
+   */
+  public ExtendedDataOutput createExtendedDataOutput() {
+    if (useUnsafeSerialization) {
+      return new UnsafeByteArrayOutputStream();
+    } else {
+      return new ExtendedByteArrayDataOutput();
+    }
+  }
+
+  /**
+   * Create an extended data output (can be subclassed)
+   *
+   * @param expectedSize Expected size
+   * @return ExtendedDataOutput object
+   */
+  public ExtendedDataOutput createExtendedDataOutput(int expectedSize) {
+    if (useUnsafeSerialization) {
+      return new UnsafeByteArrayOutputStream(expectedSize);
+    } else {
+      return new ExtendedByteArrayDataOutput(expectedSize);
+    }
+  }
+
+  /**
+   * Create an extended data output (can be subclassed)
+   *
+   * @param buf Buffer to use for the output (reuse perhaps)
+   * @param pos How much of the buffer is already used
+   * @return ExtendedDataOutput object
+   */
+  public ExtendedDataOutput createExtendedDataOutput(byte[] buf,
+                                                     int pos) {
+    if (useUnsafeSerialization) {
+      return new UnsafeByteArrayOutputStream(buf, pos);
+    } else {
+      return new ExtendedByteArrayDataOutput(buf, pos);
+    }
+  }
+
+  /**
+   * Create an extended data input (can be subclassed)
+   *
+   * @param buf Buffer to use for the input
+   * @param off Where to start reading in the buffer
+   * @param length Maximum length of the buffer
+   * @return ExtendedDataInput object
+   */
+  public ExtendedDataInput createExtendedDataInput(
+      byte[] buf, int off, int length) {
+    if (useUnsafeSerialization) {
+      return new UnsafeByteArrayInputStream(buf, off, length);
+    } else {
+      return new ExtendedByteArrayDataInput(buf, off, length);
+    }
+  }
 }

Modified: giraph/trunk/giraph/src/main/java/org/apache/giraph/benchmark/PageRankBenchmark.java
URL: http://svn.apache.org/viewvc/giraph/trunk/giraph/src/main/java/org/apache/giraph/benchmark/PageRankBenchmark.java?rev=1409973&r1=1409972&r2=1409973&view=diff
==============================================================================
--- giraph/trunk/giraph/src/main/java/org/apache/giraph/benchmark/PageRankBenchmark.java (original)
+++ giraph/trunk/giraph/src/main/java/org/apache/giraph/benchmark/PageRankBenchmark.java Thu Nov 15 20:17:38 2012
@@ -15,7 +15,6 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-
 package org.apache.giraph.benchmark;
 
 import org.apache.commons.cli.CommandLine;
@@ -75,7 +74,9 @@ public class PageRankBenchmark implement
     options.addOption("c",
         "vertexClass",
         true,
-        "Vertex class (0 for HashMapVertex, 1 for EdgeListVertex)");
+        "Vertex class (0 for HashMapVertex, 1 for EdgeListVertex, " +
+            "2 for RepresentativeVertex, " +
+            "3 for RepresentativeVertex with unsafe)");
     options.addOption("N",
         "name",
         true,
@@ -125,9 +126,17 @@ public class PageRankBenchmark implement
         (Integer.parseInt(cmd.getOptionValue('c')) == 1)) {
       job.getConfiguration().setVertexClass(
           EdgeListVertexPageRankBenchmark.class);
-    } else {
+    } else if (Integer.parseInt(cmd.getOptionValue('c')) == 0) {
       job.getConfiguration().setVertexClass(
           HashMapVertexPageRankBenchmark.class);
+    } else if (Integer.parseInt(cmd.getOptionValue('c')) == 2) {
+      job.getConfiguration().setVertexClass(
+          RepresentativeVertexPageRankBenchmark.class);
+      job.getConfiguration().useUnsafeSerialization(false);
+    } else if (Integer.parseInt(cmd.getOptionValue('c')) == 3) {
+      job.getConfiguration().setVertexClass(
+          RepresentativeVertexPageRankBenchmark.class);
+      job.getConfiguration().useUnsafeSerialization(true);
     }
     LOG.info("Using class " +
         job.getConfiguration().get(GiraphConfiguration.VERTEX_CLASS));

Added: giraph/trunk/giraph/src/main/java/org/apache/giraph/benchmark/RepresentativeVertexPageRankBenchmark.java
URL: http://svn.apache.org/viewvc/giraph/trunk/giraph/src/main/java/org/apache/giraph/benchmark/RepresentativeVertexPageRankBenchmark.java?rev=1409973&view=auto
==============================================================================
--- giraph/trunk/giraph/src/main/java/org/apache/giraph/benchmark/RepresentativeVertexPageRankBenchmark.java (added)
+++ giraph/trunk/giraph/src/main/java/org/apache/giraph/benchmark/RepresentativeVertexPageRankBenchmark.java Thu Nov 15 20:17:38 2012
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.benchmark;
+
+import java.io.IOException;
+import org.apache.giraph.graph.RepresentativeVertex;
+import org.apache.hadoop.io.DoubleWritable;
+import org.apache.hadoop.io.LongWritable;
+
+/**
+ * Same benchmark code as {@link PageRankBenchmark}, but uses
+ * {@link org.apache.giraph.graph.RepresentativeVertex}
+ * implementation.
+ */
+public class RepresentativeVertexPageRankBenchmark extends
+    RepresentativeVertex<LongWritable, DoubleWritable,
+        DoubleWritable, DoubleWritable> {
+  @Override
+  public void compute(Iterable<DoubleWritable> messages) throws
+      IOException {
+    PageRankComputation.computePageRank(this, messages);
+  }
+}

Modified: giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/SendMessageCache.java
URL: http://svn.apache.org/viewvc/giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/SendMessageCache.java?rev=1409973&r1=1409972&r2=1409973&view=diff
==============================================================================
--- giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/SendMessageCache.java (original)
+++ giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/SendMessageCache.java Thu Nov 15 20:17:38 2012
@@ -18,20 +18,19 @@
 
 package org.apache.giraph.comm;
 
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
 import java.util.List;
 import java.util.Map;
-
 import org.apache.giraph.ImmutableClassesGiraphConfiguration;
 import org.apache.giraph.bsp.CentralizedServiceWorker;
 import org.apache.giraph.graph.WorkerInfo;
 import org.apache.giraph.graph.partition.PartitionOwner;
+import org.apache.giraph.utils.ByteArrayVertexIdMessageCollection;
 import org.apache.giraph.utils.PairList;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
 
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
-
 /**
  * Aggregates the messages to be send to workers so they can be sent
  * in bulk.  Not thread-safe.
@@ -43,7 +42,7 @@ import com.google.common.collect.Maps;
 public class SendMessageCache<I extends WritableComparable,
     M extends Writable> {
   /** Internal cache */
-  private final VertexIdMessageCollection<I, M>[] messageCache;
+  private final ByteArrayVertexIdMessageCollection<I, M>[] messageCache;
   /** Number of messages in each partition */
   private final int[] messageCounts;
   /** List of partition ids belonging to a worker */
@@ -74,7 +73,7 @@ public class SendMessageCache<I extends 
       workerPartitionIds.add(partitionOwner.getPartitionId());
       maxPartition = Math.max(partitionOwner.getPartitionId(), maxPartition);
     }
-    messageCache = new VertexIdMessageCollection[maxPartition + 1];
+    messageCache = new ByteArrayVertexIdMessageCollection[maxPartition + 1];
 
     int maxWorker = 0;
     for (WorkerInfo workerInfo : serviceWorker.getWorkerInfoList()) {
@@ -96,10 +95,11 @@ public class SendMessageCache<I extends 
   public int addMessage(WorkerInfo workerInfo,
     final int partitionId, I destVertexId, M message) {
     // Get the message collection
-    VertexIdMessageCollection<I, M> partitionMessages =
+    ByteArrayVertexIdMessageCollection<I, M> partitionMessages =
         messageCache[partitionId];
     if (partitionMessages == null) {
-      partitionMessages = new VertexIdMessageCollection<I, M>(conf);
+      partitionMessages = new ByteArrayVertexIdMessageCollection<I, M>();
+      partitionMessages.setConf(conf);
       partitionMessages.initialize();
       messageCache[partitionId] = partitionMessages;
     }
@@ -115,13 +115,13 @@ public class SendMessageCache<I extends 
    *
    * @param workerInfo the address of the worker who owns the data
    *                   partitions that are receiving the messages
-   * @return List of pairs (partitionId, VertexIdMessageCollection),
+   * @return List of pairs (partitionId, ByteArrayVertexIdMessageCollection),
    *         where all partition ids belong to workerInfo
    */
-  public PairList<Integer, VertexIdMessageCollection<I, M>>
+  public PairList<Integer, ByteArrayVertexIdMessageCollection<I, M>>
   removeWorkerMessages(WorkerInfo workerInfo) {
-    PairList<Integer, VertexIdMessageCollection<I, M>> workerMessages =
-        new PairList<Integer, VertexIdMessageCollection<I, M>>();
+    PairList<Integer, ByteArrayVertexIdMessageCollection<I, M>> workerMessages =
+        new PairList<Integer, ByteArrayVertexIdMessageCollection<I, M>>();
     workerMessages.initialize();
     for (Integer partitionId : workerPartitions.get(workerInfo)) {
       if (messageCache[partitionId] != null) {
@@ -139,13 +139,15 @@ public class SendMessageCache<I extends 
    * @return All vertex messages for all partitions
    */
   public PairList<WorkerInfo, PairList<
-      Integer, VertexIdMessageCollection<I, M>>> removeAllMessages() {
-    PairList<WorkerInfo, PairList<Integer, VertexIdMessageCollection<I, M>>>
+      Integer, ByteArrayVertexIdMessageCollection<I, M>>> removeAllMessages() {
+    PairList<WorkerInfo, PairList<Integer,
+        ByteArrayVertexIdMessageCollection<I, M>>>
         allMessages = new PairList<WorkerInfo,
-        PairList<Integer, VertexIdMessageCollection<I, M>>>();
+        PairList<Integer, ByteArrayVertexIdMessageCollection<I, M>>>();
     allMessages.initialize();
     for (WorkerInfo workerInfo : workerPartitions.keySet()) {
-      PairList<Integer, VertexIdMessageCollection<I, M>> workerMessages =
+      PairList<Integer, ByteArrayVertexIdMessageCollection<I,
+          M>> workerMessages =
           removeWorkerMessages(workerInfo);
       if (!workerMessages.isEmpty()) {
         allMessages.add(workerInfo, workerMessages);

Modified: giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/SendMutationsCache.java
URL: http://svn.apache.org/viewvc/giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/SendMutationsCache.java?rev=1409973&r1=1409972&r2=1409973&view=diff
==============================================================================
--- giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/SendMutationsCache.java (original)
+++ giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/SendMutationsCache.java Thu Nov 15 20:17:38 2012
@@ -15,7 +15,6 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-
 package org.apache.giraph.comm;
 
 import java.util.HashMap;

Modified: giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/SendPartitionCache.java
URL: http://svn.apache.org/viewvc/giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/SendPartitionCache.java?rev=1409973&r1=1409972&r2=1409973&view=diff
==============================================================================
--- giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/SendPartitionCache.java (original)
+++ giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/SendPartitionCache.java Thu Nov 15 20:17:38 2012
@@ -94,8 +94,7 @@ public class SendPartitionCache<I extend
     Partition<I, V, E, M> partition =
         ownerPartitionMap.get(partitionOwner);
     if (partition == null) {
-      partition = new Partition<I, V, E, M>(
-          configuration,
+      partition = configuration.createPartition(
           partitionOwner.getPartitionId(),
           context);
       ownerPartitionMap.put(partitionOwner, partition);

Modified: giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/aggregators/CountingOutputStream.java
URL: http://svn.apache.org/viewvc/giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/aggregators/CountingOutputStream.java?rev=1409973&r1=1409972&r2=1409973&view=diff
==============================================================================
--- giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/aggregators/CountingOutputStream.java (original)
+++ giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/aggregators/CountingOutputStream.java Thu Nov 15 20:17:38 2012
@@ -18,9 +18,9 @@
 
 package org.apache.giraph.comm.aggregators;
 
-import java.io.ByteArrayOutputStream;
-import java.io.DataOutputStream;
 import java.io.IOException;
+import org.apache.giraph.utils.ExtendedByteArrayDataOutput;
+import org.apache.giraph.utils.ExtendedDataOutput;
 
 /**
  * Wrapper for output stream which keeps the place in the beginning for the
@@ -28,9 +28,7 @@ import java.io.IOException;
  */
 public abstract class CountingOutputStream {
   /** DataOutput to which subclasses will be writing data */
-  protected DataOutputStream dataOutput;
-  /** Byte output stream used by dataOutput */
-  private final ExtendedByteArrayOutputStream byteOutput;
+  protected ExtendedDataOutput dataOutput;
   /** Counter for objects which were written to the stream */
   private int counter;
 
@@ -38,8 +36,7 @@ public abstract class CountingOutputStre
    * Default constructor
    */
   public CountingOutputStream() {
-    byteOutput = new ExtendedByteArrayOutputStream();
-    dataOutput = new DataOutputStream(byteOutput);
+    dataOutput = new ExtendedByteArrayDataOutput();
     reset();
   }
 
@@ -56,7 +53,7 @@ public abstract class CountingOutputStre
    * @return Number of bytes
    */
   protected int getSize() {
-    return byteOutput.size();
+    return dataOutput.getPos();
   }
 
   /**
@@ -65,14 +62,9 @@ public abstract class CountingOutputStre
    * @return Number of objects followed by the data written to the stream
    */
   public byte[] flush() {
-    byteOutput.writeIntOnPosition(counter, 0);
-    try {
-      dataOutput.flush();
-    } catch (IOException e) {
-      throw new IllegalStateException(
-          "flush: IOException occurred while flushing", e);
-    }
-    byte[] ret = byteOutput.toByteArray();
+    dataOutput.writeInt(0, counter);
+    // No actual flush is required; this is a byte array
+    byte[] ret = dataOutput.toByteArray();
     reset();
     return ret;
   }
@@ -81,39 +73,13 @@ public abstract class CountingOutputStre
    * Reset the stream
    */
   private void reset() {
-    byteOutput.reset();
-    dataOutput = new DataOutputStream(byteOutput);
+    dataOutput.reset();
     // reserve the place for count to be written in the end
-    for (int i = 0; i < 4; i++) {
-      byteOutput.write(0);
+    try {
+      dataOutput.writeInt(0);
+    } catch (IOException e) {
+      throw new IllegalStateException("reset: Got IOException", e);
     }
     counter = 0;
   }
-
-  /**
-   * Subclass of {@link ByteArrayOutputStream} which provides an option to
-   * write int value over previously written data
-   */
-  public static class ExtendedByteArrayOutputStream extends
-      ByteArrayOutputStream {
-
-    /**
-     * Write integer value over previously written data at certain
-     * position in byte array
-     *
-     * @param value Value to write
-     * @param position Position from which to write
-     */
-    public void writeIntOnPosition(int value, int position) {
-      if (position + 4 > count) {
-        throw new IndexOutOfBoundsException(
-            "writeIntOnPosition: Tried to write int to position " + position +
-                " but current length is " + count);
-      }
-      buf[position] = (byte) ((value >>> 24) & 0xFF);
-      buf[position + 1] = (byte) ((value >>> 16) & 0xFF);
-      buf[position + 2] = (byte) ((value >>> 8) & 0xFF);
-      buf[position + 3] = (byte) ((value >>> 0) & 0xFF);
-    }
-  }
 }

Modified: giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/messages/DiskBackedMessageStoreByPartition.java
URL: http://svn.apache.org/viewvc/giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/messages/DiskBackedMessageStoreByPartition.java?rev=1409973&r1=1409972&r2=1409973&view=diff
==============================================================================
--- giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/messages/DiskBackedMessageStoreByPartition.java (original)
+++ giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/messages/DiskBackedMessageStoreByPartition.java Thu Nov 15 20:17:38 2012
@@ -18,15 +18,9 @@
 
 package org.apache.giraph.comm.messages;
 
-import org.apache.giraph.bsp.CentralizedServiceWorker;
-import org.apache.giraph.comm.VertexIdMessageCollection;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.WritableComparable;
-
 import com.google.common.collect.Iterables;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
-
 import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.IOException;
@@ -36,6 +30,10 @@ import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
 import java.util.concurrent.ConcurrentMap;
+import org.apache.giraph.bsp.CentralizedServiceWorker;
+import org.apache.giraph.utils.ByteArrayVertexIdMessageCollection;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
 
 /**
  * Message store which separates data by partitions,
@@ -101,14 +99,16 @@ public class DiskBackedMessageStoreByPar
   }
 
   @Override
-  public void addPartitionMessages(VertexIdMessageCollection<I, M> messages,
+  public void addPartitionMessages(
+      ByteArrayVertexIdMessageCollection<I, M> messages,
       int partitionId) throws IOException {
     Map<I, Collection<M>> map = Maps.newHashMap();
-    VertexIdMessageCollection<I, M>.Iterator iterator = messages.getIterator();
+    ByteArrayVertexIdMessageCollection<I, M>.Iterator iterator =
+        messages.getIterator();
     while (iterator.hasNext()) {
       iterator.next();
-      I vertexId = iterator.getCurrentFirst();
-      M message = iterator.getCurrentSecond();
+      I vertexId = iterator.getCurrentVertexId();
+      M message = iterator.getCurrentMessage();
       Collection<M> currentMessages = map.get(vertexId);
       if (currentMessages == null) {
         currentMessages = Lists.newArrayList(message);

Modified: giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/messages/MessageStoreByPartition.java
URL: http://svn.apache.org/viewvc/giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/messages/MessageStoreByPartition.java?rev=1409973&r1=1409972&r2=1409973&view=diff
==============================================================================
--- giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/messages/MessageStoreByPartition.java (original)
+++ giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/messages/MessageStoreByPartition.java Thu Nov 15 20:17:38 2012
@@ -18,15 +18,14 @@
 
 package org.apache.giraph.comm.messages;
 
-import org.apache.giraph.comm.VertexIdMessageCollection;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.WritableComparable;
-
 import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.IOException;
 import java.util.Collection;
 import java.util.Map;
+import org.apache.giraph.utils.ByteArrayVertexIdMessageCollection;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
 
 /**
  * Message store which stores data by partition
@@ -53,7 +52,7 @@ public interface MessageStoreByPartition
    * @param partitionId Id of partition
    * @throws IOException
    */
-  void addPartitionMessages(VertexIdMessageCollection<I, M> messages,
+  void addPartitionMessages(ByteArrayVertexIdMessageCollection<I, M> messages,
       int partitionId) throws IOException;
 
   /**

Modified: giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/messages/SimpleMessageStore.java
URL: http://svn.apache.org/viewvc/giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/messages/SimpleMessageStore.java?rev=1409973&r1=1409972&r2=1409973&view=diff
==============================================================================
--- giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/messages/SimpleMessageStore.java (original)
+++ giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/messages/SimpleMessageStore.java Thu Nov 15 20:17:38 2012
@@ -20,14 +20,12 @@ package org.apache.giraph.comm.messages;
 
 import org.apache.giraph.ImmutableClassesGiraphConfiguration;
 import org.apache.giraph.bsp.CentralizedServiceWorker;
-import org.apache.giraph.comm.VertexIdMessageCollection;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
 
 import com.google.common.collect.Lists;
 import com.google.common.collect.MapMaker;
 import com.google.common.collect.Maps;
-
 import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.IOException;
@@ -37,6 +35,8 @@ import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ConcurrentMap;
 
+import org.apache.giraph.utils.ByteArrayVertexIdMessageCollection;
+
 /**
  * Abstract class for {@link MessageStoreByPartition} which allows any kind
  * of object to hold messages for one vertex.
@@ -189,16 +189,18 @@ public abstract class SimpleMessageStore
   }
 
   @Override
-  public void addPartitionMessages(VertexIdMessageCollection<I, M> messages,
+  public void addPartitionMessages(ByteArrayVertexIdMessageCollection<I,
+      M> messages,
       int partitionId) throws IOException {
     ConcurrentMap<I, T> partitionMap =
         getOrCreatePartitionMap(partitionId);
 
-    VertexIdMessageCollection<I, M>.Iterator iterator = messages.getIterator();
+    ByteArrayVertexIdMessageCollection<I, M>.Iterator iterator =
+        messages.getIterator();
     while (iterator.hasNext()) {
       iterator.next();
-      I vertexId = iterator.getCurrentFirst();
-      M message = iterator.getCurrentSecond();
+      I vertexId = iterator.getCurrentVertexId();
+      M message = iterator.getCurrentMessage();
       addVertexMessageToPartition(vertexId, message, partitionMap);
     }
   }

Modified: giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/netty/NettyWorkerClientRequestProcessor.java
URL: http://svn.apache.org/viewvc/giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/netty/NettyWorkerClientRequestProcessor.java?rev=1409973&r1=1409972&r2=1409973&view=diff
==============================================================================
--- giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/netty/NettyWorkerClientRequestProcessor.java (original)
+++ giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/netty/NettyWorkerClientRequestProcessor.java Thu Nov 15 20:17:38 2012
@@ -17,6 +17,10 @@
  */
 package org.apache.giraph.comm.netty;
 
+import com.google.common.collect.Maps;
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Map;
 import org.apache.giraph.GiraphConfiguration;
 import org.apache.giraph.ImmutableClassesGiraphConfiguration;
 import org.apache.giraph.bsp.CentralizedServiceWorker;
@@ -24,7 +28,6 @@ import org.apache.giraph.comm.SendMessag
 import org.apache.giraph.comm.SendMutationsCache;
 import org.apache.giraph.comm.SendPartitionCache;
 import org.apache.giraph.comm.ServerData;
-import org.apache.giraph.comm.VertexIdMessageCollection;
 import org.apache.giraph.comm.WorkerClient;
 import org.apache.giraph.comm.WorkerClientRequestProcessor;
 import org.apache.giraph.comm.messages.MessageStoreByPartition;
@@ -43,18 +46,13 @@ import org.apache.giraph.graph.partition
 import org.apache.giraph.metrics.GiraphMetrics;
 import org.apache.giraph.metrics.MetricGroup;
 import org.apache.giraph.metrics.ValueGauge;
+import org.apache.giraph.utils.ByteArrayVertexIdMessageCollection;
 import org.apache.giraph.utils.PairList;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
 import org.apache.hadoop.mapreduce.Mapper;
 import org.apache.log4j.Logger;
 
-import com.google.common.collect.Maps;
-
-import java.io.IOException;
-import java.util.Collection;
-import java.util.Map;
-
 /**
  * Aggregate requests and sends them to the thread-safe NettyClient.  This
  * class is not thread-safe and expected to be used and then thrown away after
@@ -150,7 +148,8 @@ public class NettyWorkerClientRequestPro
     // Send a request if the cache of outgoing message to
     // the remote worker 'workerInfo' is full enough to be flushed
     if (workerMessageCount >= maxMessagesPerWorker) {
-      PairList<Integer, VertexIdMessageCollection<I, M>> workerMessages =
+      PairList<Integer, ByteArrayVertexIdMessageCollection<I, M>>
+          workerMessages =
           sendMessageCache.removeWorkerMessages(workerInfo);
       WritableRequest writableRequest =
           new SendWorkerMessagesRequest<I, V, E, M>(workerMessages);
@@ -171,8 +170,7 @@ public class NettyWorkerClientRequestPro
     }
 
     WritableRequest vertexRequest =
-        new SendVertexRequest<I, V, E, M>(partitionId,
-            partition.getVertices());
+        new SendVertexRequest<I, V, E, M>(partition);
     doRequest(workerInfo, vertexRequest);
 
     // Messages are stored separately
@@ -325,10 +323,11 @@ public class NettyWorkerClientRequestPro
     sendPartitionCache.clear();
 
     // Execute the remaining sends messages (if any)
-    PairList<WorkerInfo, PairList<Integer, VertexIdMessageCollection<I, M>>>
+    PairList<WorkerInfo, PairList<Integer,
+        ByteArrayVertexIdMessageCollection<I, M>>>
         remainingMessageCache = sendMessageCache.removeAllMessages();
     PairList<WorkerInfo,
-        PairList<Integer, VertexIdMessageCollection<I, M>>>.Iterator
+        PairList<Integer, ByteArrayVertexIdMessageCollection<I, M>>>.Iterator
         iterator = remainingMessageCache.getIterator();
     while (iterator.hasNext()) {
       iterator.next();

Modified: giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/requests/SendVertexRequest.java
URL: http://svn.apache.org/viewvc/giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/requests/SendVertexRequest.java?rev=1409973&r1=1409972&r2=1409973&view=diff
==============================================================================
--- giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/requests/SendVertexRequest.java (original)
+++ giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/requests/SendVertexRequest.java Thu Nov 15 20:17:38 2012
@@ -19,17 +19,14 @@
 package org.apache.giraph.comm.requests;
 
 import org.apache.giraph.comm.ServerData;
-import org.apache.giraph.graph.Vertex;
+import org.apache.giraph.graph.partition.Partition;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
 import org.apache.log4j.Logger;
 
-import com.google.common.collect.Lists;
-
 import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.IOException;
-import java.util.Collection;
 
 /**
  * Send a collection of vertices for a partition.
@@ -46,10 +43,8 @@ public class SendVertexRequest<I extends
   /** Class logger */
   private static final Logger LOG =
       Logger.getLogger(SendVertexRequest.class);
-  /** Partition id */
-  private int partitionId;
-  /** List of vertices to be stored on this partition */
-  private Collection<Vertex<I, V, E, M>> vertices;
+  /** Partition */
+  private Partition<I, V, E, M> partition;
 
   /**
    * Constructor used for reflection only
@@ -59,34 +54,21 @@ public class SendVertexRequest<I extends
   /**
    * Constructor for sending a request.
    *
-   * @param partitionId Partition to send the request to
-   * @param vertices Vertices to send
+   * @param partition Partition to send the request to
    */
-  public SendVertexRequest(int partitionId,
-                           Collection<Vertex<I, V, E, M>> vertices) {
-    this.partitionId = partitionId;
-    this.vertices = vertices;
+  public SendVertexRequest(Partition<I, V, E, M> partition) {
+    this.partition = partition;
   }
 
   @Override
   public void readFieldsRequest(DataInput input) throws IOException {
-    partitionId = input.readInt();
-    int verticesCount = input.readInt();
-    vertices = Lists.newArrayListWithCapacity(verticesCount);
-    for (int i = 0; i < verticesCount; ++i) {
-      Vertex<I, V, E, M> vertex = getConf().createVertex();
-      vertex.readFields(input);
-      vertices.add(vertex);
-    }
+    partition = getConf().createPartition(-1, null);
+    partition.readFields(input);
   }
 
   @Override
   public void writeRequest(DataOutput output) throws IOException {
-    output.writeInt(partitionId);
-    output.writeInt(vertices.size());
-    for (Vertex<I, V, E, M> vertex : vertices) {
-      vertex.write(output);
-    }
+    partition.write(output);
   }
 
   @Override
@@ -96,12 +78,7 @@ public class SendVertexRequest<I extends
 
   @Override
   public void doRequest(ServerData<I, V, E, M> serverData) {
-    if (vertices.isEmpty()) {
-      LOG.warn("doRequest: Got an empty request!");
-      return;
-    }
-    serverData.getPartitionStore().addPartitionVertices(partitionId,
-        vertices);
+    serverData.getPartitionStore().addPartition(partition);
   }
 }
 

Modified: giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/requests/SendWorkerMessagesRequest.java
URL: http://svn.apache.org/viewvc/giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/requests/SendWorkerMessagesRequest.java?rev=1409973&r1=1409972&r2=1409973&view=diff
==============================================================================
--- giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/requests/SendWorkerMessagesRequest.java (original)
+++ giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/requests/SendWorkerMessagesRequest.java Thu Nov 15 20:17:38 2012
@@ -18,17 +18,16 @@
 
 package org.apache.giraph.comm.requests;
 
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
 import org.apache.giraph.comm.ServerData;
-import org.apache.giraph.comm.VertexIdMessageCollection;
+import org.apache.giraph.utils.ByteArrayVertexIdMessageCollection;
 import org.apache.giraph.utils.PairList;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
 import org.apache.log4j.Logger;
 
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-
 /**
  * Send a collection of vertex messages for a partition.
  *
@@ -49,7 +48,7 @@ public class SendWorkerMessagesRequest<I
    * are owned by a single (destination) worker. These messages are all
    * destined for this worker.
    * */
-  private PairList<Integer, VertexIdMessageCollection<I, M>>
+  private PairList<Integer, ByteArrayVertexIdMessageCollection<I, M>>
   partitionVertexMessages;
 
   /**
@@ -60,10 +59,11 @@ public class SendWorkerMessagesRequest<I
   /**
    * Constructor used to send request.
    *
-   * @param partVertMsgs Map of remote partitions => VertexIdMessageCollection
+   * @param partVertMsgs Map of remote partitions =>
+   *                     ByteArrayVertexIdMessageCollection
    */
   public SendWorkerMessagesRequest(
-    PairList<Integer, VertexIdMessageCollection<I, M>> partVertMsgs) {
+    PairList<Integer, ByteArrayVertexIdMessageCollection<I, M>> partVertMsgs) {
     super();
     this.partitionVertexMessages = partVertMsgs;
   }
@@ -72,12 +72,13 @@ public class SendWorkerMessagesRequest<I
   public void readFieldsRequest(DataInput input) throws IOException {
     int numPartitions = input.readInt();
     partitionVertexMessages =
-        new PairList<Integer, VertexIdMessageCollection<I, M>>();
+        new PairList<Integer, ByteArrayVertexIdMessageCollection<I, M>>();
     partitionVertexMessages.initialize(numPartitions);
     while (numPartitions-- > 0) {
       final int partitionId = input.readInt();
-      VertexIdMessageCollection<I, M> vertexIdMessages =
-          new VertexIdMessageCollection<I, M>(getConf());
+      ByteArrayVertexIdMessageCollection<I, M> vertexIdMessages =
+          new ByteArrayVertexIdMessageCollection<I, M>();
+      vertexIdMessages.setConf(getConf());
       vertexIdMessages.readFields(input);
       partitionVertexMessages.add(partitionId, vertexIdMessages);
     }
@@ -86,8 +87,8 @@ public class SendWorkerMessagesRequest<I
   @Override
   public void writeRequest(DataOutput output) throws IOException {
     output.writeInt(partitionVertexMessages.getSize());
-    PairList<Integer, VertexIdMessageCollection<I, M>>.Iterator iterator =
-        partitionVertexMessages.getIterator();
+    PairList<Integer, ByteArrayVertexIdMessageCollection<I, M>>.Iterator
+        iterator = partitionVertexMessages.getIterator();
     while (iterator.hasNext()) {
       iterator.next();
       output.writeInt(iterator.getCurrentFirst());
@@ -102,8 +103,8 @@ public class SendWorkerMessagesRequest<I
 
   @Override
   public void doRequest(ServerData<I, V, E, M> serverData) {
-    PairList<Integer, VertexIdMessageCollection<I, M>>.Iterator iterator =
-        partitionVertexMessages.getIterator();
+    PairList<Integer, ByteArrayVertexIdMessageCollection<I, M>>.Iterator
+        iterator = partitionVertexMessages.getIterator();
     while (iterator.hasNext()) {
       iterator.next();
       try {

Modified: giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/BspServiceWorker.java
URL: http://svn.apache.org/viewvc/giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/BspServiceWorker.java?rev=1409973&r1=1409972&r2=1409973&view=diff
==============================================================================
--- giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/BspServiceWorker.java (original)
+++ giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/BspServiceWorker.java Thu Nov 15 20:17:38 2012
@@ -525,7 +525,7 @@ else[HADOOP_NON_SECURE]*/
           !getPartitionStore().hasPartition(
               partitionOwner.getPartitionId())) {
         Partition<I, V, E, M> partition =
-            new Partition<I, V, E, M>(getConfiguration(),
+            getConfiguration().createPartition(
                 partitionOwner.getPartitionId(), getContext());
         getPartitionStore().addPartition(partition);
       }
@@ -546,7 +546,7 @@ else[HADOOP_NON_SECURE]*/
         getPartitionStore().getPartitions()) {
       PartitionStats partitionStats =
           new PartitionStats(partition.getId(),
-              partition.getVertices().size(),
+              partition.getVertexCount(),
               0,
               partition.getEdgeCount(),
               0);
@@ -847,7 +847,7 @@ else[HADOOP_NON_SECURE]*/
     vertexWriter.initialize(getContext());
     for (Partition<I, V, E, M> partition :
         getPartitionStore().getPartitions()) {
-      for (Vertex<I, V, E, M> vertex : partition.getVertices()) {
+      for (Vertex<I, V, E, M> vertex : partition) {
         getContext().progress();
         vertexWriter.writeVertex(vertex);
       }
@@ -1057,10 +1057,7 @@ else[HADOOP_NON_SECURE]*/
           }
           metadataStream.close();
           Partition<I, V, E, M> partition =
-              new Partition<I, V, E, M>(
-                  getConfiguration(),
-                  partitionId,
-                  getContext());
+              getConfiguration().createPartition(partitionId, getContext());
           DataInputStream partitionsStream =
               getFs().open(new Path(partitionsFile));
           if (partitionsStream.skip(startPos) != startPos) {

Modified: giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/ComputeCallable.java
URL: http://svn.apache.org/viewvc/giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/ComputeCallable.java?rev=1409973&r1=1409972&r2=1409973&view=diff
==============================================================================
--- giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/ComputeCallable.java (original)
+++ giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/ComputeCallable.java Thu Nov 15 20:17:38 2012
@@ -187,7 +187,7 @@ public class ComputeCallable<I extends W
         new PartitionStats(partition.getId(), 0, 0, 0, 0);
     // Make sure this is thread-safe across runs
     synchronized (partition) {
-      for (Vertex<I, V, E, M> vertex : partition.getVertices()) {
+      for (Vertex<I, V, E, M> vertex : partition) {
         // Make sure every vertex has this thread's
         // graphState before computing
         vertex.setGraphState(graphState);
@@ -205,6 +205,8 @@ public class ComputeCallable<I extends W
           } finally {
             computeOneTimerContext.stop();
           }
+          // Need to save the vertex changes (possibly)
+          partition.saveVertex(vertex);
         }
         if (vertex.isHalted()) {
           partitionStats.incrFinishedVertexCount();

Modified: giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/EdgeListVertex.java
URL: http://svn.apache.org/viewvc/giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/EdgeListVertex.java?rev=1409973&r1=1409972&r2=1409973&view=diff
==============================================================================
--- giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/EdgeListVertex.java (original)
+++ giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/EdgeListVertex.java Thu Nov 15 20:17:38 2012
@@ -34,7 +34,10 @@ import java.util.Map;
 /**
  * User applications can subclass {@link EdgeListVertex}, which stores
  * the outbound edges in an ArrayList (less memory as the cost of expensive
- * random-access lookup).  Good for static graphs.
+ * random-access lookup).  Good for static graphs.  Not nearly as memory
+ * efficient as using RepresentativeVertex + ByteArrayPartition
+ * (it probably uses about 10x more memory), but not bad when keeping
+ * vertices as objects in memory (SimplePartition).
  *
  * @param <I> Vertex index value
  * @param <V> Vertex value

Modified: giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/LongDoubleFloatDoubleVertex.java
URL: http://svn.apache.org/viewvc/giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/LongDoubleFloatDoubleVertex.java?rev=1409973&r1=1409972&r2=1409973&view=diff
==============================================================================
--- giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/LongDoubleFloatDoubleVertex.java (original)
+++ giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/LongDoubleFloatDoubleVertex.java Thu Nov 15 20:17:38 2012
@@ -133,6 +133,7 @@ public abstract class LongDoubleFloatDou
     long id = in.readLong();
     double value = in.readDouble();
     initialize(new LongWritable(id), new DoubleWritable(value));
+    edgeMap.clear();
     long edgeMapSize = in.readLong();
     for (long i = 0; i < edgeMapSize; ++i) {
       long targetVertexId = in.readLong();

Added: giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/RepresentativeVertex.java
URL: http://svn.apache.org/viewvc/giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/RepresentativeVertex.java?rev=1409973&view=auto
==============================================================================
--- giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/RepresentativeVertex.java (added)
+++ giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/RepresentativeVertex.java Thu Nov 15 20:17:38 2012
@@ -0,0 +1,317 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.graph;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.NoSuchElementException;
+import org.apache.giraph.utils.ExtendedDataInput;
+import org.apache.giraph.utils.ExtendedDataOutput;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.log4j.Logger;
+
+/**
+ * This vertex should only be used in conjunction with ByteArrayPartition since
+ * it has special code to deserialize by reusing objects and not instantiating
+ * new ones.  If used without ByteArrayPartition, it will cause a lot of
+ * wasted memory.
+ *
+ * Also, this vertex is optimized for space and not efficient for either adding
+ * or random access of edges.
+ *
+ * Edges are stored back-to-back in a single byte array as serialized
+ * (target vertex id, edge value) pairs.  Iterating deserializes each pair
+ * into one shared representative {@link Edge}, so an edge returned by the
+ * iterator is only valid until the next call to {@link Iterator#next()}.
+ *
+ * @param <I> Vertex index value
+ * @param <V> Vertex value
+ * @param <E> Edge value
+ * @param <M> Message data
+ */
+public abstract class RepresentativeVertex<
+    I extends WritableComparable,
+    V extends Writable, E extends Writable, M extends Writable>
+    extends MutableVertex<I, V, E, M> implements Iterable<Edge<I, E>> {
+  /** Class logger */
+  private static final Logger LOG =
+      Logger.getLogger(RepresentativeVertex.class);
+  /** Empty buffer used when no edges have been serialized yet */
+  private static final byte[] EMPTY_BYTES = new byte[0];
+  /** Representative edge, reused for every edge produced by iteration */
+  private final Edge<I, E> representativeEdge = new Edge<I, E>();
+  /** Serialized edges (may be null when no edges have been stored) */
+  private byte[] serializedEdges;
+  /** Bytes used in serializedEdges (the array may be larger) */
+  private int serializedEdgesBytesUsed;
+  /** Number of edges */
+  private int edgeCount;
+
+  /**
+   * Give the representative edge fresh target vertex id and value objects.
+   */
+  private void resetRepresentativeEdge() {
+    representativeEdge.setTargetVertexId(getConf().createVertexId());
+    representativeEdge.setValue(getConf().createEdgeValue());
+  }
+
+  @Override
+  public final void initialize(I id, V value, Map<I, E> edges) {
+    // Make sure the initial values exist
+    resetRepresentativeEdge();
+    super.initialize(id, value, edges);
+  }
+
+  @Override
+  public final void initialize(I id, V value) {
+    // Make sure the initial values exist
+    resetRepresentativeEdge();
+    super.initialize(id, value);
+  }
+
+  /**
+   * Iterator that uses the representative edge (only one iterator allowed
+   * at a time, since every iterator writes into the same edge object).
+   */
+  private final class RepresentativeEdgeIterator implements
+      Iterator<Edge<I, E>> {
+    /** Input for processing the bytes */
+    private final ExtendedDataInput extendedDataInput;
+
+    /** Constructor. */
+    RepresentativeEdgeIterator() {
+      // Never hand a null buffer to the input; with no serialized edges
+      // there is simply nothing to read.
+      if (serializedEdges == null) {
+        extendedDataInput =
+            getConf().createExtendedDataInput(EMPTY_BYTES, 0, 0);
+      } else {
+        extendedDataInput = getConf().createExtendedDataInput(
+            serializedEdges, 0, serializedEdgesBytesUsed);
+      }
+    }
+
+    @Override
+    public boolean hasNext() {
+      return extendedDataInput.available() > 0;
+    }
+
+    @Override
+    public Edge<I, E> next() {
+      if (!hasNext()) {
+        throw new NoSuchElementException("next: No more edges");
+      }
+      try {
+        representativeEdge.getTargetVertexId().readFields(extendedDataInput);
+        representativeEdge.getValue().readFields(extendedDataInput);
+      } catch (IOException e) {
+        throw new IllegalStateException("next: Failed on pos " +
+            extendedDataInput.getPos() + " edge " + representativeEdge, e);
+      }
+      return representativeEdge;
+    }
+
+    @Override
+    public void remove() {
+      throw new UnsupportedOperationException("remove: Not supported");
+    }
+  }
+
+  @Override
+  public final Iterator<Edge<I, E>> iterator() {
+    return new RepresentativeEdgeIterator();
+  }
+
+  /**
+   * Replace all edges by serializing the given map into a new buffer.
+   *
+   * @param edges Edges to store; null means no edges
+   */
+  @Override
+  public final void setEdges(Map<I, E> edges) {
+    ExtendedDataOutput extendedOutputStream =
+        getConf().createExtendedDataOutput();
+    int newEdgeCount = 0;
+    if (edges != null) {
+      for (Map.Entry<I, E> edge : edges.entrySet()) {
+        try {
+          edge.getKey().write(extendedOutputStream);
+          edge.getValue().write(extendedOutputStream);
+        } catch (IOException e) {
+          throw new IllegalStateException("setEdges: Failed to serialize " +
+              edge, e);
+        }
+        ++newEdgeCount;
+      }
+    }
+    serializedEdges = extendedOutputStream.getByteArray();
+    serializedEdgesBytesUsed = extendedOutputStream.getPos();
+    // Replace, never accumulate, so the count matches the bytes just written
+    edgeCount = newEdgeCount;
+  }
+
+  @Override
+  public final Iterable<Edge<I, E>> getEdges() {
+    return this;
+  }
+
+  @Override
+  public final boolean addEdge(I targetVertexId, E value) {
+    // Note that this is very expensive (deserializes all edges
+    // in an addEdge() request).
+    // Hopefully the user set all the edges in setEdges().
+    for (Edge<I, E> edge : getEdges()) {
+      if (edge.getTargetVertexId().equals(targetVertexId)) {
+        LOG.warn("addEdge: Vertex=" + getId() +
+            ": already added an edge value for target vertex id " +
+            targetVertexId);
+        return false;
+      }
+    }
+    // Append after the existing edges, or start a fresh buffer if none
+    // have been serialized yet
+    ExtendedDataOutput extendedDataOutput = (serializedEdges == null) ?
+        getConf().createExtendedDataOutput() :
+        getConf().createExtendedDataOutput(
+            serializedEdges, serializedEdgesBytesUsed);
+    try {
+      targetVertexId.write(extendedDataOutput);
+      value.write(extendedDataOutput);
+    } catch (IOException e) {
+      throw new IllegalStateException("addEdge: Failed to write to the " +
+          "new byte array", e);
+    }
+    serializedEdges = extendedDataOutput.getByteArray();
+    serializedEdgesBytesUsed = extendedDataOutput.getPos();
+    ++edgeCount;
+    return true;
+  }
+
+  @Override
+  public final int getNumEdges() {
+    return edgeCount;
+  }
+
+  /**
+   * Remove the edge to the given target by shifting later edges over it.
+   *
+   * @param targetVertexId Target vertex id of the edge to remove
+   * @return Value of the removed edge (owned by the caller), or null if
+   *         there was no such edge
+   */
+  @Override
+  public final E removeEdge(I targetVertexId) {
+    // Note that this is very expensive (deserializes all edges
+    // in a removeEdge() request).
+    // Hopefully the user set all the edges correctly in setEdges().
+    RepresentativeEdgeIterator iterator = new RepresentativeEdgeIterator();
+    int foundStartOffset = 0;
+    while (iterator.hasNext()) {
+      Edge<I, E> edge = iterator.next();
+      int foundEndOffset = iterator.extendedDataInput.getPos();
+      if (edge.getTargetVertexId().equals(targetVertexId)) {
+        // Shift the following edges left over the removed one
+        System.arraycopy(serializedEdges, foundEndOffset,
+            serializedEdges, foundStartOffset,
+            serializedEdgesBytesUsed - foundEndOffset);
+        serializedEdgesBytesUsed -= foundEndOffset - foundStartOffset;
+        --edgeCount;
+        // Hand the value object to the caller and give the representative
+        // edge a new one, so later iteration cannot overwrite the result
+        E removedValue = edge.getValue();
+        representativeEdge.setValue(getConf().createEdgeValue());
+        return removedValue;
+      }
+      foundStartOffset = foundEndOffset;
+    }
+
+    return null;
+  }
+
+  @Override
+  public final void readFields(DataInput in) throws IOException {
+    // Ensure these objects are present (they are reused across calls)
+    if (representativeEdge.getTargetVertexId() == null) {
+      representativeEdge.setTargetVertexId(getConf().createVertexId());
+    }
+
+    if (representativeEdge.getValue() == null) {
+      representativeEdge.setValue(getConf().createEdgeValue());
+    }
+
+    I vertexId = getId();
+    if (vertexId == null) {
+      vertexId = getConf().createVertexId();
+    }
+    vertexId.readFields(in);
+
+    V vertexValue = getValue();
+    if (vertexValue == null) {
+      vertexValue = getConf().createVertexValue();
+    }
+    vertexValue.readFields(in);
+
+    // Bypass this class's initialize(), which would allocate a new
+    // representative edge on every deserialization and defeat the reuse
+    // above.
+    super.initialize(vertexId, vertexValue);
+
+    serializedEdgesBytesUsed = in.readInt();
+    // Only create a new buffer if the old one isn't big enough
+    if (serializedEdges == null ||
+        serializedEdgesBytesUsed > serializedEdges.length) {
+      serializedEdges = new byte[serializedEdgesBytesUsed];
+    }
+    in.readFully(serializedEdges, 0, serializedEdgesBytesUsed);
+    edgeCount = in.readInt();
+
+    boolean halt = in.readBoolean();
+    if (halt) {
+      voteToHalt();
+    } else {
+      wakeUp();
+    }
+  }
+
+  /**
+   * Serialize as: id, value, edge byte count, edge bytes, edge count,
+   * halted flag (the order readFields() expects).
+   *
+   * @param out Output to write to
+   * @throws IOException
+   */
+  @Override
+  public final void write(DataOutput out) throws IOException {
+    getId().write(out);
+    getValue().write(out);
+
+    out.writeInt(serializedEdgesBytesUsed);
+    // serializedEdges may still be null when no edges were ever stored
+    if (serializedEdgesBytesUsed > 0) {
+      out.write(serializedEdges, 0, serializedEdgesBytesUsed);
+    }
+    out.writeInt(edgeCount);
+
+    out.writeBoolean(isHalted());
+  }
+}
+

Added: giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/partition/ByteArrayPartition.java
URL: http://svn.apache.org/viewvc/giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/partition/ByteArrayPartition.java?rev=1409973&view=auto
==============================================================================
--- giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/partition/ByteArrayPartition.java (added)
+++ giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/partition/ByteArrayPartition.java Thu Nov 15 20:17:38 2012
@@ -0,0 +1,295 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.graph.partition;
+
+import com.google.common.collect.MapMaker;
+
+import com.google.common.primitives.Ints;
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.concurrent.ConcurrentMap;
+
+import org.apache.giraph.ImmutableClassesGiraphConfiguration;
+import org.apache.giraph.graph.Vertex;
+import org.apache.giraph.utils.UnsafeByteArrayInputStream;
+import org.apache.giraph.utils.WritableUtils;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.util.Progressable;
+import org.apache.log4j.Logger;
+
+/**
+ * Byte array based partition.  Should reduce the amount of memory used since
+ * every vertex in the partition is kept serialized in a byte array.  Callers
+ * must guarantee, however, that only one thread at a time uses the vertex
+ * accessors, since they all share a single representative vertex.
+ *
+ * <p>getVertex(), putVertex(), removeVertex(), getEdgeCount() and the
+ * iterator all deserialize into that representative vertex, so a returned
+ * vertex is only valid until the next such call.  Modifications to a
+ * returned vertex are not stored in the partition until it is written back
+ * with {@link #saveVertex(Vertex)} or {@link #putVertex(Vertex)}.
+ *
+ * @param <I> Vertex index value
+ * @param <V> Vertex value
+ * @param <E> Edge value
+ * @param <M> Message data
+ */
+public class ByteArrayPartition<I extends WritableComparable,
+    V extends Writable, E extends Writable, M extends Writable>
+    implements Partition<I, V, E, M> {
+  /** Class logger */
+  private static final Logger LOG = Logger.getLogger(ByteArrayPartition.class);
+  /** Configuration from the worker */
+  private ImmutableClassesGiraphConfiguration<I, V, E, M> conf;
+  /** Partition id */
+  private int id;
+  /**
+   * Vertex map for this range (keyed by index).  Each byte[] is a serialized
+   * vertex whose first four bytes encode how many bytes of the array,
+   * counted from offset 0, are in use; the array itself may be longer.
+   */
+  private ConcurrentMap<I, byte[]> vertexMap;
+  /** Context used to report progress */
+  private Progressable progressable;
+  /** Reusable vertex that stored vertices are deserialized into */
+  private Vertex<I, V, E, M> representativeVertex;
+  /** Use unsafe serialization */
+  private boolean useUnsafeSerialization;
+
+  /**
+   * Constructor for reflection.
+   */
+  public ByteArrayPartition() { }
+
+  @Override
+  public void initialize(int partitionId, Progressable progressable) {
+    setId(partitionId);
+    setProgressable(progressable);
+    // conf must already have been set via setConf() since it is used below
+    vertexMap = new MapMaker().concurrencyLevel(
+        conf.getNettyServerExecutionConcurrency()).makeMap();
+    representativeVertex = conf.createVertex();
+    useUnsafeSerialization = conf.useUnsafeSerialization();
+  }
+
+  @Override
+  public Vertex<I, V, E, M> getVertex(I vertexIndex) {
+    byte[] vertexData = vertexMap.get(vertexIndex);
+    if (vertexData == null) {
+      return null;
+    }
+    // The shared representative vertex is returned, not a copy
+    WritableUtils.readFieldsFromByteArrayWithSize(
+        vertexData, representativeVertex, useUnsafeSerialization);
+    return representativeVertex;
+  }
+
+  @Override
+  public Vertex<I, V, E, M> putVertex(Vertex<I, V, E, M> vertex) {
+    // Serialize first: vertex may be the representative vertex, which is
+    // overwritten below when an old vertex is returned
+    byte[] vertexData =
+        WritableUtils.writeToByteArrayWithSize(vertex, useUnsafeSerialization);
+    // NOTE(review): vertex.getId() is kept as the map key by reference;
+    // confirm callers never mutate that id object afterwards
+    byte[] oldVertexBytes = vertexMap.put(vertex.getId(), vertexData);
+    if (oldVertexBytes == null) {
+      return null;
+    } else {
+      WritableUtils.readFieldsFromByteArrayWithSize(
+          oldVertexBytes, representativeVertex, useUnsafeSerialization);
+      return representativeVertex;
+    }
+  }
+
+  @Override
+  public Vertex<I, V, E, M> removeVertex(I vertexIndex) {
+    byte[] vertexBytes = vertexMap.remove(vertexIndex);
+    if (vertexBytes == null) {
+      return null;
+    }
+    WritableUtils.readFieldsFromByteArrayWithSize(vertexBytes,
+        representativeVertex, useUnsafeSerialization);
+    return representativeVertex;
+  }
+
+  @Override
+  public void addPartition(Partition<I, V, E, M> partition) {
+    // Only work with other ByteArrayPartition instances, whose serialized
+    // vertices can be taken over as-is (the byte arrays are shared, not
+    // copied)
+    if (!(partition instanceof ByteArrayPartition)) {
+      throw new IllegalStateException("addPartition: Cannot add partition " +
+          "of type " + partition.getClass());
+    }
+
+    ByteArrayPartition<I, V, E, M> byteArrayPartition =
+        (ByteArrayPartition<I, V, E, M>) partition;
+    vertexMap.putAll(byteArrayPartition.vertexMap);
+  }
+
+  @Override
+  public long getVertexCount() {
+    return vertexMap.size();
+  }
+
+  @Override
+  public long getEdgeCount() {
+    // Computed on the fly by deserializing every vertex into the
+    // representative vertex, so a previously returned vertex is overwritten
+    long edges = 0;
+    for (byte[] vertexBytes : vertexMap.values()) {
+      WritableUtils.readFieldsFromByteArrayWithSize(vertexBytes,
+          representativeVertex, useUnsafeSerialization);
+      edges += representativeVertex.getNumEdges();
+    }
+    return edges;
+  }
+
+  @Override
+  public int getId() {
+    return id;
+  }
+
+  @Override
+  public void setId(int id) {
+    this.id = id;
+  }
+
+  @Override
+  public void setProgressable(Progressable progressable) {
+    this.progressable = progressable;
+  }
+
+  @Override
+  public void saveVertex(Vertex<I, V, E, M> vertex) {
+    // Reuse the old buffer whenever possible
+    byte[] oldVertexData = vertexMap.get(vertex.getId());
+    if (oldVertexData != null) {
+      vertexMap.put(vertex.getId(),
+          WritableUtils.writeToByteArrayWithSize(
+              vertex, oldVertexData, useUnsafeSerialization));
+    } else {
+      vertexMap.put(vertex.getId(),
+          WritableUtils.writeToByteArrayWithSize(
+              vertex, useUnsafeSerialization));
+    }
+  }
+
+  @Override
+  public void write(DataOutput output) throws IOException {
+    output.writeInt(id);
+    output.writeInt(vertexMap.size());
+    for (Map.Entry<I, byte[]> entry : vertexMap.entrySet()) {
+      if (progressable != null) {
+        progressable.progress();
+      }
+      entry.getKey().write(output);
+      // Note here that we are writing the size of the vertex data first
+      // as it is encoded in the first four bytes of the byte[].  Only that
+      // many bytes are written since the array may be longer.
+      byte[] vertexData = entry.getValue();
+      int vertexDataSize;
+      if (useUnsafeSerialization) {
+        vertexDataSize = UnsafeByteArrayInputStream.getInt(vertexData, 0);
+      } else {
+        vertexDataSize = Ints.fromByteArray(vertexData);
+      }
+
+      output.writeInt(vertexDataSize);
+      output.write(vertexData, 0, vertexDataSize);
+    }
+  }
+
+  @Override
+  public void readFields(DataInput input) throws IOException {
+    id = input.readInt();
+    int size = input.readInt();
+    vertexMap = new MapMaker().concurrencyLevel(
+        conf.getNettyServerExecutionConcurrency()).initialCapacity(
+        size).makeMap();
+    representativeVertex = conf.createVertex();
+    useUnsafeSerialization = conf.useUnsafeSerialization();
+    for (int i = 0; i < size; ++i) {
+      if (progressable != null) {
+        progressable.progress();
+      }
+      I vertexId = conf.createVertexId();
+      vertexId.readFields(input);
+      // The bytes read back still begin with their own size prefix
+      int vertexDataSize = input.readInt();
+      byte[] vertexData = new byte[vertexDataSize];
+      input.readFully(vertexData);
+      if (vertexMap.put(vertexId, vertexData) != null) {
+        throw new IllegalStateException("readFields: Already saw vertex " +
+            vertexId);
+      }
+    }
+  }
+
+  @Override
+  public void setConf(
+      ImmutableClassesGiraphConfiguration<I, V, E, M> configuration) {
+    conf = configuration;
+  }
+
+  @Override
+  public ImmutableClassesGiraphConfiguration<I, V, E, M> getConf() {
+    return conf;
+  }
+
+  @Override
+  public Iterator<Vertex<I, V, E, M>> iterator() {
+    return new RepresentativeVertexIterator();
+  }
+
+  /**
+   * Iterator that deserializes a vertex from a byte array on the fly, using
+   * the same representative vertex object.  Removal is not supported.
+   */
+  private class RepresentativeVertexIterator implements
+      Iterator<Vertex<I, V, E, M>> {
+    /** Iterator to the vertex values */
+    private final Iterator<byte[]> vertexDataIterator =
+        vertexMap.values().iterator();
+
+    @Override
+    public boolean hasNext() {
+      return vertexDataIterator.hasNext();
+    }
+
+    @Override
+    public Vertex<I, V, E, M> next() {
+      WritableUtils.readFieldsFromByteArrayWithSize(
+          vertexDataIterator.next(), representativeVertex,
+          useUnsafeSerialization);
+      return representativeVertex;
+    }
+
+    @Override
+    public void remove() {
+      // Iterator contract: unsupported removal is signalled this way
+      throw new UnsupportedOperationException(
+          "remove: This method is not supported.");
+    }
+  }
+}

Modified: giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/partition/DiskBackedPartitionStore.java
URL: http://svn.apache.org/viewvc/giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/partition/DiskBackedPartitionStore.java?rev=1409973&r1=1409972&r2=1409973&view=diff
==============================================================================
--- giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/partition/DiskBackedPartitionStore.java (original)
+++ giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/partition/DiskBackedPartitionStore.java Thu Nov 15 20:17:38 2012
@@ -37,7 +37,6 @@ import java.io.File;
 import java.io.FileInputStream;
 import java.io.FileOutputStream;
 import java.io.IOException;
-import java.util.Collection;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.locks.Lock;
@@ -146,7 +145,7 @@ public class DiskBackedPartitionStore<I 
     file.createNewFile();
     DataOutputStream outputStream = new DataOutputStream(
         new BufferedOutputStream(new FileOutputStream(file)));
-    for (Vertex<I, V, E, M> vertex : partition.getVertices()) {
+    for (Vertex<I, V, E, M> vertex : partition) {
       vertex.write(outputStream);
     }
     outputStream.close();
@@ -161,8 +160,8 @@ public class DiskBackedPartitionStore<I 
    */
   private Partition<I, V, E, M> readPartition(Integer partitionId)
     throws IOException {
-    Partition<I, V, E, M> partition = new Partition<I, V, E, M>(conf,
-        partitionId, context);
+    Partition<I, V, E, M> partition =
+        conf.createPartition(partitionId, context);
     File file = new File(getPartitionPath(partitionId));
     DataInputStream inputStream = new DataInputStream(
         new BufferedInputStream(new FileInputStream(file)));
@@ -178,19 +177,17 @@ public class DiskBackedPartitionStore<I 
   }
 
   /**
-   * Append some vertices to an out-of-core partition.
+   * Append some vertices of another partition to an out-of-core partition.
    *
-   * @param partitionId Id of the destination partition
-   * @param vertices Vertices to be added
+   * @param partition Partition to add
    * @throws IOException
    */
-  private void appendVertices(Integer partitionId,
-                              Collection<Vertex<I, V, E, M>> vertices)
+  private void appendPartitionOutOfCore(Partition<I, V, E, M> partition)
     throws IOException {
-    File file = new File(getPartitionPath(partitionId));
+    File file = new File(getPartitionPath(partition.getId()));
     DataOutputStream outputStream = new DataOutputStream(
         new BufferedOutputStream(new FileOutputStream(file, true)));
-    for (Vertex<I, V, E, M> vertex : vertices) {
+    for (Vertex<I, V, E, M> vertex : partition) {
       vertex.write(outputStream);
     }
     outputStream.close();
@@ -208,12 +205,12 @@ public class DiskBackedPartitionStore<I 
       }
       if (LOG.isInfoEnabled()) {
         LOG.info("loadPartition: moving partition " + loadedPartition.getId() +
-            " out of core");
+            " out of core with size " + loadedPartition.getVertexCount());
       }
       try {
         writePartition(loadedPartition);
         onDiskPartitions.put(loadedPartition.getId(),
-            loadedPartition.getVertices().size());
+            (int) loadedPartition.getVertexCount());
         loadedPartition = null;
       } catch (IOException e) {
         throw new IllegalStateException("loadPartition: failed writing " +
@@ -247,7 +244,8 @@ public class DiskBackedPartitionStore<I 
     }
     try {
       writePartition(partition);
-      onDiskPartitions.put(partition.getId(), partition.getVertices().size());
+      onDiskPartitions.put(partition.getId(),
+          (int) partition.getVertexCount());
     } catch (IOException e) {
       throw new IllegalStateException("addPartition: failed writing " +
           "partition " + partition.getId() + "to disk");
@@ -256,51 +254,50 @@ public class DiskBackedPartitionStore<I 
 
   @Override
   public void addPartition(Partition<I, V, E, M> partition) {
-    Lock lock = createLock(partition.getId());
-    if (lock == null) {
-      throw new IllegalStateException("addPartition: partition " +
-          partition.getId() + " already exists");
-    }
-    addPartitionNoLock(partition);
-    lock.unlock();
-  }
-
-  @Override
-  public void addPartitionVertices(Integer partitionId,
-                                   Collection<Vertex<I, V, E, M>> vertices) {
-    if (inMemoryPartitions.containsKey(partitionId)) {
-      Partition<I, V, E, M> partition = inMemoryPartitions.get(partitionId);
-      partition.putVertices(vertices);
-    } else if (onDiskPartitions.containsKey(partitionId)) {
-      Lock lock = getLock(partitionId);
+    // Either merge into a partition already in memory, append to one that
+    // is on disk, or create it under a newly created lock
+    Partition<I, V, E, M> existingPartition =
+        inMemoryPartitions.get(partition.getId());
+    if (existingPartition != null) {
+      existingPartition.addPartition(partition);
+    } else if (onDiskPartitions.containsKey(partition.getId())) {
+      Lock lock = getLock(partition.getId());
       lock.lock();
-      if (loadedPartition != null && loadedPartition.getId() == partitionId) {
-        loadedPartition.putVertices(vertices);
-      } else {
-        try {
-          appendVertices(partitionId, vertices);
-          onDiskPartitions.put(partitionId,
-              onDiskPartitions.get(partitionId) + vertices.size());
-        } catch (IOException e) {
-          throw new IllegalStateException("addPartitionVertices: failed " +
-              "writing vertices to partition " + partitionId + " on disk", e);
-        }
-      }
-      lock.unlock();
+      // try/finally so the lock is released even if writing to disk fails
+      try {
+        if (loadedPartition != null && loadedPartition.getId() ==
+            partition.getId()) {
+          loadedPartition.addPartition(partition);
+        } else {
+          appendPartitionOutOfCore(partition);
+          onDiskPartitions.put(partition.getId(),
+              onDiskPartitions.get(partition.getId()) +
+                  (int) partition.getVertexCount());
+        }
+      } catch (IOException e) {
+        throw new IllegalStateException("addPartition: failed " +
+            "writing vertices to partition " + partition.getId() + " on disk",
+            e);
+      } finally {
+        lock.unlock();
+      }
     } else {
-      Lock lock = createLock(partitionId);
+      // createLock() returns null if a lock for this id already exists
+      Lock lock = createLock(partition.getId());
       if (lock != null) {
-        addPartitionNoLock(
-            new Partition<I, V, E, M>(conf, partitionId, context));
-        lock.unlock();
+        try {
+          addPartitionNoLock(partition);
+        } finally {
+          lock.unlock();
+        }
       } else {
         // Another thread is already creating the partition,
         // so we make sure it's done before repeating the call.
-        lock = getLock(partitionId);
+        lock = getLock(partition.getId());
         lock.lock();
         lock.unlock();
+        addPartition(partition);
       }
-      addPartitionVertices(partitionId, vertices);
     }
   }
 

Modified: giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/partition/Partition.java
URL: http://svn.apache.org/viewvc/giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/partition/Partition.java?rev=1409973&r1=1409972&r2=1409973&view=diff
==============================================================================
--- giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/partition/Partition.java (original)
+++ giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/partition/Partition.java Thu Nov 15 20:17:38 2012
@@ -18,21 +18,12 @@
 
 package org.apache.giraph.graph.partition;
 
-import org.apache.giraph.GiraphConfiguration;
-import org.apache.giraph.ImmutableClassesGiraphConfiguration;
+import org.apache.giraph.ImmutableClassesGiraphConfigurable;
 import org.apache.giraph.graph.Vertex;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
-import org.apache.hadoop.mapreduce.Mapper;
 
-import com.google.common.collect.Maps;
-
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-import java.util.Collection;
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.ConcurrentSkipListMap;
+import org.apache.hadoop.util.Progressable;
 
 /**
  * A generic container that stores vertices.  Vertex ids will map to exactly
@@ -44,38 +35,17 @@ import java.util.concurrent.ConcurrentSk
  * @param <M> Message data
  */
 @SuppressWarnings("rawtypes")
-public class Partition<I extends WritableComparable,
+public interface Partition<I extends WritableComparable,
     V extends Writable, E extends Writable, M extends Writable>
-    implements Writable {
-  /** Configuration from the worker */
-  private final ImmutableClassesGiraphConfiguration<I, V, E, M> conf;
-  /** Partition id */
-  private final int id;
-  /** Vertex map for this range (keyed by index) */
-  private final ConcurrentMap<I, Vertex<I, V, E, M>> vertexMap;
-  /** Context used to report progress */
-  private final Mapper<?, ?, ?, ?>.Context context;
-
-  /**
-   * Constructor.
-   *
-   * @param conf Configuration.
-   * @param id Partition id.
-   * @param context Mapper context
-   */
-  public Partition(ImmutableClassesGiraphConfiguration<I, V, E, M> conf,
-                   int id,
-                   Mapper<?, ?, ?, ?>.Context context) {
-    this.conf = conf;
-    this.id = id;
-    this.context = context;
-    if (conf.getBoolean(GiraphConfiguration.USE_OUT_OF_CORE_MESSAGES,
-        GiraphConfiguration.USE_OUT_OF_CORE_MESSAGES_DEFAULT)) {
-      vertexMap = new ConcurrentSkipListMap<I, Vertex<I, V, E, M>>();
-    } else {
-      vertexMap = Maps.newConcurrentMap();
-    }
-  }
+    extends Writable, ImmutableClassesGiraphConfigurable<I, V, E, M>,
+    Iterable<Vertex<I, V, E, M>> {
+  /**
+   * Initialize the partition.  Guaranteed to be called before it is used.
+   *
+   * @param partitionId Partition id
+   * @param progressable Progressable to call progress on
+   */
+  void initialize(int partitionId, Progressable progressable);
 
   /**
    * Get the vertex for this vertex index.
@@ -83,9 +53,7 @@ public class Partition<I extends Writabl
    * @param vertexIndex Vertex index to search for
    * @return Vertex if it exists, null otherwise
    */
-  public Vertex<I, V, E, M> getVertex(I vertexIndex) {
-    return vertexMap.get(vertexIndex);
-  }
+  Vertex<I, V, E, M> getVertex(I vertexIndex);
 
   /**
    * Put a vertex into the Partition
@@ -93,9 +61,7 @@ public class Partition<I extends Writabl
    * @param vertex Vertex to put in the Partition
    * @return old vertex value (i.e. null if none existed prior)
    */
-  public Vertex<I, V, E, M> putVertex(Vertex<I, V, E, M> vertex) {
-    return vertexMap.put(vertex.getId(), vertex);
-  }
+  Vertex<I, V, E, M> putVertex(Vertex<I, V, E, M> vertex);
 
   /**
    * Remove a vertex from the Partition
@@ -103,79 +69,54 @@ public class Partition<I extends Writabl
    * @param vertexIndex Vertex index to remove
    * @return The removed vertex.
    */
-  public Vertex<I, V, E, M> removeVertex(I vertexIndex) {
-    return vertexMap.remove(vertexIndex);
-  }
+  Vertex<I, V, E, M> removeVertex(I vertexIndex);
 
   /**
-   * Get a collection of the vertices.
+   * Add a partition's vertices
    *
-   * @return Collection of the vertices
+   * @param partition Partition to add
    */
-  public Collection<Vertex<I, V, E , M>> getVertices() {
-    return vertexMap.values();
-  }
+  void addPartition(Partition<I, V, E, M> partition);
 
   /**
-   * Put several vertices in the partition.
+   * Get the number of vertices in this partition
    *
-   * @param vertices Vertices to add
+   * @return Number of vertices
    */
-  public void putVertices(Collection<Vertex<I, V, E , M>> vertices) {
-    for (Vertex<I, V, E , M> vertex : vertices) {
-      vertexMap.put(vertex.getId(), vertex);
-    }
-  }
+  long getVertexCount();
 
   /**
-   * Get the number of edges in this partition.  Computed on the fly.
+   * Get the number of edges in this partition.
    *
    * @return Number of edges.
    */
-  public long getEdgeCount() {
-    long edges = 0;
-    for (Vertex<I, V, E, M> vertex : vertexMap.values()) {
-      edges += vertex.getNumEdges();
-    }
-    return edges;
-  }
+  long getEdgeCount();
 
   /**
    * Get the partition id.
    *
    * @return Id of this partition.
    */
-  public int getId() {
-    return id;
-  }
-
-  @Override
-  public String toString() {
-    return "(id=" + getId() + ",V=" + vertexMap.size() +
-        ",E=" + getEdgeCount() + ")";
-  }
-
-  @Override
-  public void readFields(DataInput input) throws IOException {
-    int vertices = input.readInt();
-    for (int i = 0; i < vertices; ++i) {
-      Vertex<I, V, E, M> vertex = conf.createVertex();
-      context.progress();
-      vertex.readFields(input);
-      if (vertexMap.put(vertex.getId(), vertex) != null) {
-        throw new IllegalStateException(
-            "readFields: " + this +
-            " already has same id " + vertex);
-      }
-    }
-  }
-
-  @Override
-  public void write(DataOutput output) throws IOException {
-    output.writeInt(vertexMap.size());
-    for (Vertex vertex : vertexMap.values()) {
-      context.progress();
-      vertex.write(output);
-    }
-  }
+  int getId();
+
+  /**
+   * Set the partition id.
+   *
+   * @param id Id of this partition
+   */
+  void setId(int id);
+
+  /**
+   * Set the progressable used to report progress.
+   *
+   * @param progressable Progressable to call progress on
+   */
+  void setProgressable(Progressable progressable);
+
+  /**
+   * Save potentially modified vertex back to the partition.
+   *
+   * @param vertex Vertex to save
+   */
+  void saveVertex(Vertex<I, V, E, M> vertex);
 }

Modified: giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/partition/PartitionStore.java
URL: http://svn.apache.org/viewvc/giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/partition/PartitionStore.java?rev=1409973&r1=1409972&r2=1409973&view=diff
==============================================================================
--- giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/partition/PartitionStore.java (original)
+++ giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/partition/PartitionStore.java Thu Nov 15 20:17:38 2012
@@ -18,14 +18,10 @@
 
 package org.apache.giraph.graph.partition;
 
-import org.apache.giraph.graph.Vertex;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.WritableComparable;
-
 import com.google.common.base.Function;
 import com.google.common.collect.Iterables;
-
-import java.util.Collection;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
 
 /**
  * Structure that stores partitions for a worker.
@@ -39,22 +35,14 @@ public abstract class PartitionStore<I e
     V extends Writable, E extends Writable, M extends Writable> {
 
   /**
-   * Add a new partition to the store.
+   * Add a new partition to the store, or, if a partition with the same id
+   * is already stored, add its vertices to the existing partition.
    *
-   * @param partition Partition
+   * @param partition Partition to add
    */
   public abstract void addPartition(Partition<I, V, E, M> partition);
 
   /**
-   * Add some vertices to a (possibly existing) partition.
-   *
-   * @param partitionId Id of the destination partition
-   * @param vertices Vertices
-   */
-  public abstract void addPartitionVertices(
-      Integer partitionId, Collection<Vertex<I, V, E, M>> vertices);
-
-  /**
    * Get a partition.
    *
    * @param partitionId Partition id