You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@giraph.apache.org by cl...@apache.org on 2011/12/05 23:02:15 UTC
svn commit: r1210662 - in /incubator/giraph/trunk: ./
src/main/java/org/apache/giraph/examples/
src/main/java/org/apache/giraph/graph/ src/test/java/org/apache/giraph/
Author: claudio
Date: Mon Dec 5 22:02:15 2011
New Revision: 1210662
URL: http://svn.apache.org/viewvc?rev=1210662&view=rev
Log:
GIRAPH-10: Aggregators are not exported.
Added:
incubator/giraph/trunk/src/main/java/org/apache/giraph/examples/SimpleAggregatorWriter.java
incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/AggregatorWriter.java
incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/TextAggregatorWriter.java
Modified:
incubator/giraph/trunk/CHANGELOG
incubator/giraph/trunk/src/main/java/org/apache/giraph/examples/SimplePageRankVertex.java
incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/BspServiceMaster.java
incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/BspUtils.java
incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/GiraphJob.java
incubator/giraph/trunk/src/test/java/org/apache/giraph/BspCase.java
incubator/giraph/trunk/src/test/java/org/apache/giraph/TestBspBasic.java
Modified: incubator/giraph/trunk/CHANGELOG
URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/CHANGELOG?rev=1210662&r1=1210661&r2=1210662&view=diff
==============================================================================
--- incubator/giraph/trunk/CHANGELOG (original)
+++ incubator/giraph/trunk/CHANGELOG Mon Dec 5 22:02:15 2011
@@ -2,6 +2,8 @@ Giraph Change Log
Release 0.70.0 - unreleased
+ GIRAPH-10: Aggregators are not exported. (claudio)
+
GIRAPH-100: GIRAPH-100 - Data input sampling and testing
improvements. (aching)
Added: incubator/giraph/trunk/src/main/java/org/apache/giraph/examples/SimpleAggregatorWriter.java
URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/examples/SimpleAggregatorWriter.java?rev=1210662&view=auto
==============================================================================
--- incubator/giraph/trunk/src/main/java/org/apache/giraph/examples/SimpleAggregatorWriter.java (added)
+++ incubator/giraph/trunk/src/main/java/org/apache/giraph/examples/SimpleAggregatorWriter.java Mon Dec 5 22:02:15 2011
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.examples;
+
+import java.io.IOException;
+import java.util.Map;
+import java.util.Map.Entry;
+
+import org.apache.giraph.graph.Aggregator;
+import org.apache.giraph.graph.AggregatorWriter;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapreduce.Mapper.Context;
+
+/**
+ * This is a simple example for an aggregator writer. After each superstep
+ * the writer will persist the aggregator values to disk, by use of the
+ * Writable interface. The file will be created on the current working
+ * directory.
+ */
+public class SimpleAggregatorWriter implements AggregatorWriter {
+ /** the name of the file we wrote to */
+ public static String filename;
+ private FSDataOutputStream output;
+
+ @SuppressWarnings("rawtypes")
+ @Override
+ public void initialize(Context context, long applicationAttempt)
+ throws IOException {
+ filename = "aggregatedValues_"+applicationAttempt;
+ Path p = new Path(filename);
+ FileSystem fs = FileSystem.get(context.getConfiguration());
+ output = fs.create(p, true);
+ }
+
+ @Override
+ public void writeAggregator(Map<String, Aggregator<Writable>> map,
+ long superstep) throws IOException {
+
+ for (Entry<String, Aggregator<Writable>> aggregator: map.entrySet()) {
+ aggregator.getValue().getAggregatedValue().write(output);
+ }
+ output.flush();
+ }
+
+ @Override
+ public void close() throws IOException {
+ output.close();
+ }
+}
Modified: incubator/giraph/trunk/src/main/java/org/apache/giraph/examples/SimplePageRankVertex.java
URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/examples/SimplePageRankVertex.java?rev=1210662&r1=1210661&r2=1210662&view=diff
==============================================================================
--- incubator/giraph/trunk/src/main/java/org/apache/giraph/examples/SimplePageRankVertex.java (original)
+++ incubator/giraph/trunk/src/main/java/org/apache/giraph/examples/SimplePageRankVertex.java Mon Dec 5 22:02:15 2011
@@ -45,7 +45,9 @@ import java.util.Map;
* Demonstrates the basic Pregel PageRank implementation.
*/
public class SimplePageRankVertex extends LongDoubleFloatDoubleVertex {
- /** Logger */
+ /** Number of supersteps for this test */
+ public static final int MAX_SUPERSTEPS = 30;
+ /** Logger */
private static final Logger LOG =
Logger.getLogger(SimplePageRankVertex.class);
@@ -70,7 +72,7 @@ public class SimplePageRankVertex extend
" min=" + minAggreg.getAggregatedValue());
}
- if (getSuperstep() < 30) {
+ if (getSuperstep() < MAX_SUPERSTEPS) {
long edges = getNumOutEdges();
sendMsgToAllEdges(
new DoubleWritable(getVertexValue().get() / edges));
Added: incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/AggregatorWriter.java
URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/AggregatorWriter.java?rev=1210662&view=auto
==============================================================================
--- incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/AggregatorWriter.java (added)
+++ incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/AggregatorWriter.java Mon Dec 5 22:02:15 2011
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.graph;
+
+import java.io.IOException;
+import java.util.Map;
+
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapreduce.Mapper.Context;
+
+/**
+ * An AggregatorWriter is used to export Aggregators during or at the end of
+ * each computation. It runs on the master and it's called at the end of each
+ * superstep. The special signal {@link AggregatorWriter#LAST_SUPERSTEP} is
+ * passed to {@link AggregatorWriter#writeAggregator(Map, long)} as the
+ * superstep value to signal the end of computation.
+ */
+public interface AggregatorWriter {
+ /** Signal for last superstep */
+ public static final int LAST_SUPERSTEP = -1;
+
+ /**
+ * The method is called at the initialization of the AggregatorWriter.
+ * More precisely, the aggregatorWriter is initialized each time a new
+ * master is elected.
+ *
+ * @param context Mapper Context where the master is running on
+ * @param applicationAttempt ID of the applicationAttempt, used to
+ * disambiguate aggregator writes for different attempts
+ * @throws IOException
+ */
+ @SuppressWarnings("rawtypes")
+ void initialize(Context context, long applicationAttempt) throws IOException;
+
+ /**
+ * The method is called at the end of each superstep. The user might decide
+ * whether to write the aggregators values for the current superstep. For
+ * the last superstep, {@link AggregatorWriter#LAST_SUPERSTEP} is passed.
+ *
+ * @param map Map of aggregators to write
+ * @param superstep Current superstep
+ * @throws IOException
+ */
+ void writeAggregator(
+ Map<String, Aggregator<Writable>> aggregatorMap,
+ long superstep) throws IOException;
+
+ /**
+ * The method is called at the end of a successful computation. The method
+ * is not called when the job fails and a new master is elected. For this
+ * reason it's advised to flush data at the end of
+ * {@link AggregatorWriter#writeAggregator(Map, long)}.
+ *
+ * @throws IOException
+ */
+ void close() throws IOException;
+}
Modified: incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/BspServiceMaster.java
URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/BspServiceMaster.java?rev=1210662&r1=1210661&r2=1210662&view=diff
==============================================================================
--- incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/BspServiceMaster.java (original)
+++ incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/BspServiceMaster.java Mon Dec 5 22:02:15 2011
@@ -125,9 +125,10 @@ public class BspServiceMaster<
/** All the partition stats from the last superstep */
private final List<PartitionStats> allPartitionStatsList =
new ArrayList<PartitionStats>();
-
/** Counter group name for the Giraph statistics */
public String GIRAPH_STATS_COUNTER_GROUP_NAME = "Giraph Stats";
+ /** Aggregator writer */
+ public AggregatorWriter aggregatorWriter;
public BspServiceMaster(
String serverPortList,
@@ -736,6 +737,15 @@ public class BspServiceMaster<
currentMasterTaskPartitionCounter.increment(
getTaskPartition() -
currentMasterTaskPartitionCounter.getValue());
+ aggregatorWriter =
+ BspUtils.createAggregatorWriter(getConfiguration());
+ try {
+ aggregatorWriter.initialize(getContext(),
+ getApplicationAttempt());
+ } catch (IOException e) {
+ throw new IllegalStateException("becomeMaster: " +
+ "Couldn't initialize aggregatorWriter", e);
+ }
LOG.info("becomeMaster: I am now the master!");
isMaster = true;
return isMaster;
@@ -823,7 +833,7 @@ public class BspServiceMaster<
/**
* Get the aggregator values for a particular superstep,
- * aggregate and save them. Does nothing on the INPUT_SUPERSTEP.
+ * aggregate and save them. Does nothing on the INPUT_SUPERSTEP.
*
* @param superstep superstep to check
*/
@@ -1500,13 +1510,25 @@ public class BspServiceMaster<
if (getSuperstep() > 0) {
superstepCounter.increment(1);
}
+ SuperstepState superstepState;
if ((globalStats.getFinishedVertexCount() ==
globalStats.getVertexCount()) &&
globalStats.getMessageCount() == 0) {
- return SuperstepState.ALL_SUPERSTEPS_DONE;
+ superstepState = SuperstepState.ALL_SUPERSTEPS_DONE;
} else {
- return SuperstepState.THIS_SUPERSTEP_DONE;
+ superstepState = SuperstepState.THIS_SUPERSTEP_DONE;
+ }
+ try {
+ aggregatorWriter.writeAggregator(getAggregatorMap(),
+ (superstepState == SuperstepState.ALL_SUPERSTEPS_DONE) ?
+ AggregatorWriter.LAST_SUPERSTEP : getSuperstep());
+ } catch (IOException e) {
+ throw new IllegalStateException(
+ "coordinateSuperstep: IOException while " +
+ "writing aggregators data", e);
}
+
+ return superstepState;
}
/**
@@ -1634,6 +1656,7 @@ public class BspServiceMaster<
success + " since this job succeeded ");
}
}
+ aggregatorWriter.close();
}
try {
Modified: incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/BspUtils.java
URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/BspUtils.java?rev=1210662&r1=1210661&r2=1210662&view=diff
==============================================================================
--- incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/BspUtils.java (original)
+++ incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/BspUtils.java Mon Dec 5 22:02:15 2011
@@ -151,6 +151,32 @@ public class BspUtils {
getVertexOutputFormatClass(conf);
return ReflectionUtils.newInstance(vertexOutputFormatClass, conf);
}
+
+ /**
+ * Get the user's subclassed {@link AggregatorWriter}.
+ *
+ * @param conf Configuration to check
+ * @return User's aggregator writer class
+ */
+ public static Class<? extends AggregatorWriter>
+ getAggregatorWriterClass(Configuration conf) {
+ return conf.getClass(GiraphJob.AGGREGATOR_WRITER_CLASS,
+ TextAggregatorWriter.class,
+ AggregatorWriter.class);
+ }
+
+ /**
+ * Create a user aggregator output format class
+ *
+ * @param conf Configuration to check
+ * @return Instantiated user aggregator writer class
+ */
+ public static AggregatorWriter
+ createAggregatorWriter(Configuration conf) {
+ Class<? extends AggregatorWriter> aggregatorWriterClass =
+ getAggregatorWriterClass(conf);
+ return ReflectionUtils.newInstance(aggregatorWriterClass, conf);
+ }
/**
* Get the user's subclassed {@link VertexCombiner}.
Modified: incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/GiraphJob.java
URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/GiraphJob.java?rev=1210662&r1=1210661&r2=1210662&view=diff
==============================================================================
--- incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/GiraphJob.java (original)
+++ incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/GiraphJob.java Mon Dec 5 22:02:15 2011
@@ -61,7 +61,11 @@ public class GiraphJob extends Job {
/** Message value class */
public static final String MESSAGE_VALUE_CLASS = "giraph.messageValueClass";
/** Worker context class */
- public static final String WORKER_CONTEXT_CLASS = "giraph.workerContextClass";
+ public static final String WORKER_CONTEXT_CLASS =
+ "giraph.workerContextClass";
+ /** AggregatorWriter class - optional */
+ public static final String AGGREGATOR_WRITER_CLASS =
+ "giraph.aggregatorWriterClass";
/**
* Minimum number of simultaneous workers before this job can run (int)
@@ -447,7 +451,20 @@ public class GiraphJob extends Job {
workerContextClass,
WorkerContext.class);
}
-
+
+ /**
+ * Set the aggregator writer class (optional)
+ *
+ * @param aggregatorWriterClass Determines how the aggregators are
+ * written to file at the end of the job
+ */
+ final public void setAggregatorWriterClass(
+ Class<?> aggregatorWriterClass) {
+ getConfiguration().setClass(AGGREGATOR_WRITER_CLASS,
+ aggregatorWriterClass,
+ AggregatorWriter.class);
+ }
+
/**
* Set worker configuration for determining what is required for
* a superstep.
Added: incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/TextAggregatorWriter.java
URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/TextAggregatorWriter.java?rev=1210662&view=auto
==============================================================================
--- incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/TextAggregatorWriter.java (added)
+++ incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/TextAggregatorWriter.java Mon Dec 5 22:02:15 2011
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.graph;
+
+import java.io.IOException;
+import java.util.Map;
+import java.util.Map.Entry;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapreduce.Mapper.Context;
+
+/**
+ * Default implementation of {@link AggregatorWriter}. Each line consists of
+ * text and contains the aggregator name, the aggregator value and the
+ * aggregator class.
+ */
+public class TextAggregatorWriter
+ implements AggregatorWriter {
+ /** The filename of the outputfile */
+ public static final String FILENAME =
+ "giraph.textAggregatorWriter.filename";
+ /** The frequency of writing:
+ * - NEVER: never write, files aren't created at all
+ * - AT_THE_END: aggregators are written only when the computation is over
+ * - int: i.e. 1 is every superstep, 2 every two supersteps and so on
+ */
+ public static final String FREQUENCY =
+ "giraph.textAggregatorWriter.frequency";
+ private static final String DEFAULT_FILENAME = "aggregatorValues";
+ /** Signal for "never write" frequency */
+ public static final int NEVER = 0;
+ /** Signal for "write only the final values" frequency */
+ public static final int AT_THE_END = -1;
+ /** Handle to the outputfile */
+ protected FSDataOutputStream output;
+ private int frequency;
+
+ @Override
+ @SuppressWarnings("rawtypes")
+ public void initialize(Context context, long attempt) throws IOException {
+ Configuration conf = context.getConfiguration();
+ frequency = conf.getInt(FREQUENCY, NEVER);
+ String filename = conf.get(FILENAME, DEFAULT_FILENAME);
+ if (frequency != NEVER) {
+ Path p = new Path(filename+"_"+attempt);
+ FileSystem fs = FileSystem.get(conf);
+ if (fs.exists(p)) {
+ throw new RuntimeException("aggregatorWriter file already" +
+ " exists: " + p.getName());
+ }
+ output = fs.create(p);
+ }
+ }
+
+ @Override
+ final public void writeAggregator(
+ Map<String, Aggregator<Writable>> aggregators,
+ long superstep) throws IOException {
+
+ if (shouldWrite(superstep)) {
+ for (Entry<String, Aggregator<Writable>> a:
+ aggregators.entrySet()) {
+ output.writeUTF(aggregatorToString(a.getKey(),
+ a.getValue(),
+ superstep));
+ }
+ output.flush();
+ }
+ }
+
+ /**
+ * Implements the way an aggregator is converted into a String.
+ * Override this if you want to implement your own text format.
+ *
+ * @param aggregatorName Name of the aggregator
+ * @param a Aggregator
+ * @param superstep Current superstep
+ * @return The String representation for the aggregator
+ */
+ protected String aggregatorToString(String aggregatorName,
+ Aggregator<Writable> a,
+ long superstep) {
+
+ return new StringBuilder("superstep=").append(superstep).append("\t")
+ .append(aggregatorName).append("=").append(a.getAggregatedValue())
+ .append("\t").append(a.getClass().getCanonicalName()).append("\n")
+ .toString();
+ }
+
+ private boolean shouldWrite(long superstep) {
+ return ((frequency == AT_THE_END && superstep == LAST_SUPERSTEP) ||
+ (frequency != NEVER && superstep % frequency == 0));
+ }
+
+ @Override
+ public void close() throws IOException {
+ if (output != null) {
+ output.close();
+ }
+ }
+}
Modified: incubator/giraph/trunk/src/test/java/org/apache/giraph/BspCase.java
URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/test/java/org/apache/giraph/BspCase.java?rev=1210662&r1=1210661&r2=1210662&view=diff
==============================================================================
--- incubator/giraph/trunk/src/test/java/org/apache/giraph/BspCase.java (original)
+++ incubator/giraph/trunk/src/test/java/org/apache/giraph/BspCase.java Mon Dec 5 22:02:15 2011
@@ -226,10 +226,22 @@ public class BspCase extends TestCase im
public static void removeAndSetOutput(GiraphJob job,
Path outputPath)
throws IOException {
- FileSystem hdfs = FileSystem.get(job.getConfiguration());
- hdfs.delete(outputPath, true);
+ remove(job.getConfiguration(), outputPath);
FileOutputFormat.setOutputPath(job, outputPath);
}
+
+ /**
+ * Helper method to remove a path if it exists.
+ *
+ * @param conf Configutation
+ * @param path Path to remove
+ * @throws IOException
+ */
+ public static void remove(Configuration conf, Path path)
+ throws IOException {
+ FileSystem hdfs = FileSystem.get(conf);
+ hdfs.delete(path, true);
+ }
public static String getCallingMethodName() {
return Thread.currentThread().getStackTrace()[2].getMethodName();
Modified: incubator/giraph/trunk/src/test/java/org/apache/giraph/TestBspBasic.java
URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/test/java/org/apache/giraph/TestBspBasic.java?rev=1210662&r1=1210661&r2=1210662&view=diff
==============================================================================
--- incubator/giraph/trunk/src/test/java/org/apache/giraph/TestBspBasic.java (original)
+++ incubator/giraph/trunk/src/test/java/org/apache/giraph/TestBspBasic.java Mon Dec 5 22:02:15 2011
@@ -20,6 +20,8 @@ package org.apache.giraph;
import junit.framework.Test;
import junit.framework.TestSuite;
+
+import org.apache.giraph.examples.SimpleAggregatorWriter;
import org.apache.giraph.examples.SimplePageRankVertex.SimplePageRankVertexInputFormat;
import org.apache.giraph.examples.SimpleShortestPathsVertex.SimpleShortestPathsVertexOutputFormat;
import org.apache.giraph.examples.SimpleSuperstepVertex.SimpleSuperstepVertexInputFormat;
@@ -38,8 +40,11 @@ import org.apache.giraph.graph.GraphStat
import org.apache.giraph.graph.VertexInputFormat;
import org.apache.giraph.graph.BasicVertex;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.FloatWritable;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
@@ -331,4 +336,66 @@ public class TestBspBasic extends BspCas
assertTrue(fileStatus.getLen() == fileStatus2.getLen());
}
}
+
+ /**
+ * Run a sample BSP job locally and test PageRank with AggregatorWriter.
+ *
+ * @throws IOException
+ * @throws ClassNotFoundException
+ * @throws InterruptedException
+ */
+ public void testBspPageRankWithAggregatorWriter()
+ throws IOException, InterruptedException, ClassNotFoundException {
+ GiraphJob job = new GiraphJob(getCallingMethodName());
+ setupConfiguration(job);
+ job.setVertexClass(SimplePageRankVertex.class);
+ job.setWorkerContextClass(
+ SimplePageRankVertex.SimplePageRankVertexWorkerContext.class);
+ job.setVertexInputFormatClass(SimplePageRankVertexInputFormat.class);
+ job.setAggregatorWriterClass(SimpleAggregatorWriter.class);
+ Path outputPath = new Path("/tmp/" + getCallingMethodName());
+ removeAndSetOutput(job, outputPath);
+ assertTrue(job.run(true));
+ if (getJobTracker() == null) {
+ double maxPageRank =
+ SimplePageRankVertex.SimplePageRankVertexWorkerContext.finalMax;
+ double minPageRank =
+ SimplePageRankVertex.SimplePageRankVertexWorkerContext.finalMin;
+ long numVertices =
+ SimplePageRankVertex.SimplePageRankVertexWorkerContext.finalSum;
+ System.out.println("testBspPageRank: maxPageRank=" + maxPageRank +
+ " minPageRank=" + minPageRank +
+ " numVertices=" + numVertices);
+ FileSystem fs = FileSystem.get(new Configuration());
+ FSDataInputStream input =
+ fs.open(new Path(SimpleAggregatorWriter.filename));
+ int i, all;
+ for (i = 0; ; i++) {
+ all = 0;
+ try {
+ DoubleWritable max = new DoubleWritable();
+ max.readFields(input);
+ all++;
+ DoubleWritable min = new DoubleWritable();
+ min.readFields(input);
+ all++;
+ LongWritable sum = new LongWritable();
+ sum.readFields(input);
+ all++;
+ if (i > 0) {
+ assertTrue(max.get() == maxPageRank);
+ assertTrue(min.get() == minPageRank);
+ assertTrue(sum.get() == numVertices);
+ }
+ } catch (IOException e) {
+ break;
+ }
+ }
+ input.close();
+ // contained all supersteps
+ assertTrue(i == SimplePageRankVertex.MAX_SUPERSTEPS+1 && all == 0);
+ remove(new Configuration(),
+ new Path(SimpleAggregatorWriter.filename));
+ }
+ }
}