You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@giraph.apache.org by ac...@apache.org on 2012/10/17 06:33:18 UTC
svn commit: r1399090 [3/3] - in /giraph/trunk: ./
giraph-formats-contrib/src/main/java/org/apache/giraph/io/hbase/
giraph/src/main/java/org/apache/giraph/
giraph/src/main/java/org/apache/giraph/bsp/
giraph/src/main/java/org/apache/giraph/comm/ giraph/s...
Modified: giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/SimpleMutableVertex.java
URL: http://svn.apache.org/viewvc/giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/SimpleMutableVertex.java?rev=1399090&r1=1399089&r2=1399090&view=diff
==============================================================================
--- giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/SimpleMutableVertex.java (original)
+++ giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/SimpleMutableVertex.java Wed Oct 17 04:33:16 2012
@@ -34,6 +34,7 @@ import java.util.Set;
/**
* Mutable vertex with no edge values.
+ *
* @param <I> Vertex id
* @param <V> Vertex data
* @param <M> Message data
@@ -96,7 +97,7 @@ public abstract class SimpleMutableVerte
* @param sourceVertexId Source vertex id of edge
*/
public void addEdgeRequest(I sourceVertexId) throws IOException {
- getGraphState().getWorkerCommunications().
+ getGraphState().getWorkerClientRequestProcessor().
addEdgeRequest(sourceVertexId, new Edge<I,
NullWritable>(sourceVertexId, NullWritable.get()));
}
Modified: giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/Vertex.java
URL: http://svn.apache.org/viewvc/giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/Vertex.java?rev=1399090&r1=1399089&r2=1399090&view=diff
==============================================================================
--- giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/Vertex.java (original)
+++ giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/Vertex.java Wed Oct 17 04:33:16 2012
@@ -20,7 +20,6 @@ package org.apache.giraph.graph;
import org.apache.giraph.ImmutableClassesGiraphConfigurable;
import org.apache.giraph.ImmutableClassesGiraphConfiguration;
-import org.apache.giraph.comm.WorkerClientServer;
import org.apache.giraph.graph.partition.PartitionOwner;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
@@ -221,11 +220,13 @@ public abstract class Vertex<I extends W
throw new IllegalArgumentException(
"sendMessage: Cannot send null message to " + id);
}
- getWorkerCommunications().sendMessageRequest(id, message);
+ getGraphState().getWorkerClientRequestProcessor().
+ sendMessageRequest(id, message);
}
/**
* Lookup WorkerInfo for myself.
+ *
* @return WorkerInfo about worker holding this Vertex.
*/
public WorkerInfo getMyWorkerInfo() {
@@ -234,6 +235,7 @@ public abstract class Vertex<I extends W
/**
* Lookup WorkerInfo for a Vertex.
+ *
* @param vertexId VertexId to lookup
* @return WorkerInfo about worker holding this Vertex.
*/
@@ -243,19 +245,13 @@ public abstract class Vertex<I extends W
/**
* Lookup PartitionOwner for a Vertex
+ *
* @param vertexId id of Vertex to look up.
* @return PartitionOwner holding Vertex
*/
private PartitionOwner getVertexPartitionOwner(I vertexId) {
- return getWorkerCommunications().getVertexPartitionOwner(vertexId);
- }
-
- /**
- * Get WorkerClientServer used to communicate with other servers.
- * @return WorkerClientServer used.
- */
- private WorkerClientServer<I, V, E, M> getWorkerCommunications() {
- return getGraphState().getWorkerCommunications();
+ return getGraphState().getWorkerClientRequestProcessor().
+ getVertexPartitionOwner(vertexId);
}
/**
Modified: giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/partition/HashWorkerPartitioner.java
URL: http://svn.apache.org/viewvc/giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/partition/HashWorkerPartitioner.java?rev=1399090&r1=1399089&r2=1399090&view=diff
==============================================================================
--- giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/partition/HashWorkerPartitioner.java (original)
+++ giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/partition/HashWorkerPartitioner.java Wed Oct 17 04:33:16 2012
@@ -18,6 +18,7 @@
package org.apache.giraph.graph.partition;
+import com.google.common.collect.Lists;
import org.apache.giraph.graph.WorkerInfo;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
@@ -42,9 +43,12 @@ import java.util.Set;
public class HashWorkerPartitioner<I extends WritableComparable,
V extends Writable, E extends Writable, M extends Writable>
implements WorkerGraphPartitioner<I, V, E, M> {
- /** Mapping of the vertex ids to {@link PartitionOwner} */
+ /**
+ * Mapping of the vertex ids to {@link PartitionOwner}. Needs to be
+ * thread-safe (hence CopyOnWriteArrayList).
+ */
protected List<PartitionOwner> partitionOwnerList =
- new ArrayList<PartitionOwner>();
+ Lists.newCopyOnWriteArrayList();
@Override
public PartitionOwner createPartitionOwner() {
@@ -53,10 +57,8 @@ public class HashWorkerPartitioner<I ext
@Override
public PartitionOwner getPartitionOwner(I vertexId) {
- synchronized (partitionOwnerList) {
- return partitionOwnerList.get(Math.abs(vertexId.hashCode()) %
- partitionOwnerList.size());
- }
+ return partitionOwnerList.get(Math.abs(vertexId.hashCode()) %
+ partitionOwnerList.size());
}
@Override
@@ -72,10 +74,8 @@ public class HashWorkerPartitioner<I ext
WorkerInfo myWorkerInfo,
Collection<? extends PartitionOwner> masterSetPartitionOwners,
PartitionStore<I, V, E, M> partitionStore) {
- synchronized (partitionOwnerList) {
- partitionOwnerList.clear();
- partitionOwnerList.addAll(masterSetPartitionOwners);
- }
+ partitionOwnerList.clear();
+ partitionOwnerList.addAll(masterSetPartitionOwners);
Set<WorkerInfo> dependentWorkerSet = new HashSet<WorkerInfo>();
Map<WorkerInfo, List<Integer>> workerPartitionOwnerMap =
Modified: giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/partition/PartitionStats.java
URL: http://svn.apache.org/viewvc/giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/partition/PartitionStats.java?rev=1399090&r1=1399089&r2=1399090&view=diff
==============================================================================
--- giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/partition/PartitionStats.java (original)
+++ giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/partition/PartitionStats.java Wed Oct 17 04:33:16 2012
@@ -15,7 +15,6 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-
package org.apache.giraph.graph.partition;
import java.io.DataInput;
@@ -37,6 +36,8 @@ public class PartitionStats implements W
private long finishedVertexCount = 0;
/** Edges in this partition */
private long edgeCount = 0;
+ /** Messages sent from this partition */
+ private long messagesSentCount = 0;
/**
* Default constructor for reflection.
@@ -50,15 +51,18 @@ public class PartitionStats implements W
* @param vertexCount Vertex count.
* @param finishedVertexCount Finished vertex count.
* @param edgeCount Edge count.
+ * @param messagesSentCount Number of messages sent
*/
public PartitionStats(int partitionId,
long vertexCount,
long finishedVertexCount,
- long edgeCount) {
+ long edgeCount,
+ long messagesSentCount) {
this.partitionId = partitionId;
this.vertexCount = vertexCount;
this.finishedVertexCount = finishedVertexCount;
this.edgeCount = edgeCount;
+ this.messagesSentCount = messagesSentCount;
}
/**
@@ -129,12 +133,31 @@ public class PartitionStats implements W
return edgeCount;
}
+ /**
+ * Add messages to the messages sent count.
+ *
+ * @param messagesSentCount Number of messages to add.
+ */
+ public void addMessagesSentCount(long messagesSentCount) {
+ this.messagesSentCount += messagesSentCount;
+ }
+
+ /**
+ * Get the messages sent count.
+ *
+ * @return Messages sent count.
+ */
+ public long getMessagesSentCount() {
+ return messagesSentCount;
+ }
+
@Override
public void readFields(DataInput input) throws IOException {
partitionId = input.readInt();
vertexCount = input.readLong();
finishedVertexCount = input.readLong();
edgeCount = input.readLong();
+ messagesSentCount = input.readLong();
}
@Override
@@ -143,11 +166,13 @@ public class PartitionStats implements W
output.writeLong(vertexCount);
output.writeLong(finishedVertexCount);
output.writeLong(edgeCount);
+ output.writeLong(messagesSentCount);
}
@Override
public String toString() {
return "(id=" + partitionId + ",vtx=" + vertexCount + ",finVtx=" +
- finishedVertexCount + ",edges=" + edgeCount + ")";
+ finishedVertexCount + ",edges=" + edgeCount + ",msgsSent=" +
+ messagesSentCount + ")";
}
}
Modified: giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/partition/PartitionStore.java
URL: http://svn.apache.org/viewvc/giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/partition/PartitionStore.java?rev=1399090&r1=1399089&r2=1399090&view=diff
==============================================================================
--- giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/partition/PartitionStore.java (original)
+++ giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/partition/PartitionStore.java Wed Oct 17 04:33:16 2012
@@ -111,7 +111,8 @@ public abstract class PartitionStore<I e
}
/**
- * Return all the stored partitions as an Iterable.
+ * Return all the stored partitions as an Iterable. Note that, when running
+ * out-of-core, this may force out-of-core partitions to be loaded into memory.
*
* @return The partition objects
*/
Added: giraph/trunk/giraph/src/main/java/org/apache/giraph/utils/LoggerUtils.java
URL: http://svn.apache.org/viewvc/giraph/trunk/giraph/src/main/java/org/apache/giraph/utils/LoggerUtils.java?rev=1399090&view=auto
==============================================================================
--- giraph/trunk/giraph/src/main/java/org/apache/giraph/utils/LoggerUtils.java (added)
+++ giraph/trunk/giraph/src/main/java/org/apache/giraph/utils/LoggerUtils.java Wed Oct 17 04:33:16 2012
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.utils;
+
+import java.io.IOException;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.log4j.Level;
+import org.apache.log4j.Logger;
+
+/**
+ * Logger utils for log4j
+ */
+public class LoggerUtils {
+ /**
+ * Don't construct this.
+ */
+ private LoggerUtils() { }
+
+ /**
+ * Helper method to set the status and log message together.
+ *
+ * @param context Context to set the status with
+ * @param logger Logger to write to
+ * @param level Level of logging
+ * @param message Message to set as the status and to log
+ */
+ public static void setStatusAndLog(
+ TaskAttemptContext context, Logger logger, Level level,
+ String message) {
+ try {
+ context.setStatus(message);
+ } catch (IOException e) {
+ throw new IllegalStateException("setStatusAndLog: Got IOException", e);
+ }
+ if (logger.isEnabledFor(level)) {
+ logger.log(level, message);
+ }
+ }
+}
Modified: giraph/trunk/giraph/src/main/java/org/apache/giraph/utils/ProgressableUtils.java
URL: http://svn.apache.org/viewvc/giraph/trunk/giraph/src/main/java/org/apache/giraph/utils/ProgressableUtils.java?rev=1399090&r1=1399089&r2=1399090&view=diff
==============================================================================
--- giraph/trunk/giraph/src/main/java/org/apache/giraph/utils/ProgressableUtils.java (original)
+++ giraph/trunk/giraph/src/main/java/org/apache/giraph/utils/ProgressableUtils.java Wed Oct 17 04:33:16 2012
@@ -21,16 +21,19 @@ package org.apache.giraph.utils;
import org.apache.hadoop.util.Progressable;
import org.apache.log4j.Logger;
+import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
/** Functions for waiting on some events to happen while reporting progress */
public class ProgressableUtils {
/** Class logger */
private static final Logger LOG =
Logger.getLogger(ProgressableUtils.class);
- /** Msecs to refresh the progress meter */
- private static final int MSEC_PERIOD = 10000;
+ /** Msecs to refresh the progress meter (one minute) */
+ private static final int MSEC_PERIOD = 60 * 1000;
/** Do not instantiate. */
private ProgressableUtils() { }
@@ -39,7 +42,7 @@ public class ProgressableUtils {
* Wait for executor tasks to terminate, while periodically reporting
* progress.
*
- * @param executor Executor which we are waiting for
+ * @param executor Executor which we are waiting for
* @param progressable Progressable for reporting progress (Job context)
*/
public static void awaitExecutorTermination(ExecutorService executor,
@@ -85,4 +88,73 @@ public class ProgressableUtils {
remainingWaitMsecs = Math.max(0, remainingWaitMsecs - currentWaitMsecs);
}
}
+
+
+ /**
+ * Wait for the result of the future to be ready, while periodically
+ * reporting progress.
+ *
+ * @param <T> Type of the return value of the future
+ * @param future Future whose result is awaited
+ * @param progressable Progressable for reporting progress (Job context)
+ * @return Computed result of the future.
+ */
+ public static <T> T getFutureResult(Future<T> future,
+ Progressable progressable) {
+ while (!future.isDone()) {
+ tryGetFutureResult(future, progressable, MSEC_PERIOD);
+ }
+
+ try {
+ return future.get();
+ } catch (InterruptedException e) {
+ throw new IllegalStateException("get: " +
+ "InterruptedException occurred while waiting for future result", e);
+ } catch (ExecutionException e) {
+ throw new IllegalStateException("get: " +
+ "ExecutionException occurred while waiting for future result", e);
+ }
+ }
+
+ /**
+ * Wait at most the given number of milliseconds for the result to become
+ * available, while periodically reporting progress.
+ *
+ * @param <T> Type of the return value of the future
+ * @param future Future whose result is awaited
+ * @param progressable Progressable for reporting progress (Job context)
+ * @param msecs Number of milliseconds to wait
+ * @return Future result, or null if it did not become available within msecs
+ */
+ public static <T> T tryGetFutureResult(
+ Future<T> future, Progressable progressable, int msecs) {
+ long maxMsecs = System.currentTimeMillis() + msecs;
+ int curMsecTimeout;
+ while (true) {
+ curMsecTimeout = Math.min(msecs, MSEC_PERIOD);
+ try {
+ future.get(curMsecTimeout, TimeUnit.MILLISECONDS);
+ if (future.isDone()) {
+ return future.get();
+ }
+ } catch (InterruptedException e) {
+ throw new IllegalStateException("tryGet: " +
+ "InterruptedException occurred while waiting for future result",
+ e);
+ } catch (ExecutionException e) {
+ throw new IllegalStateException("tryGet: " +
+ "ExecutionException occurred while waiting for future result", e);
+ } catch (TimeoutException e) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("tryGetFutureResult: Timeout occurred");
+ }
+ }
+
+ progressable.progress();
+ if (System.currentTimeMillis() >= maxMsecs) {
+ return null;
+ }
+ msecs = Math.max(0, msecs - curMsecTimeout);
+ }
+ }
}
Modified: giraph/trunk/giraph/src/main/java/org/apache/giraph/utils/Time.java
URL: http://svn.apache.org/viewvc/giraph/trunk/giraph/src/main/java/org/apache/giraph/utils/Time.java?rev=1399090&r1=1399089&r2=1399090&view=diff
==============================================================================
--- giraph/trunk/giraph/src/main/java/org/apache/giraph/utils/Time.java (original)
+++ giraph/trunk/giraph/src/main/java/org/apache/giraph/utils/Time.java Wed Oct 17 04:33:16 2012
@@ -33,10 +33,16 @@ public interface Time {
long NS_PER_MS = US_PER_MS * NS_PER_US;
/** Milliseconds per second */
long MS_PER_SECOND = 1000;
+ /** Milliseconds per second (as float) */
+ float MS_PER_SECOND_AS_FLOAT = MS_PER_SECOND * 1f;
/** Microseconds per second */
long US_PER_SECOND = US_PER_MS * MS_PER_SECOND;
+ /** Microseconds per second (as float) */
+ float US_PER_SECOND_AS_FLOAT = US_PER_SECOND * 1f;
/** Nanoseconds per second */
long NS_PER_SECOND = NS_PER_US * US_PER_SECOND;
+ /** Nanoseconds per second (as float) */
+ float NS_PER_SECOND_AS_FLOAT = NS_PER_SECOND * 1f;
/** Seconds per hour */
long SECONDS_PER_HOUR = 60 * 60;
/** Seconds per day */
Modified: giraph/trunk/giraph/src/main/java/org/apache/giraph/zk/ZooKeeperExt.java
URL: http://svn.apache.org/viewvc/giraph/trunk/giraph/src/main/java/org/apache/giraph/zk/ZooKeeperExt.java?rev=1399090&r1=1399089&r2=1399090&view=diff
==============================================================================
--- giraph/trunk/giraph/src/main/java/org/apache/giraph/zk/ZooKeeperExt.java (original)
+++ giraph/trunk/giraph/src/main/java/org/apache/giraph/zk/ZooKeeperExt.java Wed Oct 17 04:33:16 2012
@@ -37,7 +37,8 @@ import org.apache.zookeeper.ZooKeeper;
/**
* ZooKeeper provides only atomic operations. ZooKeeperExt provides additional
- * non-atomic operations that are useful.
+ * non-atomic operations that are useful. All methods of this class should
+ * be thread-safe.
*/
public class ZooKeeperExt extends ZooKeeper {
/** Internal logger */
Modified: giraph/trunk/giraph/src/test/java/org/apache/giraph/BspCase.java
URL: http://svn.apache.org/viewvc/giraph/trunk/giraph/src/test/java/org/apache/giraph/BspCase.java?rev=1399090&r1=1399089&r2=1399090&view=diff
==============================================================================
--- giraph/trunk/giraph/src/test/java/org/apache/giraph/BspCase.java (original)
+++ giraph/trunk/giraph/src/test/java/org/apache/giraph/BspCase.java Wed Oct 17 04:33:16 2012
@@ -81,14 +81,14 @@ public class BspCase implements Watcher
// Allow this test to be run on a real Hadoop setup
if (runningInDistributedMode()) {
- System.out.println("setup: Sending job to job tracker " +
+ System.out.println("setupConfiguration: Sending job to job tracker " +
jobTracker + " with jar path " + getJarLocation()
+ " for " + getName());
conf.set("mapred.job.tracker", jobTracker);
conf.setWorkerConfiguration(getNumWorkers(), getNumWorkers(), 100.0f);
}
else {
- System.out.println("setup: Using local job runner with " +
+ System.out.println("setupConfiguration: Using local job runner with " +
"location " + getJarLocation() + " for " + getName());
conf.setWorkerConfiguration(1, 1, 100.0f);
// Single node testing
@@ -341,7 +341,7 @@ public class BspCase implements Watcher
@Before
public void setUp() {
if (runningInDistributedMode()) {
- System.out.println("Setting tasks to 3 for " + getName() +
+ System.out.println("setUp: Setting tasks to 3 for " + getName() +
" since JobTracker exists...");
numWorkers = 3;
}
Modified: giraph/trunk/giraph/src/test/java/org/apache/giraph/TestBspBasic.java
URL: http://svn.apache.org/viewvc/giraph/trunk/giraph/src/test/java/org/apache/giraph/TestBspBasic.java?rev=1399090&r1=1399089&r2=1399090&view=diff
==============================================================================
--- giraph/trunk/giraph/src/test/java/org/apache/giraph/TestBspBasic.java (original)
+++ giraph/trunk/giraph/src/test/java/org/apache/giraph/TestBspBasic.java Wed Oct 17 04:33:16 2012
@@ -117,15 +117,11 @@ public class TestBspBasic extends BspCas
GiraphJob job = prepareJob(getCallingMethodName(),
SimpleSuperstepVertex.class,
SimpleSuperstepVertex.SimpleSuperstepVertexInputFormat.class);
- GraphState<LongWritable, IntWritable, FloatWritable, IntWritable> gs =
- new GraphState<LongWritable, IntWritable,
- FloatWritable, IntWritable>();
ImmutableClassesGiraphConfiguration configuration =
new ImmutableClassesGiraphConfiguration(job.getConfiguration());
Vertex<LongWritable, IntWritable, FloatWritable, IntWritable> vertex =
configuration.createVertex();
- System.out.println("testInstantiateVertex: Got vertex " + vertex +
- ", graphState" + gs);
+ System.out.println("testInstantiateVertex: Got vertex " + vertex);
VertexInputFormat<LongWritable, IntWritable, FloatWritable, IntWritable>
inputFormat = configuration.createVertexInputFormat();
/*if[HADOOP_NON_JOBCONTEXT_IS_INTERFACE]
@@ -358,38 +354,6 @@ else[HADOOP_NON_JOBCONTEXT_IS_INTERFACE]
}
/**
- * Run a sample BSP job locally and test PageRank.
- *
- * @throws IOException
- * @throws ClassNotFoundException
- * @throws InterruptedException
- */
- @Test
- public void testBspPageRank()
- throws IOException, InterruptedException, ClassNotFoundException {
- GiraphJob job = prepareJob(getCallingMethodName(),
- SimplePageRankVertex.class, SimplePageRankVertexInputFormat.class);
- job.getConfiguration().setWorkerContextClass(
- SimplePageRankVertex.SimplePageRankVertexWorkerContext.class);
- job.getConfiguration().setMasterComputeClass(
- SimplePageRankVertex.SimplePageRankVertexMasterCompute.class);
- assertTrue(job.run(true));
- if (!runningInDistributedMode()) {
- double maxPageRank =
- SimplePageRankVertex.SimplePageRankVertexWorkerContext.getFinalMax();
- double minPageRank =
- SimplePageRankVertex.SimplePageRankVertexWorkerContext.getFinalMin();
- long numVertices =
- SimplePageRankVertex.SimplePageRankVertexWorkerContext.getFinalSum();
- System.out.println("testBspPageRank: maxPageRank=" + maxPageRank +
- " minPageRank=" + minPageRank + " numVertices=" + numVertices);
- assertEquals(34.03, maxPageRank, 0.001);
- assertEquals(0.03, minPageRank, 0.00001);
- assertEquals(5l, numVertices);
- }
- }
-
- /**
* Run a sample BSP job locally and test shortest paths.
*
* @throws IOException
Added: giraph/trunk/giraph/src/test/java/org/apache/giraph/TestPageRank.java
URL: http://svn.apache.org/viewvc/giraph/trunk/giraph/src/test/java/org/apache/giraph/TestPageRank.java?rev=1399090&view=auto
==============================================================================
--- giraph/trunk/giraph/src/test/java/org/apache/giraph/TestPageRank.java (added)
+++ giraph/trunk/giraph/src/test/java/org/apache/giraph/TestPageRank.java Wed Oct 17 04:33:16 2012
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph;
+
+import java.io.IOException;
+import org.apache.giraph.examples.SimplePageRankVertex;
+import org.apache.giraph.graph.GiraphJob;
+import org.apache.giraph.graph.partition.HashMasterPartitioner;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Test page rank (with and without multithreading)
+ */
+public class TestPageRank extends BspCase {
+
+ /**
+ * Constructor
+ */
+ public TestPageRank() {
+ super(TestPageRank.class.getName());
+ }
+
+ @Test
+ public void testBspPageRankSingleCompute()
+ throws ClassNotFoundException, IOException, InterruptedException {
+ testPageRank(1);
+ }
+
+
+ @Test
+ public void testPageRankTenThreadsCompute()
+ throws ClassNotFoundException, IOException, InterruptedException {
+ testPageRank(10);
+ }
+
+ /**
+ * Generic page rank test
+ *
+ * @param numComputeThreads Number of compute threads to use
+ * @throws java.io.IOException
+ * @throws ClassNotFoundException
+ * @throws InterruptedException
+ */
+ private void testPageRank(int numComputeThreads)
+ throws IOException, InterruptedException, ClassNotFoundException {
+ GiraphJob job = prepareJob(getCallingMethodName(),
+ SimplePageRankVertex.class, SimplePageRankVertex.SimplePageRankVertexInputFormat.class);
+ job.getConfiguration().setWorkerContextClass(
+ SimplePageRankVertex.SimplePageRankVertexWorkerContext.class);
+ job.getConfiguration().setMasterComputeClass(
+ SimplePageRankVertex.SimplePageRankVertexMasterCompute.class);
+ job.getConfiguration().setNumComputeThreads(numComputeThreads);
+ // Set enough partitions to generate randomness on the compute side
+ if (numComputeThreads != 1) {
+ job.getConfiguration().setInt(
+ HashMasterPartitioner.USER_PARTITION_COUNT,
+ numComputeThreads * 5);
+ }
+ assertTrue(job.run(true));
+ if (!runningInDistributedMode()) {
+ double maxPageRank =
+ SimplePageRankVertex.SimplePageRankVertexWorkerContext.getFinalMax();
+ double minPageRank =
+ SimplePageRankVertex.SimplePageRankVertexWorkerContext.getFinalMin();
+ long numVertices =
+ SimplePageRankVertex.SimplePageRankVertexWorkerContext.getFinalSum();
+ System.out.println(getCallingMethodName() + ": maxPageRank=" +
+ maxPageRank + " minPageRank=" +
+ minPageRank + " numVertices=" + numVertices + ", " +
+ " numComputeThreads=" + numComputeThreads);
+ assertEquals(34.03, maxPageRank, 0.001);
+ assertEquals(0.03, minPageRank, 0.00001);
+ assertEquals(5l, numVertices);
+ }
+ }
+}
Modified: giraph/trunk/giraph/src/test/java/org/apache/giraph/utils/MockUtils.java
URL: http://svn.apache.org/viewvc/giraph/trunk/giraph/src/test/java/org/apache/giraph/utils/MockUtils.java?rev=1399090&r1=1399089&r2=1399090&view=diff
==============================================================================
--- giraph/trunk/giraph/src/test/java/org/apache/giraph/utils/MockUtils.java (original)
+++ giraph/trunk/giraph/src/test/java/org/apache/giraph/utils/MockUtils.java Wed Oct 17 04:33:16 2012
@@ -19,6 +19,7 @@
package org.apache.giraph.utils;
import org.apache.giraph.bsp.CentralizedServiceWorker;
+import org.apache.giraph.comm.WorkerClientRequestProcessor;
import org.apache.giraph.comm.WorkerClientServer;
import org.apache.giraph.graph.Vertex;
import org.apache.giraph.graph.GraphState;
@@ -53,13 +54,14 @@ public class MockUtils {
private final GraphState<I, V, E, M> graphState;
private final Mapper.Context context;
private final Configuration conf;
- private final WorkerClientServer communications;
+ private final WorkerClientRequestProcessor workerClientRequestProcessor;
public MockedEnvironment() {
graphState = Mockito.mock(GraphState.class);
context = Mockito.mock(Mapper.Context.class);
conf = Mockito.mock(Configuration.class);
- communications = Mockito.mock(WorkerClientServer.class);
+ workerClientRequestProcessor =
+ Mockito.mock(WorkerClientRequestProcessor.class);
}
/** the injected graph state */
@@ -78,19 +80,19 @@ public class MockUtils {
}
/** the injected worker communications */
- public WorkerClientServer getCommunications() {
- return communications;
+ public WorkerClientRequestProcessor getWorkerClientRequestProcessor() {
+ return workerClientRequestProcessor;
}
/** assert that the test vertex message has been sent to a particular vertex */
public void verifyMessageSent(I targetVertexId, M message) {
- Mockito.verify(communications).sendMessageRequest(targetVertexId,
- message);
+ Mockito.verify(workerClientRequestProcessor).sendMessageRequest
+ (targetVertexId, message);
}
/** assert that the test vertex has sent no message to a particular vertex */
public void verifyNoMessageSent() {
- Mockito.verifyZeroInteractions(communications);
+ Mockito.verifyZeroInteractions(workerClientRequestProcessor);
}
}
@@ -124,8 +126,8 @@ public class MockUtils {
.thenReturn(env.getContext());
Mockito.when(env.getContext().getConfiguration())
.thenReturn(env.getConfiguration());
- Mockito.when(env.getGraphState().getWorkerCommunications())
- .thenReturn(env.getCommunications());
+ Mockito.when(env.getGraphState().getWorkerClientRequestProcessor())
+ .thenReturn(env.getWorkerClientRequestProcessor());
ReflectionUtils.setField(vertex, "id", vertexId);
ReflectionUtils.setField(vertex, "value", vertexValue);