You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@giraph.apache.org by ac...@apache.org on 2013/06/19 19:20:41 UTC
git commit: updated refs/heads/trunk to 1eaddd1
Updated Branches:
refs/heads/trunk 0d358c9d5 -> 1eaddd183
GIRAPH-673: Input superstep should support aggregators like any
other superstep (Bingjing via aching)
Project: http://git-wip-us.apache.org/repos/asf/giraph/repo
Commit: http://git-wip-us.apache.org/repos/asf/giraph/commit/1eaddd18
Tree: http://git-wip-us.apache.org/repos/asf/giraph/tree/1eaddd18
Diff: http://git-wip-us.apache.org/repos/asf/giraph/diff/1eaddd18
Branch: refs/heads/trunk
Commit: 1eaddd183259047476893a014fb24e44fc549531
Parents: 0d358c9
Author: Avery Ching <ac...@fb.com>
Authored: Wed Jun 19 10:04:28 2013 -0700
Committer: Avery Ching <ac...@fb.com>
Committed: Wed Jun 19 10:04:42 2013 -0700
----------------------------------------------------------------------
CHANGELOG | 3 +
.../java/org/apache/giraph/io/EdgeReader.java | 32 +++-
.../java/org/apache/giraph/io/VertexReader.java | 32 +++-
.../giraph/io/internal/WrappedEdgeReader.java | 7 +
.../giraph/io/internal/WrappedVertexReader.java | 7 +
.../apache/giraph/master/BspServiceMaster.java | 91 +++++++----
.../apache/giraph/worker/BspServiceWorker.java | 12 +-
.../giraph/worker/EdgeInputSplitsCallable.java | 9 ++
.../worker/VertexInputSplitsCallable.java | 7 +
.../examples/AggregatorsTestComputation.java | 154 +++++++++++++++++++
.../giraph/examples/GeneratedEdgeReader.java | 73 +++++++++
.../aggregators/TestAggregatorsHandling.java | 13 +-
12 files changed, 400 insertions(+), 40 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/giraph/blob/1eaddd18/CHANGELOG
----------------------------------------------------------------------
diff --git a/CHANGELOG b/CHANGELOG
index 50d411a..76f7e6b 100644
--- a/CHANGELOG
+++ b/CHANGELOG
@@ -1,6 +1,9 @@
Giraph Change Log
Release 1.1.0 - unreleased
+ GIRAPH-673: Input superstep should support aggregators like any
+ other superstep (Bingjing via aching)
+
GIRAPH-686: DiskBackedPartitionStore does not saveVertex after edges
are loaded (claudio)
http://git-wip-us.apache.org/repos/asf/giraph/blob/1eaddd18/giraph-core/src/main/java/org/apache/giraph/io/EdgeReader.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/io/EdgeReader.java b/giraph-core/src/main/java/org/apache/giraph/io/EdgeReader.java
index 363a5e6..1bc48e3 100644
--- a/giraph-core/src/main/java/org/apache/giraph/io/EdgeReader.java
+++ b/giraph-core/src/main/java/org/apache/giraph/io/EdgeReader.java
@@ -21,6 +21,7 @@ package org.apache.giraph.io;
import java.io.IOException;
import org.apache.giraph.conf.DefaultImmutableClassesGiraphConfigurable;
import org.apache.giraph.edge.Edge;
+import org.apache.giraph.worker.WorkerAggregatorUsage;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapreduce.InputSplit;
@@ -36,7 +37,11 @@ import org.apache.hadoop.mapreduce.TaskAttemptContext;
@SuppressWarnings("rawtypes")
public abstract class EdgeReader<I extends WritableComparable,
E extends Writable> extends DefaultImmutableClassesGiraphConfigurable<
- I, Writable, E> {
+ I, Writable, E> implements WorkerAggregatorUsage {
+
+ /** Aggregator usage for edge reader */
+ private WorkerAggregatorUsage workerAggregatorUsage;
+
/**
* Use the input split and context to setup reading the edges.
* Guaranteed to be called prior to any other function.
@@ -51,6 +56,21 @@ public abstract class EdgeReader<I extends WritableComparable,
throws IOException, InterruptedException;
/**
+ * Set aggregator usage. It provides the functionality
+ * of aggregation operation in reading an edge.
+ * It is invoked just after initialization.
+ * E.g.,
+ * edgeReader.initialize(inputSplit, context);
+ * edgeReader.setWorkerAggregatorUse(aggregatorUsage);
+ * This method is only for use by the infrastructure.
+ *
+ * @param agg aggregator usage for edge reader
+ */
+ public void setWorkerAggregatorUse(WorkerAggregatorUsage agg) {
+ workerAggregatorUsage = agg;
+ }
+
+ /**
* Read the next edge.
*
* @return false iff there are no more edges
@@ -97,4 +117,14 @@ public abstract class EdgeReader<I extends WritableComparable,
* @throws InterruptedException
*/
public abstract float getProgress() throws IOException, InterruptedException;
+
+ @Override
+ public <A extends Writable> void aggregate(String name, A value) {
+ workerAggregatorUsage.aggregate(name, value);
+ }
+
+ @Override
+ public <A extends Writable> A getAggregatedValue(String name) {
+ return workerAggregatorUsage.<A>getAggregatedValue(name);
+ }
}
http://git-wip-us.apache.org/repos/asf/giraph/blob/1eaddd18/giraph-core/src/main/java/org/apache/giraph/io/VertexReader.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/io/VertexReader.java b/giraph-core/src/main/java/org/apache/giraph/io/VertexReader.java
index b8b82af..94a4083 100644
--- a/giraph-core/src/main/java/org/apache/giraph/io/VertexReader.java
+++ b/giraph-core/src/main/java/org/apache/giraph/io/VertexReader.java
@@ -20,6 +20,7 @@ package org.apache.giraph.io;
import org.apache.giraph.conf.DefaultImmutableClassesGiraphConfigurable;
import org.apache.giraph.graph.Vertex;
+import org.apache.giraph.worker.WorkerAggregatorUsage;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapreduce.InputSplit;
@@ -38,7 +39,11 @@ import java.io.IOException;
@SuppressWarnings("rawtypes")
public abstract class VertexReader<I extends WritableComparable,
V extends Writable, E extends Writable> extends
- DefaultImmutableClassesGiraphConfigurable<I, V, E> {
+ DefaultImmutableClassesGiraphConfigurable<I, V, E>
+ implements WorkerAggregatorUsage {
+ /** Aggregator usage for vertex reader */
+ private WorkerAggregatorUsage workerAggregatorUsage;
+
/**
* Use the input split and context to setup reading the vertices.
* Guaranteed to be called prior to any other function.
@@ -53,6 +58,21 @@ public abstract class VertexReader<I extends WritableComparable,
throws IOException, InterruptedException;
/**
+ * Set aggregator usage. It provides the functionality
+ * of aggregation operation in reading a vertex.
+ * It is invoked just after initialization.
+ * E.g.,
+ * vertexReader.initialize(inputSplit, context);
+ * vertexReader.setWorkerAggregatorUse(aggregatorUsage);
+ * This method is only for use by the infrastructure.
+ *
+ * @param agg aggregator usage for vertex reader
+ */
+ public void setWorkerAggregatorUse(WorkerAggregatorUsage agg) {
+ workerAggregatorUsage = agg;
+ }
+
+ /**
*
* @return false iff there are no more vertices
* @throws IOException
@@ -88,4 +108,14 @@ public abstract class VertexReader<I extends WritableComparable,
* @throws InterruptedException
*/
public abstract float getProgress() throws IOException, InterruptedException;
+
+ @Override
+ public <A extends Writable> void aggregate(String name, A value) {
+ workerAggregatorUsage.aggregate(name, value);
+ }
+
+ @Override
+ public <A extends Writable> A getAggregatedValue(String name) {
+ return workerAggregatorUsage.<A>getAggregatedValue(name);
+ }
}
http://git-wip-us.apache.org/repos/asf/giraph/blob/1eaddd18/giraph-core/src/main/java/org/apache/giraph/io/internal/WrappedEdgeReader.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/io/internal/WrappedEdgeReader.java b/giraph-core/src/main/java/org/apache/giraph/io/internal/WrappedEdgeReader.java
index aae7a72..e3b3689 100644
--- a/giraph-core/src/main/java/org/apache/giraph/io/internal/WrappedEdgeReader.java
+++ b/giraph-core/src/main/java/org/apache/giraph/io/internal/WrappedEdgeReader.java
@@ -21,6 +21,7 @@ package org.apache.giraph.io.internal;
import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
import org.apache.giraph.edge.Edge;
import org.apache.giraph.io.EdgeReader;
+import org.apache.giraph.worker.WorkerAggregatorUsage;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapreduce.InputSplit;
@@ -70,6 +71,12 @@ public class WrappedEdgeReader<I extends WritableComparable,
}
@Override
+ public void setWorkerAggregatorUse(WorkerAggregatorUsage agg) {
+ // Set aggregator usage for edge reader
+ baseEdgeReader.setWorkerAggregatorUse(agg);
+ }
+
+ @Override
public boolean nextEdge() throws IOException, InterruptedException {
return baseEdgeReader.nextEdge();
}
http://git-wip-us.apache.org/repos/asf/giraph/blob/1eaddd18/giraph-core/src/main/java/org/apache/giraph/io/internal/WrappedVertexReader.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/io/internal/WrappedVertexReader.java b/giraph-core/src/main/java/org/apache/giraph/io/internal/WrappedVertexReader.java
index 54adfec..bf0a212 100644
--- a/giraph-core/src/main/java/org/apache/giraph/io/internal/WrappedVertexReader.java
+++ b/giraph-core/src/main/java/org/apache/giraph/io/internal/WrappedVertexReader.java
@@ -21,6 +21,7 @@ package org.apache.giraph.io.internal;
import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
import org.apache.giraph.graph.Vertex;
import org.apache.giraph.io.VertexReader;
+import org.apache.giraph.worker.WorkerAggregatorUsage;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapreduce.InputSplit;
@@ -71,6 +72,12 @@ public class WrappedVertexReader<I extends WritableComparable,
}
@Override
+ public void setWorkerAggregatorUse(WorkerAggregatorUsage agg) {
+ // Set aggregator usage for vertex reader
+ baseVertexReader.setWorkerAggregatorUse(agg);
+ }
+
+ @Override
public boolean nextVertex() throws IOException, InterruptedException {
return baseVertexReader.nextVertex();
}
http://git-wip-us.apache.org/repos/asf/giraph/blob/1eaddd18/giraph-core/src/main/java/org/apache/giraph/master/BspServiceMaster.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/master/BspServiceMaster.java b/giraph-core/src/main/java/org/apache/giraph/master/BspServiceMaster.java
index bd48116..0d266a6 100644
--- a/giraph-core/src/main/java/org/apache/giraph/master/BspServiceMaster.java
+++ b/giraph-core/src/main/java/org/apache/giraph/master/BspServiceMaster.java
@@ -1425,6 +1425,58 @@ public class BspServiceMaster<I extends WritableComparable,
}
}
+ /**
+ * Initialize aggregator at the master side
+ * before vertex/edge loading.
+ * This method cooperates with other code
+ * to enable aggregator usage at INPUT_SUPERSTEP.
+ * The other pieces are:
+ * BSPServiceWorker:
+ * aggregatorHandler.prepareSuperstep in
+ * setup
+ * set aggregator usage in vertexReader and
+ * edgeReader
+ *
+ * @throws InterruptedException
+ */
+ private void initializeAggregatorInputSuperstep()
+ throws InterruptedException {
+ aggregatorHandler.prepareSuperstep(masterClient);
+ prepareMasterCompute(getSuperstep());
+ try {
+ masterCompute.initialize();
+ } catch (InstantiationException e) {
+ LOG.fatal(
+ "initializeAggregatorInputSuperstep: Failed in instantiation", e);
+ throw new RuntimeException(
+ "initializeAggregatorInputSuperstep: Failed in instantiation", e);
+ } catch (IllegalAccessException e) {
+ LOG.fatal("initializeAggregatorInputSuperstep: Failed in access", e);
+ throw new RuntimeException(
+ "initializeAggregatorInputSuperstep: Failed in access", e);
+ }
+ aggregatorHandler.finishSuperstep(masterClient);
+ }
+
+ /**
+ * Set the graph state and superstep classes on masterCompute.
+ * This is required before MasterCompute is initialized or run.
+ *
+ * @param superstep superstep for which to run masterCompute
+ * @return Superstep classes set by masterCompute
+ */
+ private SuperstepClasses prepareMasterCompute(long superstep) {
+ GraphState graphState = new GraphState(superstep ,
+ GiraphStats.getInstance().getVertices().getValue(),
+ GiraphStats.getInstance().getEdges().getValue(),
+ getContext());
+ SuperstepClasses superstepClasses =
+ new SuperstepClasses(getConfiguration());
+ masterCompute.setGraphState(graphState);
+ masterCompute.setSuperstepClasses(superstepClasses);
+ return superstepClasses;
+ }
+
@Override
public SuperstepState coordinateSuperstep() throws
KeeperException, InterruptedException {
@@ -1495,6 +1547,9 @@ public class BspServiceMaster<I extends WritableComparable,
}
if (getSuperstep() == INPUT_SUPERSTEP) {
+ // Initialize aggregators before coordinating
+ // vertex loading and edge loading
+ initializeAggregatorInputSuperstep();
if (getConfiguration().hasVertexInputFormat()) {
coordinateInputSplits(vertexInputSplitsPaths, vertexInputSplitsEvents,
"Vertex");
@@ -1516,7 +1571,9 @@ public class BspServiceMaster<I extends WritableComparable,
// Collect aggregator values, then run the master.compute() and
// finally save the aggregator values
aggregatorHandler.prepareSuperstep(masterClient);
- SuperstepClasses superstepClasses = runMasterCompute(getSuperstep());
+ SuperstepClasses superstepClasses =
+ prepareMasterCompute(getSuperstep() + 1);
+ doMasterCompute();
// If the master is halted or all the vertices voted to halt and there
// are no more messages in the system, stop the computation
@@ -1569,39 +1626,13 @@ public class BspServiceMaster<I extends WritableComparable,
}
/**
- * Run the master.compute() class
- *
- * @param superstep superstep for which to run the master.compute()
- * @return Superstep classes set by Master compute
+ * Run masterCompute.compute() under the master compute timer.
+ * Only called after masterCompute is initialized.
*/
- private SuperstepClasses runMasterCompute(long superstep) {
- // The master.compute() should run logically before the workers, so
- // increase the superstep counter it uses by one
- GraphState graphState = new GraphState(superstep + 1,
- GiraphStats.getInstance().getVertices().getValue(),
- GiraphStats.getInstance().getEdges().getValue(),
- getContext());
- SuperstepClasses superstepClasses =
- new SuperstepClasses(getConfiguration());
- masterCompute.setGraphState(graphState);
- masterCompute.setSuperstepClasses(superstepClasses);
- if (superstep == INPUT_SUPERSTEP) {
- try {
- masterCompute.initialize();
- } catch (InstantiationException e) {
- LOG.fatal("runMasterCompute: Failed in instantiation", e);
- throw new RuntimeException(
- "runMasterCompute: Failed in instantiation", e);
- } catch (IllegalAccessException e) {
- LOG.fatal("runMasterCompute: Failed in access", e);
- throw new RuntimeException(
- "runMasterCompute: Failed in access", e);
- }
- }
+ private void doMasterCompute() {
GiraphTimerContext timerContext = masterComputeTimer.time();
masterCompute.compute();
timerContext.stop();
- return superstepClasses;
}
/**
http://git-wip-us.apache.org/repos/asf/giraph/blob/1eaddd18/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java b/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java
index eb6d30d..342e2b2 100644
--- a/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java
+++ b/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java
@@ -480,11 +480,15 @@ public class BspServiceWorker<I extends WritableComparable,
workerGraphPartitioner.updatePartitionOwners(
getWorkerInfo(), masterSetPartitionOwners, getPartitionStore());
-/*if[HADOOP_NON_SECURE]
- workerClient.setup();
-else[HADOOP_NON_SECURE]*/
+ /*if[HADOOP_NON_SECURE]
+ workerClient.setup();
+ else[HADOOP_NON_SECURE]*/
workerClient.setup(getConfiguration().authenticate());
-/*end[HADOOP_NON_SECURE]*/
+ /*end[HADOOP_NON_SECURE]*/
+
+ // Initialize aggregator at worker side during setup.
+ // Do this just before vertex and edge loading.
+ aggregatorHandler.prepareSuperstep(workerAggregatorRequestProcessor);
VertexEdgeCount vertexEdgeCount;
http://git-wip-us.apache.org/repos/asf/giraph/blob/1eaddd18/giraph-core/src/main/java/org/apache/giraph/worker/EdgeInputSplitsCallable.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/worker/EdgeInputSplitsCallable.java b/giraph-core/src/main/java/org/apache/giraph/worker/EdgeInputSplitsCallable.java
index 78cdd8e..c2c72c6 100644
--- a/giraph-core/src/main/java/org/apache/giraph/worker/EdgeInputSplitsCallable.java
+++ b/giraph-core/src/main/java/org/apache/giraph/worker/EdgeInputSplitsCallable.java
@@ -59,6 +59,9 @@ public class EdgeInputSplitsCallable<I extends WritableComparable,
/** Class logger */
private static final Logger LOG = Logger.getLogger(
EdgeInputSplitsCallable.class);
+
+ /** Aggregator usage passed to the edge reader */
+ private final WorkerThreadAggregatorUsage aggregatorUsage;
/** Edge input format */
private final EdgeInputFormat<I, E> edgeInputFormat;
/** Input split max edges (-1 denotes all) */
@@ -95,6 +98,9 @@ public class EdgeInputSplitsCallable<I extends WritableComparable,
this.edgeInputFormat = edgeInputFormat;
inputSplitMaxEdges = configuration.getInputSplitMaxEdges();
+ // Initialize aggregator usage.
+ this.aggregatorUsage = bspServiceWorker.getAggregatorHandler()
+ .newThreadAggregatorUsage();
edgeInputFilter = configuration.getEdgeInputFilter();
// Initialize Metrics
@@ -125,7 +131,10 @@ public class EdgeInputSplitsCallable<I extends WritableComparable,
edgeReader.setConf(
(ImmutableClassesGiraphConfiguration<I, Writable, E>)
configuration);
+
edgeReader.initialize(inputSplit, context);
+ // Set aggregator usage to edge reader
+ edgeReader.setWorkerAggregatorUse(aggregatorUsage);
long inputSplitEdgesLoaded = 0;
long inputSplitEdgesFiltered = 0;
http://git-wip-us.apache.org/repos/asf/giraph/blob/1eaddd18/giraph-core/src/main/java/org/apache/giraph/worker/VertexInputSplitsCallable.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/worker/VertexInputSplitsCallable.java b/giraph-core/src/main/java/org/apache/giraph/worker/VertexInputSplitsCallable.java
index 977e100..fb4fdf4 100644
--- a/giraph-core/src/main/java/org/apache/giraph/worker/VertexInputSplitsCallable.java
+++ b/giraph-core/src/main/java/org/apache/giraph/worker/VertexInputSplitsCallable.java
@@ -129,7 +129,14 @@ public class VertexInputSplitsCallable<I extends WritableComparable,
VertexReader<I, V, E> vertexReader =
vertexInputFormat.createVertexReader(inputSplit, context);
vertexReader.setConf(configuration);
+
+ WorkerThreadAggregatorUsage aggregatorUsage =
+ this.bspServiceWorker
+ .getAggregatorHandler().newThreadAggregatorUsage();
+
vertexReader.initialize(inputSplit, context);
+ // Set aggregator usage to vertex reader
+ vertexReader.setWorkerAggregatorUse(aggregatorUsage);
long inputSplitVerticesLoaded = 0;
long inputSplitVerticesFiltered = 0;
http://git-wip-us.apache.org/repos/asf/giraph/blob/1eaddd18/giraph-examples/src/main/java/org/apache/giraph/examples/AggregatorsTestComputation.java
----------------------------------------------------------------------
diff --git a/giraph-examples/src/main/java/org/apache/giraph/examples/AggregatorsTestComputation.java b/giraph-examples/src/main/java/org/apache/giraph/examples/AggregatorsTestComputation.java
index db527f2..b054e9e 100644
--- a/giraph-examples/src/main/java/org/apache/giraph/examples/AggregatorsTestComputation.java
+++ b/giraph-examples/src/main/java/org/apache/giraph/examples/AggregatorsTestComputation.java
@@ -19,14 +19,29 @@
package org.apache.giraph.examples;
import org.apache.giraph.aggregators.LongSumAggregator;
+import org.apache.giraph.bsp.BspInputSplit;
+import org.apache.giraph.edge.Edge;
+import org.apache.giraph.edge.EdgeFactory;
import org.apache.giraph.graph.BasicComputation;
import org.apache.giraph.master.DefaultMasterCompute;
import org.apache.giraph.graph.Vertex;
+import org.apache.giraph.io.EdgeInputFormat;
+import org.apache.giraph.io.EdgeReader;
+import org.apache.giraph.io.VertexReader;
+import org.apache.giraph.io.formats.GeneratedVertexInputFormat;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.FloatWritable;
import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.log4j.Logger;
+
+import com.google.common.collect.Lists;
import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
/** Computation which uses aggrergators. To be used for testing. */
public class AggregatorsTestComputation extends
@@ -37,6 +52,12 @@ public class AggregatorsTestComputation extends
private static final String REGULAR_AGG = "regular";
/** Name of persistent aggregator */
private static final String PERSISTENT_AGG = "persistent";
+ /** Name of input superstep vertex persistent aggregator */
+ private static final String INPUT_VERTEX_PERSISTENT_AGG
+ = "input_super_step_vertex_agg";
+ /** Name of input superstep edge persistent aggregator */
+ private static final String INPUT_EDGE_PERSISTENT_AGG
+ = "input_super_step_edge_agg";
/** Name of master overwriting aggregator */
private static final String MASTER_WRITE_AGG = "master";
/** Value which master compute will use */
@@ -92,6 +113,14 @@ public class AggregatorsTestComputation extends
setAggregatedValue(MASTER_WRITE_AGG, myValue);
long nv = getTotalNumVertices();
+ if (superstep >= 0) {
+ assertEquals(100, ((LongWritable)
+ getAggregatedValue(INPUT_VERTEX_PERSISTENT_AGG)).get());
+ }
+ if (superstep >= 0) {
+ assertEquals(4500, ((LongWritable)
+ getAggregatedValue(INPUT_EDGE_PERSISTENT_AGG)).get());
+ }
if (superstep > 0) {
assertEquals(nv * (1L << (superstep - 1)),
((LongWritable) getAggregatedValue(REGULAR_AGG)).get());
@@ -111,6 +140,10 @@ public class AggregatorsTestComputation extends
@Override
public void initialize() throws InstantiationException,
IllegalAccessException {
+ registerPersistentAggregator(
+ INPUT_VERTEX_PERSISTENT_AGG, LongSumAggregator.class);
+ registerPersistentAggregator(
+ INPUT_EDGE_PERSISTENT_AGG, LongSumAggregator.class);
registerAggregator(REGULAR_AGG, LongSumAggregator.class);
registerPersistentAggregator(PERSISTENT_AGG,
LongSumAggregator.class);
@@ -134,4 +167,125 @@ public class AggregatorsTestComputation extends
", actual: " + actual);
}
}
+
+ /**
+ * Simple VertexReader
+ */
+ public static class SimpleVertexReader extends
+ GeneratedVertexReader<LongWritable, DoubleWritable, FloatWritable> {
+ /** Class logger */
+ private static final Logger LOG =
+ Logger.getLogger(SimpleVertexReader.class);
+
+ @Override
+ public boolean nextVertex() {
+ return totalRecords > recordsRead;
+ }
+
+ @Override
+ public Vertex<LongWritable, DoubleWritable,
+ FloatWritable> getCurrentVertex() throws IOException {
+ Vertex<LongWritable, DoubleWritable, FloatWritable> vertex =
+ getConf().createVertex();
+ LongWritable vertexId = new LongWritable(
+ (inputSplit.getSplitIndex() * totalRecords) + recordsRead);
+ DoubleWritable vertexValue = new DoubleWritable(vertexId.get() * 10d);
+ long targetVertexId =
+ (vertexId.get() + 1) %
+ (inputSplit.getNumSplits() * totalRecords);
+ float edgeValue = vertexId.get() * 100f;
+ List<Edge<LongWritable, FloatWritable>> edges = Lists.newLinkedList();
+ edges.add(EdgeFactory.create(new LongWritable(targetVertexId),
+ new FloatWritable(edgeValue)));
+ vertex.initialize(vertexId, vertexValue, edges);
+ ++recordsRead;
+ if (LOG.isInfoEnabled()) {
+ LOG.info("next vertex: Return vertexId=" + vertex.getId().get() +
+ ", vertexValue=" + vertex.getValue() +
+ ", targetVertexId=" + targetVertexId + ", edgeValue=" + edgeValue);
+ }
+ aggregate(INPUT_VERTEX_PERSISTENT_AGG,
+ new LongWritable((long) vertex.getValue().get()));
+ return vertex;
+ }
+ }
+
+ /**
+ * Simple VertexInputFormat
+ */
+ public static class SimpleVertexInputFormat extends
+ GeneratedVertexInputFormat<LongWritable, DoubleWritable, FloatWritable> {
+ @Override
+ public VertexReader<LongWritable, DoubleWritable,
+ FloatWritable> createVertexReader(InputSplit split,
+ TaskAttemptContext context)
+ throws IOException {
+ return new SimpleVertexReader();
+ }
+ }
+
+ /**
+ * Simple Edge Reader
+ */
+ public static class SimpleEdgeReader extends
+ GeneratedEdgeReader<LongWritable, FloatWritable> {
+ /** Class logger */
+ private static final Logger LOG = Logger.getLogger(SimpleEdgeReader.class);
+
+ @Override
+ public boolean nextEdge() {
+ return totalRecords > recordsRead;
+ }
+
+ @Override
+ public Edge<LongWritable, FloatWritable> getCurrentEdge()
+ throws IOException {
+ LongWritable vertexId = new LongWritable(
+ (inputSplit.getSplitIndex() * totalRecords) + recordsRead);
+ long targetVertexId = (vertexId.get() + 1) %
+ (inputSplit.getNumSplits() * totalRecords);
+ float edgeValue = vertexId.get() * 100f;
+ Edge<LongWritable, FloatWritable> edge = EdgeFactory.create(
+ new LongWritable(targetVertexId), new FloatWritable(edgeValue));
+ ++recordsRead;
+ if (LOG.isInfoEnabled()) {
+ LOG.info("next edge: Return targetVertexId=" + targetVertexId +
+ ", edgeValue=" + edgeValue);
+ }
+ aggregate(INPUT_EDGE_PERSISTENT_AGG, new LongWritable((long) edge
+ .getValue().get()));
+ return edge;
+ }
+
+ @Override
+ public LongWritable getCurrentSourceId() throws IOException,
+ InterruptedException {
+ LongWritable vertexId = new LongWritable(
+ (inputSplit.getSplitIndex() * totalRecords) + recordsRead);
+ return vertexId;
+ }
+ }
+
+ /**
+ * Simple EdgeInputFormat
+ */
+ public static class SimpleEdgeInputFormat extends
+ EdgeInputFormat<LongWritable, FloatWritable> {
+
+ @Override
+ public EdgeReader<LongWritable, FloatWritable> createEdgeReader(
+ InputSplit split, TaskAttemptContext context) throws IOException {
+ return new SimpleEdgeReader();
+ }
+
+ @Override
+ public List<InputSplit> getSplits(JobContext context, int minSplitCountHint)
+ throws IOException, InterruptedException {
+ List<InputSplit> inputSplitList = new ArrayList<InputSplit>();
+ for (int i = 0; i < minSplitCountHint; ++i) {
+ inputSplitList.add(new BspInputSplit(i, minSplitCountHint));
+ }
+ return inputSplitList;
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/giraph/blob/1eaddd18/giraph-examples/src/main/java/org/apache/giraph/examples/GeneratedEdgeReader.java
----------------------------------------------------------------------
diff --git a/giraph-examples/src/main/java/org/apache/giraph/examples/GeneratedEdgeReader.java b/giraph-examples/src/main/java/org/apache/giraph/examples/GeneratedEdgeReader.java
new file mode 100644
index 0000000..14da5f2
--- /dev/null
+++ b/giraph-examples/src/main/java/org/apache/giraph/examples/GeneratedEdgeReader.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.examples;
+
+import java.io.IOException;
+import org.apache.giraph.bsp.BspInputSplit;
+import org.apache.giraph.conf.LongConfOption;
+import org.apache.giraph.io.EdgeReader;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+
+/**
+ * Used by GeneratedEdgeInputFormat
+ * to read some generated data
+ *
+ * @param <I> Vertex index value
+ * @param <E> Edge value
+ */
+@SuppressWarnings("rawtypes")
+public abstract class GeneratedEdgeReader<
+ I extends WritableComparable,
+ E extends Writable>
+ extends EdgeReader<I, E> {
+ /** Default edges produced by this reader */
+ public static final LongConfOption DEFAULT_READER_EDGES =
+ new LongConfOption("GeneratedEdgeReader.reader_edges", 10);
+ /** Records read so far */
+ protected long recordsRead = 0;
+ /** Total records to read (on this split alone) */
+ protected long totalRecords = 0;
+ /** The input split from initialize(). */
+ protected BspInputSplit inputSplit = null;
+
+ /**
+ * Default constructor for reflection.
+ */
+ public GeneratedEdgeReader() {
+ }
+
+ @Override
+ public final void initialize(InputSplit inputSplit,
+ TaskAttemptContext context) throws IOException {
+ totalRecords = DEFAULT_READER_EDGES.get(getConf());
+ this.inputSplit = (BspInputSplit) inputSplit;
+ }
+
+ @Override
+ public void close() throws IOException {
+ }
+
+ @Override
+ public final float getProgress() throws IOException {
+ return recordsRead * 100.0f / totalRecords;
+ }
+}
http://git-wip-us.apache.org/repos/asf/giraph/blob/1eaddd18/giraph-examples/src/test/java/org/apache/giraph/aggregators/TestAggregatorsHandling.java
----------------------------------------------------------------------
diff --git a/giraph-examples/src/test/java/org/apache/giraph/aggregators/TestAggregatorsHandling.java b/giraph-examples/src/test/java/org/apache/giraph/aggregators/TestAggregatorsHandling.java
index 6d22800..e2b611b 100644
--- a/giraph-examples/src/test/java/org/apache/giraph/aggregators/TestAggregatorsHandling.java
+++ b/giraph-examples/src/test/java/org/apache/giraph/aggregators/TestAggregatorsHandling.java
@@ -25,7 +25,6 @@ import org.apache.giraph.conf.GiraphConstants;
import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
import org.apache.giraph.examples.AggregatorsTestComputation;
import org.apache.giraph.examples.SimpleCheckpoint;
-import org.apache.giraph.examples.SimplePageRankComputation;
import org.apache.giraph.job.GiraphJob;
import org.apache.giraph.master.MasterAggregatorHandler;
import org.apache.hadoop.fs.Path;
@@ -77,7 +76,9 @@ public class TestAggregatorsHandling extends BspCase {
GiraphConfiguration conf = new GiraphConfiguration();
conf.setComputationClass(AggregatorsTestComputation.class);
conf.setVertexInputFormatClass(
- SimplePageRankComputation.SimplePageRankVertexInputFormat.class);
+ AggregatorsTestComputation.SimpleVertexInputFormat.class);
+ conf.setEdgeInputFormatClass(
+ AggregatorsTestComputation.SimpleEdgeInputFormat.class);
GiraphJob job = prepareJob(getCallingMethodName(), conf);
job.getConfiguration().setMasterComputeClass(
AggregatorsTestComputation.AggregatorsTestMasterCompute.class);
@@ -159,7 +160,9 @@ public class TestAggregatorsHandling extends BspCase {
conf.setMasterComputeClass(
AggregatorsTestComputation.AggregatorsTestMasterCompute.class);
conf.setVertexInputFormatClass(
- SimplePageRankComputation.SimplePageRankVertexInputFormat.class);
+ AggregatorsTestComputation.SimpleVertexInputFormat.class);
+ conf.setEdgeInputFormatClass(
+ AggregatorsTestComputation.SimpleEdgeInputFormat.class);
GiraphJob job = prepareJob(getCallingMethodName(), conf, outputPath);
GiraphConfiguration configuration = job.getConfiguration();
@@ -178,7 +181,9 @@ public class TestAggregatorsHandling extends BspCase {
conf.setMasterComputeClass(
AggregatorsTestComputation.AggregatorsTestMasterCompute.class);
conf.setVertexInputFormatClass(
- SimplePageRankComputation.SimplePageRankVertexInputFormat.class);
+ AggregatorsTestComputation.SimpleVertexInputFormat.class);
+ conf.setEdgeInputFormatClass(
+ AggregatorsTestComputation.SimpleEdgeInputFormat.class);
GiraphJob restartedJob = prepareJob(getCallingMethodName() + "Restarted",
conf, outputPath);
job.getConfiguration().setMasterComputeClass(