You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@giraph.apache.org by ac...@apache.org on 2012/10/17 06:33:18 UTC

svn commit: r1399090 [3/3] - in /giraph/trunk: ./ giraph-formats-contrib/src/main/java/org/apache/giraph/io/hbase/ giraph/src/main/java/org/apache/giraph/ giraph/src/main/java/org/apache/giraph/bsp/ giraph/src/main/java/org/apache/giraph/comm/ giraph/s...

Modified: giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/SimpleMutableVertex.java
URL: http://svn.apache.org/viewvc/giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/SimpleMutableVertex.java?rev=1399090&r1=1399089&r2=1399090&view=diff
==============================================================================
--- giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/SimpleMutableVertex.java (original)
+++ giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/SimpleMutableVertex.java Wed Oct 17 04:33:16 2012
@@ -34,6 +34,7 @@ import java.util.Set;
 
 /**
  * Mutable vertex with no edge values.
+ *
  * @param <I> Vertex id
  * @param <V> Vertex data
  * @param <M> Message data
@@ -96,7 +97,7 @@ public abstract class SimpleMutableVerte
    * @param sourceVertexId Source vertex id of edge
    */
   public void addEdgeRequest(I sourceVertexId) throws IOException {
-    getGraphState().getWorkerCommunications().
+    getGraphState().getWorkerClientRequestProcessor().
         addEdgeRequest(sourceVertexId, new Edge<I,
             NullWritable>(sourceVertexId, NullWritable.get()));
   }

Modified: giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/Vertex.java
URL: http://svn.apache.org/viewvc/giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/Vertex.java?rev=1399090&r1=1399089&r2=1399090&view=diff
==============================================================================
--- giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/Vertex.java (original)
+++ giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/Vertex.java Wed Oct 17 04:33:16 2012
@@ -20,7 +20,6 @@ package org.apache.giraph.graph;
 
 import org.apache.giraph.ImmutableClassesGiraphConfigurable;
 import org.apache.giraph.ImmutableClassesGiraphConfiguration;
-import org.apache.giraph.comm.WorkerClientServer;
 import org.apache.giraph.graph.partition.PartitionOwner;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
@@ -221,11 +220,13 @@ public abstract class Vertex<I extends W
       throw new IllegalArgumentException(
           "sendMessage: Cannot send null message to " + id);
     }
