Posted to commits@giraph.apache.org by ac...@apache.org on 2012/05/09 20:47:34 UTC

svn commit: r1336344 [2/2] - in /incubator/giraph/trunk: ./ src/main/java/org/apache/giraph/benchmark/ src/main/java/org/apache/giraph/comm/ src/main/java/org/apache/giraph/graph/ src/test/java/org/apache/giraph/ src/test/java/org/apache/giraph/comm/ s...

Added: incubator/giraph/trunk/src/main/java/org/apache/giraph/comm/SendPartitionMutationsRequest.java
URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/comm/SendPartitionMutationsRequest.java?rev=1336344&view=auto
==============================================================================
--- incubator/giraph/trunk/src/main/java/org/apache/giraph/comm/SendPartitionMutationsRequest.java (added)
+++ incubator/giraph/trunk/src/main/java/org/apache/giraph/comm/SendPartitionMutationsRequest.java Wed May  9 18:47:32 2012
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.comm;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.apache.giraph.comm.RequestRegistry.Type;
+import org.apache.giraph.graph.BspUtils;
+import org.apache.giraph.graph.VertexMutations;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.log4j.Logger;
+
+import com.google.common.collect.Maps;
+
+/**
+ * Send a collection of vertex mutations for a partition.
+ *
+ * @param <I> Vertex id
+ * @param <V> Vertex data
+ * @param <E> Edge data
+ * @param <M> Message data
+ */
+@SuppressWarnings("rawtypes")
+public class SendPartitionMutationsRequest<I extends WritableComparable,
+    V extends Writable, E extends Writable,
+    M extends Writable> implements WritableRequest<I, V, E, M> {
+  /** Class logger */
+  private static final Logger LOG =
+      Logger.getLogger(SendPartitionMutationsRequest.class);
+  /** Partition id */
+  private int partitionId;
+  /** Mutations sent for a partition */
+  private Map<I, VertexMutations<I, V, E, M>> vertexIdMutations;
+  /** Configuration */
+  private Configuration conf;
+
+  /**
+   * Constructor used for reflection only
+   */
+  public SendPartitionMutationsRequest() { }
+
+  /**
+   * Constructor used to send request.
+   *
+   * @param partitionId Partition to send the request to
+   * @param vertexIdMutations Map of mutations to send
+   */
+  public SendPartitionMutationsRequest(int partitionId,
+      Map<I, VertexMutations<I, V, E, M>> vertexIdMutations) {
+    this.partitionId = partitionId;
+    this.vertexIdMutations = vertexIdMutations;
+  }
+
+  @Override
+  public void readFields(DataInput input) throws IOException {
+    partitionId = input.readInt();
+    int vertexIdMutationsSize = input.readInt();
+    vertexIdMutations = Maps.newHashMapWithExpectedSize(vertexIdMutationsSize);
+    for (int i = 0; i < vertexIdMutationsSize; ++i) {
+      I vertexId = BspUtils.createVertexIndex(conf);
+      vertexId.readFields(input);
+      VertexMutations<I, V, E, M> vertexMutations =
+          new VertexMutations<I, V, E, M>();
+      vertexMutations.setConf(conf);
+      vertexMutations.readFields(input);
+      if (vertexIdMutations.put(vertexId, vertexMutations) != null) {
+        throw new IllegalStateException(
+            "readFields: Already has vertex id " + vertexId);
+      }
+    }
+  }
+
+  @Override
+  public void write(DataOutput output) throws IOException {
+    output.writeInt(partitionId);
+    output.writeInt(vertexIdMutations.size());
+    for (Entry<I, VertexMutations<I, V, E, M>> entry :
+        vertexIdMutations.entrySet()) {
+      entry.getKey().write(output);
+      entry.getValue().write(output);
+    }
+  }
+
+  @Override
+  public Type getType() {
+    return Type.SEND_PARTITION_MUTATIONS_REQUEST;
+  }
+
+  @Override
+  public void doRequest(ServerData<I, V, E, M> serverData) {
+    ConcurrentHashMap<I, VertexMutations<I, V, E, M>> vertexMutations =
+      serverData.getVertexMutations();
+    for (Entry<I, VertexMutations<I, V, E, M>> entry :
+        vertexIdMutations.entrySet()) {
+      VertexMutations<I, V, E, M> mutations =
+          vertexMutations.get(entry.getKey());
+      if (mutations == null) {
+        mutations = vertexMutations.putIfAbsent(
+            entry.getKey(), entry.getValue());
+        if (mutations == null) {
+          continue;
+        }
+      }
+      synchronized (mutations) {
+        mutations.addVertexMutations(entry.getValue());
+      }
+    }
+  }
+
+  @Override
+  public Configuration getConf() {
+    return conf;
+  }
+
+  @Override
+  public void setConf(Configuration conf) {
+    this.conf = conf;
+  }
+}
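
The doRequest() above uses a standard ConcurrentHashMap idiom: try a lock-free putIfAbsent() first, and only fall back to synchronizing on the existing value when another request has already inserted one. A minimal, self-contained sketch of the same merge pattern (the MutationList type here is hypothetical and only stands in for VertexMutations):

    import java.util.ArrayList;
    import java.util.List;
    import java.util.concurrent.ConcurrentHashMap;

    public class MergeIdiomSketch {
      /** Hypothetical value type standing in for VertexMutations. */
      static class MutationList {
        private final List<String> mutations = new ArrayList<String>();

        void addAll(MutationList other) {
          mutations.addAll(other.mutations);
        }
      }

      /** Merge incoming data into the shared map, as doRequest() does. */
      static void merge(ConcurrentHashMap<Integer, MutationList> shared,
          Integer vertexId, MutationList incoming) {
        MutationList existing = shared.get(vertexId);
        if (existing == null) {
          // Lock-free insert; null means this thread won the race.
          existing = shared.putIfAbsent(vertexId, incoming);
          if (existing == null) {
            return;
          }
        }
        // Someone else inserted first; merge under that value's monitor.
        synchronized (existing) {
          existing.addAll(incoming);
        }
      }
    }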

Added: incubator/giraph/trunk/src/main/java/org/apache/giraph/comm/SendVertexRequest.java
URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/comm/SendVertexRequest.java?rev=1336344&view=auto
==============================================================================
--- incubator/giraph/trunk/src/main/java/org/apache/giraph/comm/SendVertexRequest.java (added)
+++ incubator/giraph/trunk/src/main/java/org/apache/giraph/comm/SendVertexRequest.java Wed May  9 18:47:32 2012
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.comm;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.Collection;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.apache.giraph.comm.RequestRegistry.Type;
+import org.apache.giraph.graph.BasicVertex;
+import org.apache.giraph.graph.BspUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.log4j.Logger;
+
+import com.google.common.collect.Lists;
+
+/**
+ * Send a collection of vertices for a partition.
+ *
+ * @param <I> Vertex id
+ * @param <V> Vertex data
+ * @param <E> Edge data
+ * @param <M> Message data
+ */
+@SuppressWarnings("rawtypes")
+public class SendVertexRequest<I extends WritableComparable,
+    V extends Writable, E extends Writable,
+    M extends Writable> implements WritableRequest<I, V, E, M> {
+  /** Class logger */
+  private static final Logger LOG =
+      Logger.getLogger(SendVertexRequest.class);
+  /** Partition id */
+  private int partitionId;
+  /** List of vertices to be stored on this partition */
+  private Collection<BasicVertex<I, V, E, M>> vertices;
+  /** Configuration */
+  private Configuration conf;
+
+  /**
+   * Constructor used for reflection only
+   */
+  public SendVertexRequest() { }
+
+  /**
+   * Constructor for sending a request.
+   *
+   * @param partitionId Partition to send the request to
+   * @param vertices Vertices to send
+   */
+  public SendVertexRequest(
+      int partitionId, Collection<BasicVertex<I, V, E, M>> vertices) {
+    this.partitionId = partitionId;
+    this.vertices = vertices;
+  }
+
+  @Override
+  public void readFields(DataInput input) throws IOException {
+    partitionId = input.readInt();
+    int verticesCount = input.readInt();
+    vertices = Lists.newArrayListWithCapacity(verticesCount);
+    for (int i = 0; i < verticesCount; ++i) {
+      BasicVertex<I, V, E, M> vertex = BspUtils.createVertex(conf);
+      vertex.readFields(input);
+      vertices.add(vertex);
+    }
+  }
+
+  @Override
+  public void write(DataOutput output) throws IOException {
+    output.writeInt(partitionId);
+    output.writeInt(vertices.size());
+    for (BasicVertex<I, V, E, M> vertex : vertices) {
+      vertex.write(output);
+    }
+  }
+
+  @Override
+  public Type getType() {
+    return Type.SEND_VERTEX_REQUEST;
+  }
+
+  @Override
+  public void doRequest(ServerData<I, V, E, M> serverData) {
+    ConcurrentHashMap<Integer, Collection<BasicVertex<I, V, E, M>>>
+    partitionVertexMap = serverData.getPartitionVertexMap();
+    if (vertices.isEmpty()) {
+      LOG.warn("doRequest: Got an empty request!");
+      return;
+    }
+    Collection<BasicVertex<I, V, E, M>> vertexMap =
+        partitionVertexMap.get(partitionId);
+    if (vertexMap == null) {
+      vertexMap = partitionVertexMap.putIfAbsent(partitionId, vertices);
+      if (vertexMap == null) {
+        return;
+      }
+    }
+    synchronized (vertexMap) {
+      vertexMap.addAll(vertices);
+    }
+  }
+
+  @Override
+  public Configuration getConf() {
+    return conf;
+  }
+
+  @Override
+  public void setConf(Configuration conf) {
+    this.conf = conf;
+  }
+}
+
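
Because every request is a Hadoop Writable, write() and readFields() must round-trip symmetrically, and readFields() needs a Configuration so BspUtils can instantiate vertices. A rough round-trip helper under those assumptions (illustrative only, not part of this commit):

    import java.io.ByteArrayInputStream;
    import java.io.ByteArrayOutputStream;
    import java.io.DataInputStream;
    import java.io.DataOutputStream;
    import java.io.IOException;

    import org.apache.giraph.comm.WritableRequest;
    import org.apache.hadoop.conf.Configuration;

    public class RequestRoundTripSketch {
      /** Serialize a request and decode it back into a fresh instance. */
      static <R extends WritableRequest<?, ?, ?, ?>> R roundTrip(
          R request, R empty, Configuration conf) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        request.write(new DataOutputStream(bytes));

        // readFields() calls BspUtils.createVertex(conf), so the vertex,
        // index, value and message classes must already be set on conf.
        empty.setConf(conf);
        empty.readFields(new DataInputStream(
            new ByteArrayInputStream(bytes.toByteArray())));
        return empty;
      }
    }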

Added: incubator/giraph/trunk/src/main/java/org/apache/giraph/comm/ServerData.java
URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/comm/ServerData.java?rev=1336344&view=auto
==============================================================================
--- incubator/giraph/trunk/src/main/java/org/apache/giraph/comm/ServerData.java (added)
+++ incubator/giraph/trunk/src/main/java/org/apache/giraph/comm/ServerData.java Wed May  9 18:47:32 2012
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.comm;
+
+import java.util.Collection;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.apache.giraph.graph.BasicVertex;
+import org.apache.giraph.graph.VertexMutations;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+
+/**
+ * Anything that the server stores
+ *
+ * @param <I> Vertex id
+ * @param <V> Vertex data
+ * @param <E> Edge data
+ * @param <M> Message data
+ */
+@SuppressWarnings("rawtypes")
+public class ServerData<I extends WritableComparable,
+    V extends Writable, E extends Writable, M extends Writable> {
+  /**
+   * Map of partition ids to incoming vertices from other workers.
+   * (Synchronized on values)
+   */
+  private final ConcurrentHashMap<Integer, Collection<BasicVertex<I, V, E, M>>>
+  inPartitionVertexMap =
+      new ConcurrentHashMap<Integer, Collection<BasicVertex<I, V, E, M>>>();
+  /**
+   * Map of inbound messages, mapping from vertex index to list of messages.
+   * Transferred to inMessages at the beginning of a superstep.  This
+   * intermediary step exists so that the combiner will run not only at the
+   * client, but also at the server. It also allows sending large
+   * message lists during the superstep computation. (Synchronized on values)
+   */
+  private final ConcurrentHashMap<I, Collection<M>> transientMessages =
+      new ConcurrentHashMap<I, Collection<M>>();
+  /**
+   * Map of vertex ids to incoming vertex mutations from other workers.
+   * (Synchronized access to values)
+   */
+  private final ConcurrentHashMap<I, VertexMutations<I, V, E, M>>
+  vertexMutations = new ConcurrentHashMap<I, VertexMutations<I, V, E, M>>();
+
+  /**
+   * Get the partition vertices (synchronize on the values)
+   *
+   * @return Partition vertices
+   */
+  public ConcurrentHashMap<Integer, Collection<BasicVertex<I, V, E, M>>>
+  getPartitionVertexMap() {
+    return inPartitionVertexMap;
+  }
+
+  /**
+   * Get the vertex messages (synchronize on the values)
+   *
+   * @return Vertex messages
+   */
+  public ConcurrentHashMap<I, Collection<M>> getTransientMessages() {
+    return transientMessages;
+  }
+
+  /**
+   * Get the vertex mutations (synchronize on the values)
+   *
+   * @return Vertex mutations
+   */
+  public ConcurrentHashMap<I, VertexMutations<I, V, E, M>>
+  getVertexMutations() {
+    return vertexMutations;
+  }
+}
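
All three maps follow the same access rule stated in the comments: the map itself is a ConcurrentHashMap, but compound operations on a value must hold that value's monitor. A hypothetical consumer draining transientMessages at the start of a superstep might look like this (MessageSink is an invented callback, not part of this commit):

    import java.util.ArrayList;
    import java.util.Collection;
    import java.util.Map.Entry;
    import java.util.concurrent.ConcurrentHashMap;

    public class TransientMessageDrainSketch {
      /** Invented callback for wherever the messages should end up. */
      interface MessageSink<I, M> {
        void accept(I vertexId, Collection<M> messages);
      }

      /** Hand buffered messages to the sink, then clear the buffers. */
      static <I, M> void drain(
          ConcurrentHashMap<I, Collection<M>> transientMessages,
          MessageSink<I, M> sink) {
        for (Entry<I, Collection<M>> entry : transientMessages.entrySet()) {
          // Writers synchronize on the value collection, so readers must too.
          synchronized (entry.getValue()) {
            // Copy before clearing so the sink keeps its own snapshot.
            sink.accept(entry.getKey(),
                new ArrayList<M>(entry.getValue()));
            entry.getValue().clear();
          }
        }
        transientMessages.clear();
      }
    }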

Copied: incubator/giraph/trunk/src/main/java/org/apache/giraph/comm/WorkerClient.java (from r1332888, incubator/giraph/trunk/src/main/java/org/apache/giraph/comm/WorkerCommunications.java)
URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/comm/WorkerClient.java?p2=incubator/giraph/trunk/src/main/java/org/apache/giraph/comm/WorkerClient.java&p1=incubator/giraph/trunk/src/main/java/org/apache/giraph/comm/WorkerCommunications.java&r1=1332888&r2=1336344&rev=1336344&view=diff
==============================================================================
--- incubator/giraph/trunk/src/main/java/org/apache/giraph/comm/WorkerCommunications.java (original)
+++ incubator/giraph/trunk/src/main/java/org/apache/giraph/comm/WorkerClient.java Wed May  9 18:47:32 2012
@@ -27,8 +27,6 @@ import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
 
 import java.io.IOException;
-import java.util.List;
-import java.util.Map;
 
 /**
  * Public interface for workers to do message communication
@@ -39,9 +37,14 @@ import java.util.Map;
  * @param <M> Message data
  */
 @SuppressWarnings("rawtypes")
-public interface WorkerCommunications<I extends WritableComparable,
+public interface WorkerClient<I extends WritableComparable,
     V extends Writable, E extends Writable, M extends Writable> {
   /**
+   *  Setup the client.
+   */
+  void setup();
+
+  /**
    * Fix changes to the workers and the mapping between partitions and
    * workers.
    */
@@ -100,11 +103,24 @@ public interface WorkerCommunications<I 
   void removeVertexReq(I vertexIndex) throws IOException;
 
   /**
-   * Get the vertices that were sent in the last iteration.  After getting
-   * the map, the user should synchronize with it to insure it
-   * is thread-safe.
+   * Flush all outgoing messages.  This will synchronously ensure that all
+   * messages have been sent and delivered prior to returning.
    *
-   * @return map of vertex ranges to vertices
+   * @throws IOException
+   */
+  void flush() throws IOException;
+
+  /**
+   * Get the number of messages sent during this superstep and reset the count.
+   *
+   * @return Number of messages sent before the reset.
+   */
+  long resetMessageCount();
+
+  /**
+   * Closes all connections.
+   *
+   * @throws IOException
    */
-  Map<Integer, List<BasicVertex<I, V, E, M>>> getInPartitionVertexMap();
+  void closeConnections() throws IOException;
 }

Copied: incubator/giraph/trunk/src/main/java/org/apache/giraph/comm/WorkerClientServer.java (from r1332888, incubator/giraph/trunk/src/main/java/org/apache/giraph/comm/ServerInterface.java)
URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/comm/WorkerClientServer.java?p2=incubator/giraph/trunk/src/main/java/org/apache/giraph/comm/WorkerClientServer.java&p1=incubator/giraph/trunk/src/main/java/org/apache/giraph/comm/ServerInterface.java&r1=1332888&r2=1336344&rev=1336344&view=diff
==============================================================================
--- incubator/giraph/trunk/src/main/java/org/apache/giraph/comm/ServerInterface.java (original)
+++ incubator/giraph/trunk/src/main/java/org/apache/giraph/comm/WorkerClientServer.java Wed May  9 18:47:32 2012
@@ -18,55 +18,19 @@
 
 package org.apache.giraph.comm;
 
-import java.io.Closeable;
-import java.io.IOException;
-
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
-import org.apache.hadoop.mapreduce.Mapper;
 
 /**
- * Interface for message communication server.
+ * Interface for both the client and the server
  *
  * @param <I> Vertex id
- * @param <V> Vertex value
- * @param <E> Edge value
+ * @param <V> Vertex data
+ * @param <E> Edge data
  * @param <M> Message data
  */
 @SuppressWarnings("rawtypes")
-public interface ServerInterface<I extends WritableComparable,
+public interface WorkerClientServer<I extends WritableComparable,
     V extends Writable, E extends Writable, M extends Writable>
-    extends Closeable, WorkerCommunications<I, V, E, M> {
-  /**
-   *  Setup the server.
-   */
-  void setup();
-
-  /**
-   * Move the in transition messages to the in messages for every vertex and
-   * add new connections to any newly appearing RPC proxies.
-   */
-  void prepareSuperstep();
-
-  /**
-   * Flush all outgoing messages.  This will synchronously ensure that all
-   * messages have been send and delivered prior to returning.
-   *
-   * @param context Context used to signal process
-   * @return Number of messages sent during the last superstep
-   * @throws IOException
-   */
-  long flush(Mapper<?, ?, ?, ?>.Context context) throws IOException;
-
-  /**
-   * Closes all connections.
-   *
-   * @throws IOException
-   */
-  void closeConnections() throws IOException;
-
-  /**
-   * Shuts down.
-   */
-  void close();
+    extends WorkerClient<I, V, E, M>, WorkerServer<I, V, E, M> {
 }

Copied: incubator/giraph/trunk/src/main/java/org/apache/giraph/comm/WorkerServer.java (from r1332888, incubator/giraph/trunk/src/main/java/org/apache/giraph/comm/ServerInterface.java)
URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/comm/WorkerServer.java?p2=incubator/giraph/trunk/src/main/java/org/apache/giraph/comm/WorkerServer.java&p1=incubator/giraph/trunk/src/main/java/org/apache/giraph/comm/ServerInterface.java&r1=1332888&r2=1336344&rev=1336344&view=diff
==============================================================================
--- incubator/giraph/trunk/src/main/java/org/apache/giraph/comm/ServerInterface.java (original)
+++ incubator/giraph/trunk/src/main/java/org/apache/giraph/comm/WorkerServer.java Wed May  9 18:47:32 2012
@@ -19,11 +19,12 @@
 package org.apache.giraph.comm;
 
 import java.io.Closeable;
-import java.io.IOException;
+import java.util.Collection;
+import java.util.Map;
 
+import org.apache.giraph.graph.BasicVertex;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
-import org.apache.hadoop.mapreduce.Mapper;
 
 /**
  * Interface for message communication server.
@@ -34,13 +35,15 @@ import org.apache.hadoop.mapreduce.Mappe
  * @param <M> Message data
  */
 @SuppressWarnings("rawtypes")
-public interface ServerInterface<I extends WritableComparable,
+public interface WorkerServer<I extends WritableComparable,
     V extends Writable, E extends Writable, M extends Writable>
-    extends Closeable, WorkerCommunications<I, V, E, M> {
+    extends Closeable {
   /**
-   *  Setup the server.
+   * Get the port
+   *
+   * @return Port used by this server
    */
-  void setup();
+  int getPort();
 
   /**
    * Move the in transition messages to the in messages for every vertex and
@@ -49,21 +52,13 @@ public interface ServerInterface<I exten
   void prepareSuperstep();
 
   /**
-   * Flush all outgoing messages.  This will synchronously ensure that all
-   * messages have been send and delivered prior to returning.
-   *
-   * @param context Context used to signal process
-   * @return Number of messages sent during the last superstep
-   * @throws IOException
-   */
-  long flush(Mapper<?, ?, ?, ?>.Context context) throws IOException;
-
-  /**
-   * Closes all connections.
+   * Get the vertices that were sent in the last iteration.  After getting
+   * the map, the user should synchronize on it to ensure access
+   * is thread-safe.
    *
-   * @throws IOException
+   * @return map of vertex ranges to vertices
    */
-  void closeConnections() throws IOException;
+  Map<Integer, Collection<BasicVertex<I, V, E, M>>> getInPartitionVertexMap();
 
   /**
    * Shuts down.

Added: incubator/giraph/trunk/src/main/java/org/apache/giraph/comm/WritableRequest.java
URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/comm/WritableRequest.java?rev=1336344&view=auto
==============================================================================
--- incubator/giraph/trunk/src/main/java/org/apache/giraph/comm/WritableRequest.java (added)
+++ incubator/giraph/trunk/src/main/java/org/apache/giraph/comm/WritableRequest.java Wed May  9 18:47:32 2012
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.comm;
+
+import org.apache.giraph.comm.RequestRegistry.Type;
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+
+/**
+ * Interface for requests to implement
+ *
+ * @param <I> Vertex id
+ * @param <V> Vertex data
+ * @param <E> Edge data
+ * @param <M> Message data
+ */
+@SuppressWarnings("rawtypes")
+public interface WritableRequest<I extends WritableComparable,
+    V extends Writable, E extends Writable,
+    M extends Writable> extends Writable, Configurable {
+  /**
+   * Get the type of the request
+   *
+   * @return Request type
+   */
+  Type getType();
+  /**
+   * Execute the request
+   *
+   * @param serverData Accessible data that can be mutated per the request
+   */
+  void doRequest(ServerData<I, V, E, M> serverData);
+}
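
Concrete requests such as SendVertexRequest and SendPartitionMutationsRequest above all follow this contract: the server gives a request its Configuration, lets it decode its payload from the wire, and then applies it to the shared ServerData. A sketch of that server-side handling under those assumptions (how the concrete request class is picked for a given Type is omitted; that is presumably RequestRegistry's job, and the Netty server class itself is not shown in this part of the commit):

    import java.io.DataInput;
    import java.io.IOException;

    import org.apache.giraph.comm.ServerData;
    import org.apache.giraph.comm.WritableRequest;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.io.IntWritable;

    public class RequestDispatchSketch {
      /** Decode one request from the wire and apply it to the server data. */
      static void handle(
          WritableRequest<IntWritable, IntWritable, IntWritable, IntWritable>
              request,
          DataInput input, Configuration conf,
          ServerData<IntWritable, IntWritable, IntWritable, IntWritable>
              serverData) throws IOException {
        request.setConf(conf);          // needed so readFields can build vertices
        request.readFields(input);      // decode the payload
        request.doRequest(serverData);  // mutate shared server-side state
      }
    }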

Modified: incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/BspService.java
URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/BspService.java?rev=1336344&r1=1336343&r2=1336344&view=diff
==============================================================================
--- incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/BspService.java (original)
+++ incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/BspService.java Wed May  9 18:47:32 2012
@@ -267,7 +267,7 @@ public abstract class BspService<I exten
   /** File system */
   private final FileSystem fs;
   /** Checkpoint frequency */
-  private int checkpointFrequency = -1;
+  private final int checkpointFrequency;
   /** Map of aggregators */
   private Map<String, Aggregator<Writable>> aggregatorMap =
       new TreeMap<String, Aggregator<Writable>>();

Modified: incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/BspServiceWorker.java
URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/BspServiceWorker.java?rev=1336344&r1=1336343&r2=1336344&view=diff
==============================================================================
--- incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/BspServiceWorker.java (original)
+++ incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/BspServiceWorker.java Wed May  9 18:47:32 2012
@@ -22,8 +22,10 @@ import net.iharder.Base64;
 
 import org.apache.giraph.bsp.ApplicationState;
 import org.apache.giraph.bsp.CentralizedServiceWorker;
+import org.apache.giraph.comm.NettyWorkerClientServer;
 import org.apache.giraph.comm.RPCCommunications;
-import org.apache.giraph.comm.ServerInterface;
+import org.apache.giraph.comm.WorkerServer;
+import org.apache.giraph.comm.WorkerClientServer;
 import org.apache.giraph.graph.partition.Partition;
 import org.apache.giraph.graph.partition.PartitionExchange;
 import org.apache.giraph.graph.partition.PartitionOwner;
@@ -99,7 +101,7 @@ public class BspServiceWorker<I extends 
   private final Map<PartitionOwner, Partition<I, V, E, M>>
   inputSplitCache = new HashMap<PartitionOwner, Partition<I, V, E, M>>();
   /** Communication service */
-  private final ServerInterface<I, V, E, M> commService;
+  private final WorkerClientServer<I, V, E, M> commService;
   /** Structure to store the partitions on this worker */
   private final Map<Integer, Partition<I, V, E, M>> workerPartitionMap =
       new HashMap<Integer, Partition<I, V, E, M>>();
@@ -148,11 +150,22 @@ public class BspServiceWorker<I extends 
             GiraphJob.INPUT_SPLIT_MAX_VERTICES_DEFAULT);
     workerGraphPartitioner =
         getGraphPartitionerFactory().createWorkerGraphPartitioner();
-    RPCCommunications<I, V, E, M> rpcCommService =
-        new RPCCommunications<I, V, E, M>(context, this, graphState);
+    boolean useNetty = getConfiguration().getBoolean(GiraphJob.USE_NETTY,
+        GiraphJob.USE_NETTY_DEFAULT);
+    if (useNetty) {
+      commService =  new NettyWorkerClientServer<I, V, E, M>(context, this);
+    } else {
+      commService =
+          new RPCCommunications<I, V, E, M>(context, this, graphState);
+    }
+    if (LOG.isInfoEnabled()) {
+      LOG.info("BspServiceWorker: maxVerticesPerPartition = " +
+          maxVerticesPerPartition + " useNetty = " + useNetty);
+    }
+
     workerInfo = new WorkerInfo(
-        getHostname(), getTaskPartition(), rpcCommService.getPort());
-    commService = rpcCommService;
+        getHostname(), getTaskPartition(), commService.getPort());
+
     graphState.setWorkerCommunications(commService);
     this.workerContext =
         BspUtils.createWorkerContext(getConfiguration(),
@@ -311,6 +324,7 @@ public class BspServiceWorker<I extends 
         entry.getValue().getVertices().clear();
       }
     }
+    commService.flush();
     inputSplitCache.clear();
 
     return vertexEdgeCount;
@@ -954,7 +968,8 @@ public class BspServiceWorker<I extends 
     // 6. Wait for the master's global stats, and check if done
     long workerSentMessages = 0;
     try {
-      workerSentMessages = commService.flush(getContext());
+      commService.flush();
+      workerSentMessages = commService.resetMessageCount();
     } catch (IOException e) {
       throw new IllegalStateException(
           "finishSuperstep: flush failed", e);
@@ -1310,6 +1325,11 @@ public class BspServiceWorker<I extends 
       }
     }
 
+    try {
+      getGraphMapper().getGraphState().getWorkerCommunications().flush();
+    } catch (IOException e) {
+      throw new IllegalStateException("sendWorkerPartitions: Flush failed", e);
+    }
     String myPartitionExchangeDonePath =
         getPartitionExchangeWorkerPath(
             getApplicationAttempt(), getSuperstep(), getWorkerInfo());
@@ -1410,11 +1430,11 @@ public class BspServiceWorker<I extends 
    *        temporarily stored.
    */
   private void movePartitionsToWorker(
-      ServerInterface<I, V, E, M> commService) {
-    Map<Integer, List<BasicVertex<I, V, E, M>>> inPartitionVertexMap =
+      WorkerServer<I, V, E, M> commService) {
+    Map<Integer, Collection<BasicVertex<I, V, E, M>>> inPartitionVertexMap =
         commService.getInPartitionVertexMap();
     synchronized (inPartitionVertexMap) {
-      for (Entry<Integer, List<BasicVertex<I, V, E, M>>> entry :
+      for (Entry<Integer, Collection<BasicVertex<I, V, E, M>>> entry :
         inPartitionVertexMap.entrySet()) {
         if (getPartitionMap().containsKey(entry.getKey())) {
           throw new IllegalStateException(
@@ -1427,21 +1447,23 @@ public class BspServiceWorker<I extends 
         Partition<I, V, E, M> tmpPartition =
             new Partition<I, V, E, M>(getConfiguration(),
                 entry.getKey());
-        for (BasicVertex<I, V, E, M> vertex : entry.getValue()) {
-          if (tmpPartition.putVertex(vertex) != null) {
-            throw new IllegalStateException(
-                "moveVerticesToWorker: Vertex " + vertex +
-                " already exists!");
+        synchronized (entry.getValue()) {
+          for (BasicVertex<I, V, E, M> vertex : entry.getValue()) {
+            if (tmpPartition.putVertex(vertex) != null) {
+              throw new IllegalStateException(
+                  "moveVerticesToWorker: Vertex " + vertex +
+                  " already exists!");
+            }
           }
+          if (LOG.isDebugEnabled()) {
+            LOG.debug("movePartitionsToWorker: Adding " +
+                entry.getValue().size() +
+                " vertices for partition id " + entry.getKey());
+          }
+          getPartitionMap().put(tmpPartition.getPartitionId(),
+              tmpPartition);
+          entry.getValue().clear();
         }
-        if (LOG.isInfoEnabled()) {
-          LOG.info("moveVerticesToWorker: Adding " +
-              entry.getValue().size() +
-              " vertices for partition id " + entry.getKey());
-        }
-        getPartitionMap().put(tmpPartition.getPartitionId(),
-            tmpPartition);
-        entry.getValue().clear();
       }
       inPartitionVertexMap.clear();
     }

Modified: incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/GiraphJob.java
URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/GiraphJob.java?rev=1336344&r1=1336343&r2=1336344&view=diff
==============================================================================
--- incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/GiraphJob.java (original)
+++ incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/GiraphJob.java Wed May  9 18:47:32 2012
@@ -151,6 +151,11 @@ public class GiraphJob {
   /** Local ZooKeeper directory to use */
   public static final String ZOOKEEPER_DIR = "giraph.zkDir";
 
+  /** Whether to use Netty instead of Hadoop RPC for communication */
+  public static final String USE_NETTY = "giraph.useNetty";
+  /** Default is to use RPC, not netty */
+  public static final boolean USE_NETTY_DEFAULT = false;
+
   /** Initial port to start using for the RPC communication */
   public static final String RPC_INITIAL_PORT = "giraph.rpcInitialPort";
   /** Default port to start using for the RPC communication */
@@ -182,18 +187,24 @@ public class GiraphJob {
   public static final String MAX_VERTICES_PER_PARTITION =
       "giraph.maxVerticesPerPartition";
   /** Default maximum number of vertices per partition before sending. */
-  public static final int MAX_VERTICES_PER_PARTITION_DEFAULT = 100000;
+  public static final int MAX_VERTICES_PER_PARTITION_DEFAULT = 10000;
 
   /** Maximum number of messages per peer before flush */
   public static final String MSG_SIZE = "giraph.msgSize";
   /** Default maximum number of messages per peer before flush */
-  public static final int MSG_SIZE_DEFAULT = 1000;
+  public static final int MSG_SIZE_DEFAULT = 2000;
+
+  /** Maximum number of mutations per partition before flush */
+  public static final String MAX_MUTATIONS_PER_REQUEST =
+      "giraph.maxMutationsPerRequest";
+  /** Default maximum number of mutations per partition before flush */
+  public static final int MAX_MUTATIONS_PER_REQUEST_DEFAULT = 100;
 
   /** Maximum number of messages that can be bulk sent during a flush */
   public static final String MAX_MESSAGES_PER_FLUSH_PUT =
       "giraph.maxMessagesPerFlushPut";
   /** Default number of messages that can be bulk sent during a flush */
-  public static final int DEFAULT_MAX_MESSAGES_PER_FLUSH_PUT = 5000;
+  public static final int DEFAULT_MAX_MESSAGES_PER_FLUSH_PUT = 2000;
 
   /** Number of flush threads per peer */
   public static final String MSG_NUM_FLUSH_THREADS =
@@ -273,8 +284,8 @@ public class GiraphJob {
   public static final String CHECKPOINT_FREQUENCY =
       "giraph.checkpointFrequency";
 
-  /** Default checkpointing frequency of every 2 supersteps. */
-  public static final int CHECKPOINT_FREQUENCY_DEFAULT = 2;
+  /** Default checkpoint frequency of 0 (checkpointing disabled). */
+  public static final int CHECKPOINT_FREQUENCY_DEFAULT = 0;
 
   /**
    * Delete checkpoints after a successful job run?
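
The new giraph.useNetty switch is read in BspServiceWorker (see the change above) and defaults to false, so existing jobs keep the Hadoop RPC transport unless they opt in; likewise, checkpointing is now off by default and must be enabled explicitly. A small configuration sketch (the job name is illustrative):

    import java.io.IOException;

    import org.apache.giraph.graph.GiraphJob;

    public class NettyOptInSketch {
      /** Build a job that opts into Netty and checkpoints every 2 supersteps. */
      static GiraphJob createJob() throws IOException {
        GiraphJob job = new GiraphJob("netty-example");
        job.getConfiguration().setBoolean(GiraphJob.USE_NETTY, true);
        job.getConfiguration().setInt(GiraphJob.CHECKPOINT_FREQUENCY, 2);
        return job;
      }
    }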

Modified: incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/GraphState.java
URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/GraphState.java?rev=1336344&r1=1336343&r2=1336344&view=diff
==============================================================================
--- incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/GraphState.java (original)
+++ incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/GraphState.java Wed May  9 18:47:32 2012
@@ -17,7 +17,7 @@
  */
 package org.apache.giraph.graph;
 
-import org.apache.giraph.comm.WorkerCommunications;
+import org.apache.giraph.comm.WorkerClientServer;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
 import org.apache.hadoop.mapreduce.Mapper;
@@ -45,7 +45,7 @@ E extends Writable, M extends Writable> 
   /** Graph-wide BSP Mapper for this Vertex */
   private GraphMapper<I, V, E, M> graphMapper;
   /** Graph-wide worker communications */
-  private WorkerCommunications<I, V, E, M> workerCommunications;
+  private WorkerClientServer<I, V, E, M> workerCommunications;
 
   public long getSuperstep() {
     return superstep;
@@ -130,12 +130,12 @@ E extends Writable, M extends Writable> 
    * @return Returns this object.
    */
   public GraphState<I, V, E, M> setWorkerCommunications(
-      WorkerCommunications<I, V, E, M> workerCommunications) {
+      WorkerClientServer<I, V, E, M> workerCommunications) {
     this.workerCommunications = workerCommunications;
     return this;
   }
 
-  public WorkerCommunications<I, V, E, M> getWorkerCommunications() {
+  public WorkerClientServer<I, V, E, M> getWorkerCommunications() {
     return workerCommunications;
   }
 }

Modified: incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/VertexMutations.java
URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/VertexMutations.java?rev=1336344&r1=1336343&r2=1336344&view=diff
==============================================================================
--- incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/VertexMutations.java (original)
+++ incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/VertexMutations.java Wed May  9 18:47:32 2012
@@ -18,9 +18,14 @@
 
 package org.apache.giraph.graph;
 
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
 
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
 import org.json.JSONException;
@@ -38,7 +43,8 @@ import org.json.JSONObject;
 @SuppressWarnings("rawtypes")
 public class VertexMutations<I extends WritableComparable,
     V extends Writable, E extends Writable,
-    M extends Writable> implements VertexChanges<I, V, E, M> {
+    M extends Writable> implements VertexChanges<I, V, E, M>,
+    Writable, Configurable {
   /** List of added vertices during the last superstep */
   private final List<BasicVertex<I, V, E, M>> addedVertexList =
       new ArrayList<BasicVertex<I, V, E, M>>();
@@ -48,12 +54,76 @@ public class VertexMutations<I extends W
   private final List<Edge<I, E>> addedEdgeList = new ArrayList<Edge<I, E>>();
   /** List of removed edges */
   private final List<I> removedEdgeList = new ArrayList<I>();
+  /** Configuration */
+  private Configuration conf;
+
+  /**
+   * Copy the vertex mutations.
+   *
+   * @return Copied vertex mutations
+   */
+  public VertexMutations<I, V, E, M> copy() {
+    VertexMutations<I, V, E, M> copied = new VertexMutations<I, V, E, M>();
+    copied.addedVertexList.addAll(this.addedVertexList);
+    copied.removedVertexCount = this.removedVertexCount;
+    copied.addedEdgeList.addAll(this.addedEdgeList);
+    copied.removedEdgeList.addAll(this.removedEdgeList);
+    copied.conf = this.conf;
+    return copied;
+  }
 
   @Override
   public List<BasicVertex<I, V, E, M>> getAddedVertexList() {
     return addedVertexList;
   }
 
+  @Override
+  public void readFields(DataInput input) throws IOException {
+    addedVertexList.clear();
+    addedEdgeList.clear();
+    removedEdgeList.clear();
+
+    int addedVertexListSize = input.readInt();
+    for (int i = 0; i < addedVertexListSize; ++i) {
+      BasicVertex<I, V, E, M> vertex = BspUtils.createVertex(conf);
+      vertex.readFields(input);
+      addedVertexList.add(vertex);
+    }
+    removedVertexCount = input.readInt();
+    int addedEdgeListSize = input.readInt();
+    for (int i = 0; i < addedEdgeListSize; ++i) {
+      I destVertex = BspUtils.createVertexIndex(conf);
+      destVertex.readFields(input);
+      E edgeValue = BspUtils.createEdgeValue(conf);
+      edgeValue.readFields(input);
+      addedEdgeList.add(new Edge<I, E>(destVertex, edgeValue));
+    }
+    int removedEdgeListSize = input.readInt();
+    for (int i = 0; i < removedEdgeListSize; ++i) {
+      I removedEdge = BspUtils.createVertexIndex(conf);
+      removedEdge.readFields(input);
+      removedEdgeList.add(removedEdge);
+    }
+  }
+
+  @Override
+  public void write(DataOutput output) throws IOException {
+    output.writeInt(addedVertexList.size());
+    for (BasicVertex<I, V, E, M> vertex : addedVertexList) {
+      vertex.write(output);
+    }
+    output.writeInt(removedVertexCount);
+    output.writeInt(addedEdgeList.size());
+    for (Edge<I, E> edge : addedEdgeList) {
+      edge.getDestVertexId().write(output);
+      edge.getEdgeValue().write(output);
+    }
+    output.writeInt(removedEdgeList.size());
+    for (I removedEdge : removedEdgeList) {
+      removedEdge.write(output);
+    }
+  }
+
   /**
    * Add a vertex mutation
    *
@@ -103,6 +173,18 @@ public class VertexMutations<I extends W
     removedEdgeList.add(destinationVertexId);
   }
 
+  /**
+   * Add one vertex mutations to another
+   *
+   * @param vertexMutations Object to be added
+   */
+  public void addVertexMutations(VertexMutations<I, V, E, M> vertexMutations) {
+    addedVertexList.addAll(vertexMutations.getAddedVertexList());
+    removedVertexCount += vertexMutations.getRemovedVertexCount();
+    addedEdgeList.addAll(vertexMutations.getAddedEdgeList());
+    removedEdgeList.addAll(vertexMutations.getRemovedEdgeList());
+  }
+
   @Override
   public String toString() {
     JSONObject jsonObject = new JSONObject();
@@ -117,4 +199,14 @@ public class VertexMutations<I extends W
           e);
     }
   }
+
+  @Override
+  public Configuration getConf() {
+    return conf;
+  }
+
+  @Override
+  public void setConf(Configuration conf) {
+    this.conf = conf;
+  }
 }
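
With VertexMutations now Writable and mergeable, the server can fold a freshly deserialized mutation set into one it already holds, which is what SendPartitionMutationsRequest#doRequest above relies on. A small usage sketch (IntWritable type parameters are chosen purely for illustration):

    import org.apache.giraph.graph.VertexMutations;
    import org.apache.hadoop.io.IntWritable;

    public class MutationMergeSketch {
      /** Merge incoming mutations into a copy of the existing ones. */
      static VertexMutations<IntWritable, IntWritable, IntWritable, IntWritable>
      merge(
          VertexMutations<IntWritable, IntWritable, IntWritable, IntWritable>
              existing,
          VertexMutations<IntWritable, IntWritable, IntWritable, IntWritable>
              incoming) {
        // copy() leaves the original untouched; addVertexMutations appends the
        // added/removed vertex and edge lists and sums the removed-vertex count.
        VertexMutations<IntWritable, IntWritable, IntWritable, IntWritable>
            merged = existing.copy();
        merged.addVertexMutations(incoming);
        return merged;
      }
    }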

Modified: incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/WorkerInfo.java
URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/WorkerInfo.java?rev=1336344&r1=1336343&r2=1336344&view=diff
==============================================================================
--- incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/WorkerInfo.java (original)
+++ incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/WorkerInfo.java Wed May  9 18:47:32 2012
@@ -21,6 +21,7 @@ package org.apache.giraph.graph;
 import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.IOException;
+import java.net.InetSocketAddress;
 
 import org.apache.hadoop.io.Writable;
 
@@ -69,6 +70,10 @@ public class WorkerInfo implements Writa
     return hostnameId;
   }
 
+  public InetSocketAddress getHostnamePort() {
+    return new InetSocketAddress(hostname, port);
+  }
+
   public int getPort() {
     return port;
   }
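
getHostnamePort() packages the worker's existing hostname and port into an InetSocketAddress, the address type the new Netty client consumes (see connectAllAdddresses in ConnectionTest below). A trivial sketch of collecting addresses for a set of workers:

    import java.net.InetSocketAddress;
    import java.util.ArrayList;
    import java.util.Collection;
    import java.util.List;

    import org.apache.giraph.graph.WorkerInfo;

    public class WorkerAddressSketch {
      /** Collect the socket addresses of the given workers. */
      static Collection<InetSocketAddress> addressesOf(
          List<WorkerInfo> workers) {
        List<InetSocketAddress> addresses =
            new ArrayList<InetSocketAddress>();
        for (WorkerInfo worker : workers) {
          addresses.add(worker.getHostnamePort());
        }
        return addresses;
      }
    }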

Modified: incubator/giraph/trunk/src/test/java/org/apache/giraph/TestAutoCheckpoint.java
URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/test/java/org/apache/giraph/TestAutoCheckpoint.java?rev=1336344&r1=1336343&r2=1336344&view=diff
==============================================================================
--- incubator/giraph/trunk/src/test/java/org/apache/giraph/TestAutoCheckpoint.java (original)
+++ incubator/giraph/trunk/src/test/java/org/apache/giraph/TestAutoCheckpoint.java Wed May  9 18:47:32 2012
@@ -33,57 +33,60 @@ import org.junit.Test;
  * Unit test for automated checkpoint restarting
  */
 public class TestAutoCheckpoint extends BspCase {
-    /** Where the checkpoints will be stored and restarted */
-    private final String HDFS_CHECKPOINT_DIR =
-        "/tmp/testBspCheckpoints";
+  /** Where the checkpoints will be stored and restarted */
+  private final String HDFS_CHECKPOINT_DIR =
+      "/tmp/testBspCheckpoints";
 
-    /**
-     * Create the test case
-     *
-     * @param testName name of the test case
-     */
-    public TestAutoCheckpoint(String testName) {
-        super(testName);
-    }
+  /**
+   * Create the test case
+   *
+   * @param testName name of the test case
+   */
+  public TestAutoCheckpoint(String testName) {
+    super(testName);
+  }
 
-    
-    public TestAutoCheckpoint() {
-        super(TestAutoCheckpoint.class.getName());
-    }
 
-    /**
-     * Run a job that requires checkpointing and will have a worker crash
-     * and still recover from a previous checkpoint.
-     *
-     * @throws IOException
-     * @throws ClassNotFoundException
-     * @throws InterruptedException
-     */
-    @Test
-    public void testSingleFault()
-            throws IOException, InterruptedException, ClassNotFoundException {
-        if (getJobTracker() == null) {
-            System.out.println(
-                "testSingleFault: Ignore this test in local mode.");
-            return;
-        }
-        GiraphJob job = new GiraphJob(getCallingMethodName());
-        setupConfiguration(job);
-        job.getConfiguration().setBoolean(SimpleCheckpointVertex.ENABLE_FAULT,
-                                          true);
-        job.getConfiguration().setInt("mapred.map.max.attempts", 4);
-        job.getConfiguration().setInt(GiraphJob.POLL_MSECS, 5000);
-        job.getConfiguration().set(GiraphJob.CHECKPOINT_DIRECTORY,
-                                   HDFS_CHECKPOINT_DIR);
-        job.getConfiguration().setBoolean(
-            GiraphJob.CLEANUP_CHECKPOINTS_AFTER_SUCCESS, false);
-        job.setVertexClass(SimpleCheckpointVertex.class);
-        job.setVertexInputFormatClass(SimpleSuperstepVertexInputFormat.class);
-        job.setVertexOutputFormatClass(SimpleSuperstepVertexOutputFormat.class);
-        job.setWorkerContextClass(
-            SimpleCheckpointVertex.SimpleCheckpointVertexWorkerContext.class);
-        Path outputPath = new Path("/tmp/" + getCallingMethodName());
-        removeAndSetOutput(job, outputPath);
-        assertTrue(job.run(true));
+  public TestAutoCheckpoint() {
+    super(TestAutoCheckpoint.class.getName());
+  }
+
+  /**
+   * Run a job that requires checkpointing and will have a worker crash
+   * and still recover from a previous checkpoint.
+   *
+   * @throws IOException
+   * @throws ClassNotFoundException
+   * @throws InterruptedException
+   */
+  @Test
+  public void testSingleFault()
+    throws IOException, InterruptedException, ClassNotFoundException {
+    if (getJobTracker() == null) {
+      System.out.println(
+          "testSingleFault: Ignore this test in local mode.");
+      return;
     }
+    GiraphJob job = new GiraphJob(getCallingMethodName());
+    setupConfiguration(job);
+    job.getConfiguration().setBoolean(SimpleCheckpointVertex.ENABLE_FAULT,
+        true);
+    job.getConfiguration().setInt("mapred.map.max.attempts", 4);
+    // Trigger failure faster
+    job.getConfiguration().setInt("mapred.task.timeout", 30000);
+    job.getConfiguration().setInt(GiraphJob.POLL_MSECS, 5000);
+    job.getConfiguration().setInt(GiraphJob.CHECKPOINT_FREQUENCY, 2);
+    job.getConfiguration().set(GiraphJob.CHECKPOINT_DIRECTORY,
+        HDFS_CHECKPOINT_DIR);
+    job.getConfiguration().setBoolean(
+        GiraphJob.CLEANUP_CHECKPOINTS_AFTER_SUCCESS, false);
+    job.setVertexClass(SimpleCheckpointVertex.class);
+    job.setVertexInputFormatClass(SimpleSuperstepVertexInputFormat.class);
+    job.setVertexOutputFormatClass(SimpleSuperstepVertexOutputFormat.class);
+    job.setWorkerContextClass(
+        SimpleCheckpointVertex.SimpleCheckpointVertexWorkerContext.class);
+    Path outputPath = new Path("/tmp/" + getCallingMethodName());
+    removeAndSetOutput(job, outputPath);
+    assertTrue(job.run(true));
+  }
 }

Modified: incubator/giraph/trunk/src/test/java/org/apache/giraph/TestJsonBase64Format.java
URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/test/java/org/apache/giraph/TestJsonBase64Format.java?rev=1336344&r1=1336343&r2=1336344&view=diff
==============================================================================
--- incubator/giraph/trunk/src/test/java/org/apache/giraph/TestJsonBase64Format.java (original)
+++ incubator/giraph/trunk/src/test/java/org/apache/giraph/TestJsonBase64Format.java Wed May  9 18:47:32 2012
@@ -44,7 +44,7 @@ public class TestJsonBase64Format extend
   public TestJsonBase64Format(String testName) {
     super(testName);
   }
-  
+
   public TestJsonBase64Format() {
     super(TestJsonBase64Format.class.getName());
   }

Modified: incubator/giraph/trunk/src/test/java/org/apache/giraph/TestManualCheckpoint.java
URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/test/java/org/apache/giraph/TestManualCheckpoint.java?rev=1336344&r1=1336343&r2=1336344&view=diff
==============================================================================
--- incubator/giraph/trunk/src/test/java/org/apache/giraph/TestManualCheckpoint.java (original)
+++ incubator/giraph/trunk/src/test/java/org/apache/giraph/TestManualCheckpoint.java Wed May  9 18:47:32 2012
@@ -45,7 +45,7 @@ public class TestManualCheckpoint extend
   public TestManualCheckpoint(String testName) {
     super(testName);
   }
-  
+
   public TestManualCheckpoint() {
     super(TestManualCheckpoint.class.getName());
   }
@@ -65,6 +65,7 @@ public class TestManualCheckpoint extend
         HDFS_CHECKPOINT_DIR);
     job.getConfiguration().setBoolean(
         GiraphJob.CLEANUP_CHECKPOINTS_AFTER_SUCCESS, false);
+    job.getConfiguration().setInt(GiraphJob.CHECKPOINT_FREQUENCY, 2);
     job.setVertexClass(SimpleCheckpointVertex.class);
     job.setWorkerContextClass(
         SimpleCheckpointVertex.SimpleCheckpointVertexWorkerContext.class);

Added: incubator/giraph/trunk/src/test/java/org/apache/giraph/comm/ConnectionTest.java
URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/test/java/org/apache/giraph/comm/ConnectionTest.java?rev=1336344&view=auto
==============================================================================
--- incubator/giraph/trunk/src/test/java/org/apache/giraph/comm/ConnectionTest.java (added)
+++ incubator/giraph/trunk/src/test/java/org/apache/giraph/comm/ConnectionTest.java Wed May  9 18:47:32 2012
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.comm;
+
+import static org.mockito.Mockito.mock;
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.mapreduce.Mapper.Context;
+import org.junit.Test;
+
+/**
+ * Test the netty connections
+ */
+public class ConnectionTest {
+  /**
+   * Test connecting a single client to a single server.
+   *
+   * @throws IOException
+   */
+  @Test
+  public void connectSingleClientServer() throws IOException {
+    @SuppressWarnings("rawtypes")
+    Context context = mock(Context.class);
+
+    Configuration conf = new Configuration();
+    ServerData<IntWritable, IntWritable, IntWritable, IntWritable> serverData =
+        new ServerData<IntWritable, IntWritable, IntWritable, IntWritable>();
+    NettyServer<IntWritable, IntWritable, IntWritable, IntWritable> server =
+        new NettyServer<IntWritable, IntWritable, IntWritable, IntWritable>(
+            conf, serverData);
+    server.start();
+
+    NettyClient<IntWritable, IntWritable, IntWritable, IntWritable> client =
+        new NettyClient<IntWritable, IntWritable, IntWritable,
+        IntWritable>(context);
+    client.connectAllAdddresses(Collections.singleton(server.getMyAddress()));
+
+    client.stop();
+    server.stop();
+  }
+
+  /**
+   * Test connecting one client to three servers.
+   *
+   * @throws IOException
+   */
+  @Test
+  public void connectOneClientToThreeServers() throws IOException {
+    @SuppressWarnings("rawtypes")
+    Context context = mock(Context.class);
+
+    Configuration conf = new Configuration();
+    ServerData<IntWritable, IntWritable, IntWritable, IntWritable> serverData =
+        new ServerData<IntWritable, IntWritable, IntWritable, IntWritable>();
+
+    NettyServer<IntWritable, IntWritable, IntWritable, IntWritable> server1 =
+        new NettyServer<IntWritable, IntWritable, IntWritable, IntWritable>(
+            conf, serverData);
+    server1.start();
+    NettyServer<IntWritable, IntWritable, IntWritable, IntWritable> server2 =
+        new NettyServer<IntWritable, IntWritable, IntWritable, IntWritable>(
+            conf, serverData);
+    server2.start();
+    NettyServer<IntWritable, IntWritable, IntWritable, IntWritable> server3 =
+        new NettyServer<IntWritable, IntWritable, IntWritable, IntWritable>(
+            conf, serverData);
+    server3.start();
+
+    NettyClient<IntWritable, IntWritable, IntWritable, IntWritable> client =
+        new NettyClient<IntWritable, IntWritable, IntWritable,
+        IntWritable>(context);
+    List<InetSocketAddress> serverAddresses =
+        new ArrayList<InetSocketAddress>();
+    serverAddresses.add(server1.getMyAddress());
+    serverAddresses.add(server2.getMyAddress());
+    serverAddresses.add(server3.getMyAddress());
+    client.connectAllAdddresses(serverAddresses);
+
+    client.stop();
+    server1.stop();
+    server2.stop();
+    server3.stop();
+  }
+
+  /**
+   * Test connecting three clients to one server.
+   *
+   * @throws IOException
+   */
+  @Test
+  public void connectThreeClientsToOneServer() throws IOException {
+    @SuppressWarnings("rawtypes")
+    Context context = mock(Context.class);
+
+    Configuration conf = new Configuration();
+    ServerData<IntWritable, IntWritable, IntWritable, IntWritable> serverData =
+        new ServerData<IntWritable, IntWritable, IntWritable, IntWritable>();
+    NettyServer<IntWritable, IntWritable, IntWritable, IntWritable> server =
+        new NettyServer<IntWritable, IntWritable, IntWritable, IntWritable>(
+            conf, serverData);
+    server.start();
+
+    NettyClient<IntWritable, IntWritable, IntWritable, IntWritable> client1 =
+        new NettyClient<IntWritable, IntWritable, IntWritable,
+        IntWritable>(context);
+    client1.connectAllAdddresses(Collections.singleton(server.getMyAddress()));
+    NettyClient<IntWritable, IntWritable, IntWritable, IntWritable> client2 =
+        new NettyClient<IntWritable, IntWritable, IntWritable,
+        IntWritable>(context);
+    client2.connectAllAdddresses(Collections.singleton(server.getMyAddress()));
+    NettyClient<IntWritable, IntWritable, IntWritable, IntWritable> client3 =
+        new NettyClient<IntWritable, IntWritable, IntWritable,
+        IntWritable>(context);
+    client3.connectAllAdddresses(Collections.singleton(server.getMyAddress()));
+
+    client1.stop();
+    client2.stop();
+    client3.stop();
+    server.stop();
+  }
+}

Added: incubator/giraph/trunk/src/test/java/org/apache/giraph/comm/RequestTest.java
URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/test/java/org/apache/giraph/comm/RequestTest.java?rev=1336344&view=auto
==============================================================================
--- incubator/giraph/trunk/src/test/java/org/apache/giraph/comm/RequestTest.java (added)
+++ incubator/giraph/trunk/src/test/java/org/apache/giraph/comm/RequestTest.java Wed May  9 18:47:32 2012
@@ -0,0 +1,263 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.comm;
+
+import static org.junit.Assert.*;
+import static org.mockito.Mockito.mock;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.apache.giraph.graph.BasicVertex;
+import org.apache.giraph.graph.Edge;
+import org.apache.giraph.graph.EdgeListVertex;
+import org.apache.giraph.graph.GiraphJob;
+import org.apache.giraph.graph.VertexMutations;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapreduce.Mapper.Context;
+import org.junit.Before;
+import org.junit.Test;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+
+/**
+ * Test all the different netty requests.
+ */
+public class RequestTest {
+  /** Configuration */
+  private Configuration conf;
+  /** Server data */
+  private ServerData<IntWritable, IntWritable, IntWritable, IntWritable>
+  serverData;
+  /** Server */
+  private NettyServer<IntWritable, IntWritable, IntWritable, IntWritable>
+  server;
+  /** Client */
+  private NettyClient<IntWritable, IntWritable, IntWritable, IntWritable>
+  client;
+
+  /**
+   * Only for testing.
+   */
+  public static class TestVertex extends EdgeListVertex<IntWritable,
+      IntWritable, IntWritable, IntWritable> {
+    @Override
+    public void compute(Iterator<IntWritable> msgIterator) throws IOException {
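+      // No-op: these request tests never run the compute phase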
+    }
+  }
+
+  @Before
+  public void setUp() throws IOException {
+    @SuppressWarnings("rawtypes")
+    Context context = mock(Context.class);
+
+    // Setup the conf
+    conf = new Configuration();
+    conf.setClass(GiraphJob.VERTEX_CLASS, TestVertex.class, BasicVertex.class);
+    conf.setClass(GiraphJob.VERTEX_INDEX_CLASS,
+        IntWritable.class, WritableComparable.class);
+    conf.setClass(GiraphJob.VERTEX_VALUE_CLASS,
+        IntWritable.class, Writable.class);
+    conf.setClass(GiraphJob.EDGE_VALUE_CLASS,
+        IntWritable.class, Writable.class);
+    conf.setClass(GiraphJob.MESSAGE_VALUE_CLASS,
+        IntWritable.class, Writable.class);
+
+    // Start the service
+    serverData =
+        new ServerData<IntWritable, IntWritable, IntWritable, IntWritable>();
+    server =
+        new NettyServer<IntWritable, IntWritable, IntWritable, IntWritable>(
+            conf, serverData);
+    server.start();
+    client =
+        new NettyClient<IntWritable, IntWritable, IntWritable,
+        IntWritable>(context);
+    client.connectAllAdddresses(Collections.singleton(server.getMyAddress()));
+  }
+
+  @Test
+  public void sendVertexPartition() throws IOException {
+    // Data to send
+    int partitionId = 13;
+    Collection<BasicVertex<IntWritable, IntWritable, IntWritable,
+    IntWritable>> vertices =
+        new ArrayList<BasicVertex<IntWritable, IntWritable,
+        IntWritable, IntWritable>>();
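+    // Create 10 vertices whose ids and values are both 0..9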
+    for (int i = 0; i < 10; ++i) {
+      TestVertex vertex = new TestVertex();
+      vertex.initialize(new IntWritable(i), new IntWritable(i), null, null);
+      vertices.add(vertex);
+    }
+
+    // Send the request
+    SendVertexRequest<IntWritable, IntWritable, IntWritable,
+    IntWritable> request =
+      new SendVertexRequest<IntWritable, IntWritable,
+      IntWritable, IntWritable>(partitionId, vertices);
+    client.sendWritableRequest(server.getMyAddress(), request);
+    client.waitAllRequests();
+
+    // Stop the service
+    client.stop();
+    server.stop();
+
+    // Check the output
+    Map<Integer, Collection<BasicVertex<IntWritable, IntWritable,
+    IntWritable, IntWritable>>> partitionVertexMap =
+        serverData.getPartitionVertexMap();
+    synchronized (partitionVertexMap) {
+      assertTrue(partitionVertexMap.containsKey(partitionId));
+      int total = 0;
+      for (BasicVertex<IntWritable, IntWritable,
+          IntWritable, IntWritable> vertex :
+            (partitionVertexMap.get(partitionId))) {
+        total += vertex.getVertexId().get();
+      }
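+      // Vertex ids 0 through 9 were sent, so their sum is 45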
+      assertEquals(45, total);
+    }
+  }
+
+  @Test
+  public void sendPartitionMessagesRequest() throws IOException {
+    // Data to send
+    int partitionId = 17;
+    Map<IntWritable, Collection<IntWritable>> vertexIdMessages =
+        Maps.newHashMap();
+    for (int i = 1; i < 7; ++i) {
+      IntWritable vertexId = new IntWritable(i);
+      Collection<IntWritable> messages = Lists.newArrayList();
+      for (int j = 0; j < i; ++j) {
+        messages.add(new IntWritable(j));
+      }
+      vertexIdMessages.put(vertexId, messages);
+    }
+
+    // Send the request
+    SendPartitionMessagesRequest<IntWritable, IntWritable, IntWritable,
+    IntWritable> request =
+      new SendPartitionMessagesRequest<IntWritable, IntWritable,
+      IntWritable, IntWritable>(partitionId, vertexIdMessages);
+    client.sendWritableRequest(server.getMyAddress(), request);
+    client.waitAllRequests();
+
+    // Stop the service
+    client.stop();
+    server.stop();
+
+    // Check the output
+    ConcurrentHashMap<IntWritable, Collection<IntWritable>> inVertexIdMessages =
+        serverData.getTransientMessages();
+    int keySum = 0;
+    int messageSum = 0;
+    for (Entry<IntWritable, Collection<IntWritable>> entry :
+        inVertexIdMessages.entrySet()) {
+      keySum += entry.getKey().get();
+      synchronized (entry.getValue()) {
+        for (IntWritable message : entry.getValue()) {
+          messageSum += message.get();
+        }
+      }
+    }
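+    // Keys 1..6 sum to 21; vertex i received messages 0..i-1, 35 in total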
+    assertEquals(21, keySum);
+    assertEquals(35, messageSum);
+  }
+
+  @Test
+  public void sendPartitionMutationsRequest() throws IOException {
+    // Data to send
+    int partitionId = 19;
+    Map<IntWritable, VertexMutations<IntWritable, IntWritable,
+    IntWritable, IntWritable>> vertexIdMutations =
+        Maps.newHashMap();
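+    // Each of the 11 vertex ids gets 3 added vertices, 2 vertex removals,
+    // 5 added edges and 7 edge removals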
+    for (int i = 0; i < 11; ++i) {
+      VertexMutations<IntWritable, IntWritable, IntWritable, IntWritable>
+      mutations =
+      new VertexMutations<IntWritable, IntWritable,
+      IntWritable, IntWritable>();
+      for (int j = 0; j < 3; ++j) {
+        TestVertex vertex = new TestVertex();
+        vertex.initialize(new IntWritable(i), new IntWritable(j), null, null);
+        mutations.addVertex(vertex);
+      }
+      for (int j = 0; j < 2; ++j) {
+        mutations.removeVertex();
+      }
+      for (int j = 0; j < 5; ++j) {
+        Edge<IntWritable, IntWritable> edge =
+            new Edge<IntWritable, IntWritable>(
+                new IntWritable(i), new IntWritable(2 * j));
+        mutations.addEdge(edge);
+      }
+      for (int j = 0; j < 7; ++j) {
+        mutations.removeEdge(new IntWritable(j));
+      }
+      vertexIdMutations.put(new IntWritable(i), mutations);
+    }
+
+    // Send the request
+    SendPartitionMutationsRequest<IntWritable, IntWritable, IntWritable,
+    IntWritable> request =
+      new SendPartitionMutationsRequest<IntWritable, IntWritable,
+      IntWritable, IntWritable>(partitionId, vertexIdMutations);
+    client.sendWritableRequest(server.getMyAddress(), request);
+    client.waitAllRequests();
+
+    // Stop the service
+    client.stop();
+    server.stop();
+
+    // Check the output
+    ConcurrentHashMap<IntWritable, VertexMutations<IntWritable, IntWritable,
+    IntWritable, IntWritable>> inVertexIdMutations =
+        serverData.getVertexMutations();
+    int keySum = 0;
+    for (Entry<IntWritable, VertexMutations<IntWritable, IntWritable,
+        IntWritable, IntWritable>> entry :
+          inVertexIdMutations.entrySet()) {
+      synchronized (entry.getValue()) {
+        keySum += entry.getKey().get();
+        int vertexValueSum = 0;
+        for (BasicVertex<IntWritable, IntWritable, IntWritable, IntWritable>
+        vertex : entry.getValue().getAddedVertexList()) {
+          vertexValueSum += vertex.getVertexValue().get();
+        }
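+        // The 3 added vertices have values 0, 1 and 2, summing to 3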
+        assertEquals(3, vertexValueSum);
+        assertEquals(2, entry.getValue().getRemovedVertexCount());
+        int addedEdgeValueSum = 0;
+        for (Edge<IntWritable, IntWritable> edge :
+          entry.getValue().getAddedEdgeList()) {
+          addedEdgeValueSum += edge.getEdgeValue().get();
+        }
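+        // The 5 added edges have values 0, 2, 4, 6 and 8, summing to 20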
+        assertEquals(20, addedEdgeValueSum);
+      }
+    }
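+    // Vertex ids 0 through 10 sum to 55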
+    assertEquals(55, keySum);
+  }
+}

Modified: incubator/giraph/trunk/src/test/java/org/apache/giraph/examples/SimpleShortestPathVertexTest.java
URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/test/java/org/apache/giraph/examples/SimpleShortestPathVertexTest.java?rev=1336344&r1=1336343&r2=1336344&view=diff
==============================================================================
--- incubator/giraph/trunk/src/test/java/org/apache/giraph/examples/SimpleShortestPathVertexTest.java (original)
+++ incubator/giraph/trunk/src/test/java/org/apache/giraph/examples/SimpleShortestPathVertexTest.java Wed May  9 18:47:32 2012
@@ -113,7 +113,8 @@ public class SimpleShortestPathVertexTes
         "[1,0,[[2,1],[3,3]]]",
         "[2,0,[[3,1],[4,10]]]",
         "[3,0,[[4,2]]]",
-    "[4,0,[]]" };
+        "[4,0,[]]"
+    };
 
     // start from vertex 1
     Map<String, String> params = Maps.newHashMap();

Modified: incubator/giraph/trunk/src/test/java/org/apache/giraph/utils/MockUtils.java
URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/test/java/org/apache/giraph/utils/MockUtils.java?rev=1336344&r1=1336343&r2=1336344&view=diff
==============================================================================
--- incubator/giraph/trunk/src/test/java/org/apache/giraph/utils/MockUtils.java (original)
+++ incubator/giraph/trunk/src/test/java/org/apache/giraph/utils/MockUtils.java Wed May  9 18:47:32 2012
@@ -18,7 +18,7 @@
 
 package org.apache.giraph.utils;
 
-import org.apache.giraph.comm.WorkerCommunications;
+import org.apache.giraph.comm.WorkerClientServer;
 import org.apache.giraph.graph.GraphState;
 import org.apache.giraph.graph.BasicVertex;
 import org.apache.hadoop.conf.Configuration;
@@ -47,13 +47,13 @@ public class MockUtils {
         private final GraphState<I, V, E, M> graphState;
         private final Mapper.Context context;
         private final Configuration conf;
-        private final WorkerCommunications communications;
+        private final WorkerClientServer communications;
 
         public MockedEnvironment() {
             graphState = Mockito.mock(GraphState.class);
             context = Mockito.mock(Mapper.Context.class);
             conf = Mockito.mock(Configuration.class);
-            communications = Mockito.mock(WorkerCommunications.class);
+            communications = Mockito.mock(WorkerClientServer.class);
         }
 
         /** the injected graph state */
@@ -72,7 +72,7 @@ public class MockUtils {
         }
 
         /** the injected worker communications */
-        public WorkerCommunications getCommunications() {
+        public WorkerClientServer getCommunications() {
             return communications;
         }