-    getWorkerCommunications().sendMessageRequest(id, message);
+    getGraphState().getWorkerClientRequestProcessor().
+        sendMessageRequest(id, message);
   }
 
   /**
    * Lookup WorkerInfo for myself.
+   *
    * @return WorkerInfo about worker holding this Vertex.
    */
   public WorkerInfo getMyWorkerInfo() {
@@ -234,6 +235,7 @@ public abstract class Vertex<I extends W
 
   /**
    * Lookup WorkerInfo for a Vertex.
+   *
    * @param vertexId VertexId to lookup
    * @return WorkerInfo about worker holding this Vertex.
    */
@@ -243,19 +245,13 @@ public abstract class Vertex<I extends W
 
   /**
    * Lookup PartitionOwner for a Vertex
+   *
    * @param vertexId id of Vertex to look up.
    * @return PartitionOwner holding Vertex
    */
   private PartitionOwner getVertexPartitionOwner(I vertexId) {
-    return getWorkerCommunications().getVertexPartitionOwner(vertexId);
-  }
-
-  /**
-   * Get WorkerClientServer used to communicate with other servers.
-   * @return WorkerClientServer used.
-   */
-  private WorkerClientServer<I, V, E, M> getWorkerCommunications() {
-    return getGraphState().getWorkerCommunications();
+    return getGraphState().getWorkerClientRequestProcessor().
+        getVertexPartitionOwner(vertexId);
   }
 
   /**

Modified: giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/partition/HashWorkerPartitioner.java
URL: http://svn.apache.org/viewvc/giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/partition/HashWorkerPartitioner.java?rev=1399090&r1=1399089&r2=1399090&view=diff
==============================================================================
--- giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/partition/HashWorkerPartitioner.java (original)
+++ giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/partition/HashWorkerPartitioner.java Wed Oct 17 04:33:16 2012
@@ -18,6 +18,7 @@
 
 package org.apache.giraph.graph.partition;
 
+import com.google.common.collect.Lists;
 import org.apache.giraph.graph.WorkerInfo;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
@@ -42,9 +43,12 @@ import java.util.Set;
 public class HashWorkerPartitioner<I extends WritableComparable,
     V extends Writable, E extends Writable, M extends Writable>
     implements WorkerGraphPartitioner<I, V, E, M> {
-  /** Mapping of the vertex ids to {@link PartitionOwner} */
+  /**
+   * Mapping of the vertex ids to {@link PartitionOwner}.  Needs to be
+   * thread-safe (hence CopyOnWriteArrayList).
+   */
   protected List<PartitionOwner> partitionOwnerList =
-      new ArrayList<PartitionOwner>();
+      Lists.newCopyOnWriteArrayList();
 
   @Override
   public PartitionOwner createPartitionOwner() {
@@ -53,10 +57,8 @@ public class HashWorkerPartitioner<I ext
 
   @Override
   public PartitionOwner getPartitionOwner(I vertexId) {
-    synchronized (partitionOwnerList) {
-      return partitionOwnerList.get(Math.abs(vertexId.hashCode()) %
-          partitionOwnerList.size());
-    }
+    return partitionOwnerList.get(Math.abs(vertexId.hashCode()) %
+        partitionOwnerList.size());
   }
 
   @Override
@@ -72,10 +74,8 @@ public class HashWorkerPartitioner<I ext
       WorkerInfo myWorkerInfo,
       Collection<? extends PartitionOwner> masterSetPartitionOwners,
       PartitionStore<I, V, E, M> partitionStore) {
-    synchronized (partitionOwnerList) {
-      partitionOwnerList.clear();
-      partitionOwnerList.addAll(masterSetPartitionOwners);
-    }
+    partitionOwnerList.clear();
+    partitionOwnerList.addAll(masterSetPartitionOwners);
 
     Set<WorkerInfo> dependentWorkerSet = new HashSet<WorkerInfo>();
     Map<WorkerInfo, List<Integer>> workerPartitionOwnerMap =

Modified: giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/partition/PartitionStats.java
URL: http://svn.apache.org/viewvc/giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/partition/PartitionStats.java?rev=1399090&r1=1399089&r2=1399090&view=diff
==============================================================================
--- giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/partition/PartitionStats.java (original)
+++ giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/partition/PartitionStats.java Wed Oct 17 04:33:16 2012
@@ -15,7 +15,6 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-
 package org.apache.giraph.graph.partition;
 
 import java.io.DataInput;
@@ -37,6 +36,8 @@ public class PartitionStats implements W
   private long finishedVertexCount = 0;
   /** Edges in this partition */
   private long edgeCount = 0;
+  /** Messages sent from this partition */
+  private long messagesSentCount = 0;
 
   /**
    * Default constructor for reflection.
@@ -50,15 +51,18 @@ public class PartitionStats implements W
    * @param vertexCount Vertex count.
    * @param finishedVertexCount Finished vertex count.
    * @param edgeCount Edge count.
+   * @param messagesSentCount Number of messages sent
    */
   public PartitionStats(int partitionId,
       long vertexCount,
       long finishedVertexCount,
-      long edgeCount) {
+      long edgeCount,
+      long messagesSentCount) {
     this.partitionId = partitionId;
     this.vertexCount = vertexCount;
     this.finishedVertexCount = finishedVertexCount;
     this.edgeCount = edgeCount;
+    this.messagesSentCount = messagesSentCount;
   }
 
   /**
@@ -129,12 +133,31 @@ public class PartitionStats implements W
     return edgeCount;
   }
 
+  /**
+   * Add messages to the messages sent count.
+   *
+   * @param messagesSentCount Number of messages to add.
+   */
+  public void addMessagesSentCount(long messagesSentCount) {
+    this.messagesSentCount += messagesSentCount;
+  }
+
+  /**
+   * Get the messages sent count.
+   *
+   * @return Messages sent count.
+   */
+  public long getMessagesSentCount() {
+    return messagesSentCount;
+  }
+
   @Override
   public void readFields(DataInput input) throws IOException {
     partitionId = input.readInt();
     vertexCount = input.readLong();
     finishedVertexCount = input.readLong();
     edgeCount = input.readLong();
+    messagesSentCount = input.readLong();
   }
 
   @Override
@@ -143,11 +166,13 @@ public class PartitionStats implements W
     output.writeLong(vertexCount);
     output.writeLong(finishedVertexCount);
     output.writeLong(edgeCount);
+    output.writeLong(messagesSentCount);
   }
 
   @Override
   public String toString() {
     return "(id=" + partitionId + ",vtx=" + vertexCount + ",finVtx=" +
-        finishedVertexCount + ",edges=" + edgeCount + ")";
+        finishedVertexCount + ",edges=" + edgeCount + ",msgsSent=" +
+        messagesSentCount + ")";
   }
 }

Modified: giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/partition/PartitionStore.java
URL: http://svn.apache.org/viewvc/giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/partition/PartitionStore.java?rev=1399090&r1=1399089&r2=1399090&view=diff
==============================================================================
--- giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/partition/PartitionStore.java (original)
+++ giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/partition/PartitionStore.java Wed Oct 17 04:33:16 2012
@@ -111,7 +111,8 @@ public abstract class PartitionStore<I e
   }
 
   /**
-   * Return all the stored partitions as an Iterable.
+   * Return all the stored partitions as an Iterable.  Note that this may force
+   * out-of-core partitions to be loaded into memory if using out-of-core.
    *
    * @return The partition objects
    */

Added: giraph/trunk/giraph/src/main/java/org/apache/giraph/utils/LoggerUtils.java
URL: http://svn.apache.org/viewvc/giraph/trunk/giraph/src/main/java/org/apache/giraph/utils/LoggerUtils.java?rev=1399090&view=auto
==============================================================================
--- giraph/trunk/giraph/src/main/java/org/apache/giraph/utils/LoggerUtils.java (added)
+++ giraph/trunk/giraph/src/main/java/org/apache/giraph/utils/LoggerUtils.java Wed Oct 17 04:33:16 2012
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.utils;
+
+import java.io.IOException;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.log4j.Level;
+import org.apache.log4j.Logger;
+
+/**
+ * Logger utils for log4j
+ */
+public class LoggerUtils {
+  /**
+   * Don't construct this.
+   */
+  private LoggerUtils() { }
+
+  /**
+   * Helper method to set the status and log message together.
+   *
+   * @param context Context to set the status with
+   * @param logger Logger to write to
+   * @param level Level of logging
+   * @param message Message to
+   */
+  public static void setStatusAndLog(
+      TaskAttemptContext context, Logger logger, Level level,
+      String message) {
+    try {
+      context.setStatus(message);
+    } catch (IOException e) {
+      throw new IllegalStateException("setStatusAndLog: Got IOException", e);
+    }
+    if (logger.isEnabledFor(level)) {
+      logger.log(level, message);
+    }
+  }
+}

Modified: giraph/trunk/giraph/src/main/java/org/apache/giraph/utils/ProgressableUtils.java
URL: http://svn.apache.org/viewvc/giraph/trunk/giraph/src/main/java/org/apache/giraph/utils/ProgressableUtils.java?rev=1399090&r1=1399089&r2=1399090&view=diff
==============================================================================
--- giraph/trunk/giraph/src/main/java/org/apache/giraph/utils/ProgressableUtils.java (original)
+++ giraph/trunk/giraph/src/main/java/org/apache/giraph/utils/ProgressableUtils.java Wed Oct 17 04:33:16 2012
@@ -21,16 +21,19 @@ package org.apache.giraph.utils;
 import org.apache.hadoop.util.Progressable;
 import org.apache.log4j.Logger;
 
+import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
 
 /** Functions for waiting on some events to happen while reporting progress */
 public class ProgressableUtils {
   /** Class logger */
   private static final Logger LOG =
       Logger.getLogger(ProgressableUtils.class);
-  /** Msecs to refresh the progress meter */
-  private static final int MSEC_PERIOD = 10000;
+  /** Msecs to refresh the progress meter (one minute) */
+  private static final int MSEC_PERIOD = 60 * 1000;
 
   /** Do not instantiate. */
   private ProgressableUtils() { }
@@ -39,7 +42,7 @@ public class ProgressableUtils {
    * Wait for executor tasks to terminate, while periodically reporting
    * progress.
    *
-   * @param executor     Executor which we are waiting for
+   * @param executor Executor which we are waiting for
    * @param progressable Progressable for reporting progress (Job context)
    */
   public static void awaitExecutorTermination(ExecutorService executor,
@@ -85,4 +88,73 @@ public class ProgressableUtils {
       remainingWaitMsecs = Math.max(0, remainingWaitMsecs - currentWaitMsecs);
     }
   }
+
+
+  /**
+   * Wait for the result of the future to be ready, while periodically
+   * reporting progress.
+   *
+   * @param <T> Type of the return value of the future
+   * @param future Future
+   * @param progressable Progressable for reporting progress (Job context)
+   * @return Computed result of the future.
+   */
+  public static <T> T getFutureResult(Future<T> future,
+                                      Progressable progressable) {
+    while (!future.isDone()) {
+      tryGetFutureResult(future, progressable, MSEC_PERIOD);
+    }
+
+    try {
+      return future.get();
+    } catch (InterruptedException e) {
+      throw new IllegalStateException("get: " +
+          "InterruptedException occurred while waiting for future result", e);
+    } catch (ExecutionException e) {
+      throw new IllegalStateException("get: " +
+          "ExecutionException occurred while waiting for future result", e);
+    }
+  }
+
+  /**
+   * Wait maximum given number of milliseconds for result to become available,
+   * while periodically reporting progress.
+   *
+   * @param <T> Type of the return value of the future
+   * @param future Future
+   * @param progressable Progressable for reporting progress (Job context)
+   * @param msecs Number of milliseconds to wait
+   * @return Future result
+   */
+  public static <T> T tryGetFutureResult(
+      Future<T> future, Progressable progressable, int msecs) {
+    long maxMsecs = System.currentTimeMillis() + msecs;
+    int curMsecTimeout;
+    while (true) {
+      curMsecTimeout = Math.min(msecs, MSEC_PERIOD);
+      try {
+        future.get(curMsecTimeout, TimeUnit.MILLISECONDS);
+        if (future.isDone()) {
+          return future.get();
+        }
+      } catch (InterruptedException e) {
+        throw new IllegalStateException("tryGet: " +
+            "InterruptedException occurred while waiting for future result",
+            e);
+      } catch (ExecutionException e) {
+        throw new IllegalStateException("tryGet: " +
+            "ExecutionException occurred while waiting for future result", e);
+      } catch (TimeoutException e) {
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("tryGetFutureResult: Timeout occurred");
+        }
+      }
+
+      progressable.progress();
+      if (System.currentTimeMillis() >= maxMsecs) {
+        return null;
+      }
+      msecs = Math.max(0, msecs - curMsecTimeout);
+    }
+  }
 }

Modified: giraph/trunk/giraph/src/main/java/org/apache/giraph/utils/Time.java
URL: http://svn.apache.org/viewvc/giraph/trunk/giraph/src/main/java/org/apache/giraph/utils/Time.java?rev=1399090&r1=1399089&r2=1399090&view=diff
==============================================================================
--- giraph/trunk/giraph/src/main/java/org/apache/giraph/utils/Time.java (original)
+++ giraph/trunk/giraph/src/main/java/org/apache/giraph/utils/Time.java Wed Oct 17 04:33:16 2012
@@ -33,10 +33,16 @@ public interface Time {
   long NS_PER_MS = US_PER_MS * NS_PER_US;
   /** Milliseconds per second */
   long MS_PER_SECOND = 1000;
+  /** Milliseconds per second (as float) */
+  float MS_PER_SECOND_AS_FLOAT = MS_PER_SECOND * 1f;
   /** Microseconds per second */
   long US_PER_SECOND = US_PER_MS * MS_PER_SECOND;
+  /** Microseconds per second (as float) */
+  float US_PER_SECOND_AS_FLOAT = US_PER_SECOND * 1f;
   /** Nanoseconds per second */
   long NS_PER_SECOND = NS_PER_US * US_PER_SECOND;
+  /** Nanoseconds per second (as float) */
+  float NS_PER_SECOND_AS_FLOAT = NS_PER_SECOND * 1f;
   /** Seconds per hour */
   long SECONDS_PER_HOUR = 60 * 60;
   /** Seconds per day */

Modified: giraph/trunk/giraph/src/main/java/org/apache/giraph/zk/ZooKeeperExt.java
URL: http://svn.apache.org/viewvc/giraph/trunk/giraph/src/main/java/org/apache/giraph/zk/ZooKeeperExt.java?rev=1399090&r1=1399089&r2=1399090&view=diff
==============================================================================
--- giraph/trunk/giraph/src/main/java/org/apache/giraph/zk/ZooKeeperExt.java (original)
+++ giraph/trunk/giraph/src/main/java/org/apache/giraph/zk/ZooKeeperExt.java Wed Oct 17 04:33:16 2012
@@ -37,7 +37,8 @@ import org.apache.zookeeper.ZooKeeper;
 
 /**
  * ZooKeeper provides only atomic operations.  ZooKeeperExt provides additional
- * non-atomic operations that are useful.
+ * non-atomic operations that are useful.  All methods of this class should
+ * be thread-safe.
  */
 public class ZooKeeperExt extends ZooKeeper {
   /** Internal logger */

Modified: giraph/trunk/giraph/src/test/java/org/apache/giraph/BspCase.java
URL: http://svn.apache.org/viewvc/giraph/trunk/giraph/src/test/java/org/apache/giraph/BspCase.java?rev=1399090&r1=1399089&r2=1399090&view=diff
==============================================================================
--- giraph/trunk/giraph/src/test/java/org/apache/giraph/BspCase.java (original)
+++ giraph/trunk/giraph/src/test/java/org/apache/giraph/BspCase.java Wed Oct 17 04:33:16 2012
@@ -81,14 +81,14 @@ public class BspCase implements Watcher 
 
     // Allow this test to be run on a real Hadoop setup
     if (runningInDistributedMode()) {
-      System.out.println("setup: Sending job to job tracker " +
+      System.out.println("setupConfiguration: Sending job to job tracker " +
           jobTracker + " with jar path " + getJarLocation()
           + " for " + getName());
       conf.set("mapred.job.tracker", jobTracker);
       conf.setWorkerConfiguration(getNumWorkers(), getNumWorkers(), 100.0f);
     }
     else {
-      System.out.println("setup: Using local job runner with " +
+      System.out.println("setupConfiguration: Using local job runner with " +
           "location " + getJarLocation() + " for " + getName());
       conf.setWorkerConfiguration(1, 1, 100.0f);
       // Single node testing
@@ -341,7 +341,7 @@ public class BspCase implements Watcher 
   @Before
   public void setUp() {
     if (runningInDistributedMode()) {
-      System.out.println("Setting tasks to 3 for " + getName() +
+      System.out.println("setUp: Setting tasks to 3 for " + getName() +
           " since JobTracker exists...");
       numWorkers = 3;
     }

Modified: giraph/trunk/giraph/src/test/java/org/apache/giraph/TestBspBasic.java
URL: http://svn.apache.org/viewvc/giraph/trunk/giraph/src/test/java/org/apache/giraph/TestBspBasic.java?rev=1399090&r1=1399089&r2=1399090&view=diff
==============================================================================
--- giraph/trunk/giraph/src/test/java/org/apache/giraph/TestBspBasic.java (original)
+++ giraph/trunk/giraph/src/test/java/org/apache/giraph/TestBspBasic.java Wed Oct 17 04:33:16 2012
@@ -117,15 +117,11 @@ public class TestBspBasic extends BspCas
     GiraphJob job = prepareJob(getCallingMethodName(),
         SimpleSuperstepVertex.class,
         SimpleSuperstepVertex.SimpleSuperstepVertexInputFormat.class);
-    GraphState<LongWritable, IntWritable, FloatWritable, IntWritable> gs =
-        new GraphState<LongWritable, IntWritable,
-        FloatWritable, IntWritable>();
     ImmutableClassesGiraphConfiguration configuration =
         new ImmutableClassesGiraphConfiguration(job.getConfiguration());
     Vertex<LongWritable, IntWritable, FloatWritable, IntWritable> vertex =
         configuration.createVertex();
-    System.out.println("testInstantiateVertex: Got vertex " + vertex +
-        ", graphState" + gs);
+    System.out.println("testInstantiateVertex: Got vertex " + vertex);
     VertexInputFormat<LongWritable, IntWritable, FloatWritable, IntWritable>
     inputFormat = configuration.createVertexInputFormat();
 /*if[HADOOP_NON_JOBCONTEXT_IS_INTERFACE]
@@ -358,38 +354,6 @@ else[HADOOP_NON_JOBCONTEXT_IS_INTERFACE]
   }
 
   /**
-   * Run a sample BSP job locally and test PageRank.
-   *
-   * @throws IOException
-   * @throws ClassNotFoundException
-   * @throws InterruptedException
-   */
-  @Test
-  public void testBspPageRank()
-      throws IOException, InterruptedException, ClassNotFoundException {
-    GiraphJob job = prepareJob(getCallingMethodName(),
-        SimplePageRankVertex.class, SimplePageRankVertexInputFormat.class);
-    job.getConfiguration().setWorkerContextClass(
-        SimplePageRankVertex.SimplePageRankVertexWorkerContext.class);
-    job.getConfiguration().setMasterComputeClass(
-        SimplePageRankVertex.SimplePageRankVertexMasterCompute.class);
-    assertTrue(job.run(true));
-    if (!runningInDistributedMode()) {
-      double maxPageRank =
-          SimplePageRankVertex.SimplePageRankVertexWorkerContext.getFinalMax();
-      double minPageRank =
-          SimplePageRankVertex.SimplePageRankVertexWorkerContext.getFinalMin();
-      long numVertices =
-          SimplePageRankVertex.SimplePageRankVertexWorkerContext.getFinalSum();
-      System.out.println("testBspPageRank: maxPageRank=" + maxPageRank +
-          " minPageRank=" + minPageRank + " numVertices=" + numVertices);
-      assertEquals(34.03, maxPageRank, 0.001);
-      assertEquals(0.03, minPageRank, 0.00001);
-      assertEquals(5l, numVertices);
-    }
-  }
-
-  /**
    * Run a sample BSP job locally and test shortest paths.
    *
    * @throws IOException

Added: giraph/trunk/giraph/src/test/java/org/apache/giraph/TestPageRank.java
URL: http://svn.apache.org/viewvc/giraph/trunk/giraph/src/test/java/org/apache/giraph/TestPageRank.java?rev=1399090&view=auto
==============================================================================
--- giraph/trunk/giraph/src/test/java/org/apache/giraph/TestPageRank.java (added)
+++ giraph/trunk/giraph/src/test/java/org/apache/giraph/TestPageRank.java Wed Oct 17 04:33:16 2012
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph;
+
+import java.io.IOException;
+import org.apache.giraph.examples.SimplePageRankVertex;
+import org.apache.giraph.graph.GiraphJob;
+import org.apache.giraph.graph.partition.HashMasterPartitioner;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Test page rank (with and without multithreading)
+ */
+public class TestPageRank extends BspCase {
+
+  /**
+   * Constructor
+   */
+  public TestPageRank() {
+    super(TestPageRank.class.getName());
+  }
+
+  @Test
+  public void testBspPageRankSingleCompute()
+      throws ClassNotFoundException, IOException, InterruptedException {
+    testPageRank(1);
+  }
+
+
+  @Test
+  public void testPageRankTenThreadsCompute()
+      throws ClassNotFoundException, IOException, InterruptedException {
+    testPageRank(10);
+  }
+
+  /**
+   * Generic page rank test
+   *
+   * @param numComputeThreads Number of compute threads to use
+   * @throws java.io.IOException
+   * @throws ClassNotFoundException
+   * @throws InterruptedException
+   */
+  private void testPageRank(int numComputeThreads)
+      throws IOException, InterruptedException, ClassNotFoundException {
+    GiraphJob job = prepareJob(getCallingMethodName(),
+        SimplePageRankVertex.class, SimplePageRankVertex.SimplePageRankVertexInputFormat.class);
+    job.getConfiguration().setWorkerContextClass(
+        SimplePageRankVertex.SimplePageRankVertexWorkerContext.class);
+    job.getConfiguration().setMasterComputeClass(
+        SimplePageRankVertex.SimplePageRankVertexMasterCompute.class);
+    job.getConfiguration().setNumComputeThreads(numComputeThreads);
+    // Set enough partitions to generate randomness on the compute side
+    if (numComputeThreads != 1) {
+      job.getConfiguration().setInt(
+          HashMasterPartitioner.USER_PARTITION_COUNT,
+          numComputeThreads * 5);
+    }
+    assertTrue(job.run(true));
+    if (!runningInDistributedMode()) {
+      double maxPageRank =
+          SimplePageRankVertex.SimplePageRankVertexWorkerContext.getFinalMax();
+      double minPageRank =
+          SimplePageRankVertex.SimplePageRankVertexWorkerContext.getFinalMin();
+      long numVertices =
+          SimplePageRankVertex.SimplePageRankVertexWorkerContext.getFinalSum();
+      System.out.println(getCallingMethodName() + ": maxPageRank=" +
+          maxPageRank + " minPageRank=" +
+          minPageRank + " numVertices=" + numVertices + ", " +
+          " numComputeThreads=" + numComputeThreads);
+      assertEquals(34.03, maxPageRank, 0.001);
+      assertEquals(0.03, minPageRank, 0.00001);
+      assertEquals(5l, numVertices);
+    }
+  }
+}

Modified: giraph/trunk/giraph/src/test/java/org/apache/giraph/utils/MockUtils.java
URL: http://svn.apache.org/viewvc/giraph/trunk/giraph/src/test/java/org/apache/giraph/utils/MockUtils.java?rev=1399090&r1=1399089&r2=1399090&view=diff
==============================================================================
--- giraph/trunk/giraph/src/test/java/org/apache/giraph/utils/MockUtils.java (original)
+++ giraph/trunk/giraph/src/test/java/org/apache/giraph/utils/MockUtils.java Wed Oct 17 04:33:16 2012
@@ -19,6 +19,7 @@
 package org.apache.giraph.utils;
 
 import org.apache.giraph.bsp.CentralizedServiceWorker;
+import org.apache.giraph.comm.WorkerClientRequestProcessor;
 import org.apache.giraph.comm.WorkerClientServer;
 import org.apache.giraph.graph.Vertex;
 import org.apache.giraph.graph.GraphState;
@@ -53,13 +54,14 @@ public class MockUtils {
         private final GraphState<I, V, E, M> graphState;
         private final Mapper.Context context;
         private final Configuration conf;
-        private final WorkerClientServer communications;
+        private final WorkerClientRequestProcessor workerClientRequestProcessor;
 
         public MockedEnvironment() {
             graphState = Mockito.mock(GraphState.class);
             context = Mockito.mock(Mapper.Context.class);
             conf = Mockito.mock(Configuration.class);
-            communications = Mockito.mock(WorkerClientServer.class);
+            workerClientRequestProcessor =
+                Mockito.mock(WorkerClientRequestProcessor.class);
         }
 
         /** the injected graph state */
@@ -78,19 +80,19 @@ public class MockUtils {
         }
 
         /** the injected worker communications */
-        public WorkerClientServer getCommunications() {
-            return communications;
+        public WorkerClientRequestProcessor getWorkerClientRequestProcessor() {
+            return workerClientRequestProcessor;
         }
 
         /** assert that the test vertex message has been sent to a particular vertex */
         public void verifyMessageSent(I targetVertexId, M message) {
-            Mockito.verify(communications).sendMessageRequest(targetVertexId,
-                message);
+            Mockito.verify(workerClientRequestProcessor).sendMessageRequest
+                (targetVertexId, message);
         }
 
         /** assert that the test vertex has sent no message to a particular vertex */
         public void verifyNoMessageSent() {
-            Mockito.verifyZeroInteractions(communications);
+            Mockito.verifyZeroInteractions(workerClientRequestProcessor);
         }
     }
 
@@ -124,8 +126,8 @@ public class MockUtils {
                 .thenReturn(env.getContext());
         Mockito.when(env.getContext().getConfiguration())
                 .thenReturn(env.getConfiguration());
-        Mockito.when(env.getGraphState().getWorkerCommunications())
-                .thenReturn(env.getCommunications());
+        Mockito.when(env.getGraphState().getWorkerClientRequestProcessor())
+                .thenReturn(env.getWorkerClientRequestProcessor());
 
         ReflectionUtils.setField(vertex, "id", vertexId);
         ReflectionUtils.setField(vertex, "value", vertexValue);