You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@giraph.apache.org by ac...@apache.org on 2012/07/25 01:37:45 UTC
svn commit: r1365352 [2/4] - in /giraph/trunk: ./
giraph-formats-contrib/src/test/java/org/apache/giraph/format/accumulo/
giraph-formats-contrib/src/test/java/org/apache/giraph/format/accumulo/edgemarker/
giraph-formats-contrib/src/test/java/org/apache...
Modified: giraph/trunk/src/main/java/org/apache/giraph/examples/IdentityVertex.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/examples/IdentityVertex.java?rev=1365352&r1=1365351&r2=1365352&view=diff
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/examples/IdentityVertex.java (original)
+++ giraph/trunk/src/main/java/org/apache/giraph/examples/IdentityVertex.java Tue Jul 24 23:37:42 2012
@@ -18,11 +18,9 @@
package org.apache.giraph.examples;
+import org.apache.giraph.graph.EdgeListVertex;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
-import org.apache.giraph.graph.EdgeListVertex;
-
-import java.util.Iterator;
/**
* User applications can subclass IdentityVertex, which
@@ -41,7 +39,7 @@ public abstract class IdentityVertex<I e
extends EdgeListVertex<I, V, E, M> {
@Override
- public void compute(Iterator<M> msgIterator) {
+ public void compute(Iterable<M> messages) {
voteToHalt();
}
}
Modified: giraph/trunk/src/main/java/org/apache/giraph/examples/IntIntNullIntTextInputFormat.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/examples/IntIntNullIntTextInputFormat.java?rev=1365352&r1=1365351&r2=1365352&view=diff
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/examples/IntIntNullIntTextInputFormat.java (original)
+++ giraph/trunk/src/main/java/org/apache/giraph/examples/IntIntNullIntTextInputFormat.java Tue Jul 24 23:37:42 2012
@@ -18,10 +18,8 @@
package org.apache.giraph.examples;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
-import org.apache.giraph.graph.BasicVertex;
import org.apache.giraph.graph.BspUtils;
+import org.apache.giraph.graph.Vertex;
import org.apache.giraph.graph.VertexReader;
import org.apache.giraph.lib.TextVertexInputFormat;
import org.apache.hadoop.io.IntWritable;
@@ -32,6 +30,9 @@ import org.apache.hadoop.mapreduce.Input
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+
import java.io.IOException;
import java.util.Map;
import java.util.regex.Pattern;
@@ -74,9 +75,9 @@ public class IntIntNullIntTextInputForma
}
@Override
- public BasicVertex<IntWritable, IntWritable, NullWritable, IntWritable>
+ public Vertex<IntWritable, IntWritable, NullWritable, IntWritable>
getCurrentVertex() throws IOException, InterruptedException {
- BasicVertex<IntWritable, IntWritable, NullWritable, IntWritable>
+ Vertex<IntWritable, IntWritable, NullWritable, IntWritable>
vertex = BspUtils.<IntWritable, IntWritable, NullWritable,
IntWritable>createVertex(getContext().getConfiguration());
Added: giraph/trunk/src/main/java/org/apache/giraph/examples/MinimumDoubleCombiner.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/examples/MinimumDoubleCombiner.java?rev=1365352&view=auto
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/examples/MinimumDoubleCombiner.java (added)
+++ giraph/trunk/src/main/java/org/apache/giraph/examples/MinimumDoubleCombiner.java Tue Jul 24 23:37:42 2012
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.examples;
+
+import org.apache.giraph.graph.VertexCombiner;
+import org.apache.hadoop.io.DoubleWritable;
+import org.apache.hadoop.io.LongWritable;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * {@link VertexCombiner} that keeps only the minimum {@link DoubleWritable}.
+ */
+public class MinimumDoubleCombiner extends
+ VertexCombiner<LongWritable, DoubleWritable> {
+ @Override
+ public Iterable<DoubleWritable> combine(
+ LongWritable target,
+ Iterable<DoubleWritable> messages) throws IOException {
+ double minimum = Double.MAX_VALUE; // stays MAX_VALUE if no messages
+ for (DoubleWritable message : messages) {
+ if (message.get() < minimum) {
+ minimum = message.get();
+ }
+ }
+ List<DoubleWritable> value = new ArrayList<DoubleWritable>(); // one entry
+ value.add(new DoubleWritable(minimum));
+
+ return value;
+ }
+}
Modified: giraph/trunk/src/main/java/org/apache/giraph/examples/SimpleCheckpointVertex.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/examples/SimpleCheckpointVertex.java?rev=1365352&r1=1365351&r2=1365352&view=diff
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/examples/SimpleCheckpointVertex.java (original)
+++ giraph/trunk/src/main/java/org/apache/giraph/examples/SimpleCheckpointVertex.java Tue Jul 24 23:37:42 2012
@@ -24,6 +24,7 @@ import org.apache.commons.cli.HelpFormat
import org.apache.commons.cli.Options;
import org.apache.commons.cli.PosixParser;
import org.apache.giraph.aggregators.LongSumAggregator;
+import org.apache.giraph.graph.Edge;
import org.apache.giraph.graph.EdgeListVertex;
import org.apache.giraph.graph.GiraphJob;
import org.apache.giraph.graph.WorkerContext;
@@ -37,8 +38,6 @@ import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.apache.log4j.Logger;
-import java.util.Iterator;
-
/**
* An example that simply uses its id, value, and edges to compute new data
* every iteration to verify that checkpoint restarting works. Fault injection
@@ -64,7 +63,7 @@ public class SimpleCheckpointVertex exte
private Configuration conf;
@Override
- public void compute(Iterator<FloatWritable> msgIterator) {
+ public void compute(Iterable<FloatWritable> messages) {
SimpleCheckpointVertexWorkerContext workerContext =
(SimpleCheckpointVertexWorkerContext) getWorkerContext();
@@ -76,7 +75,7 @@ public class SimpleCheckpointVertex exte
if (enableFault && (getSuperstep() == FAULTING_SUPERSTEP) &&
(getContext().getTaskAttemptID().getId() == 0) &&
- (getVertexId().get() == FAULTING_VERTEX_ID)) {
+ (getId().get() == FAULTING_VERTEX_ID)) {
LOG.info("compute: Forced a fault on the first " +
"attempt of superstep " +
FAULTING_SUPERSTEP + " and vertex id " +
@@ -88,37 +87,34 @@ public class SimpleCheckpointVertex exte
return;
}
LOG.info("compute: " + sumAggregator);
- sumAggregator.aggregate(getVertexId().get());
+ sumAggregator.aggregate(getId().get());
LOG.info("compute: sum = " +
sumAggregator.getAggregatedValue().get() +
- " for vertex " + getVertexId());
+ " for vertex " + getId());
float msgValue = 0.0f;
- while (msgIterator.hasNext()) {
- float curMsgValue = msgIterator.next().get();
+ for (FloatWritable message : messages) {
+ float curMsgValue = message.get();
msgValue += curMsgValue;
LOG.info("compute: got msgValue = " + curMsgValue +
- " for vertex " + getVertexId() +
+ " for vertex " + getId() +
" on superstep " + getSuperstep());
}
- int vertexValue = getVertexValue().get();
- setVertexValue(new IntWritable(vertexValue + (int) msgValue));
- LOG.info("compute: vertex " + getVertexId() +
- " has value " + getVertexValue() +
+ int vertexValue = getValue().get();
+ setValue(new IntWritable(vertexValue + (int) msgValue));
+ LOG.info("compute: vertex " + getId() +
+ " has value " + getValue() +
" on superstep " + getSuperstep());
- for (Iterator<LongWritable> edges = getOutEdgesIterator();
- edges.hasNext();) {
- LongWritable targetVertexId = edges.next();
- FloatWritable edgeValue = getEdgeValue(targetVertexId);
- LOG.info("compute: vertex " + getVertexId() +
- " sending edgeValue " + edgeValue +
+ for (Edge<LongWritable, FloatWritable> edge : getEdges()) {
+ FloatWritable newEdgeValue = new FloatWritable(edge.getValue().get() +
+ (float) vertexValue);
+ LOG.info("compute: vertex " + getId() +
+ " sending edgeValue " + edge.getValue() +
" vertexValue " + vertexValue +
- " total " + (edgeValue.get() +
- (float) vertexValue) +
- " to vertex " + targetVertexId +
+ " total " + newEdgeValue +
+ " to vertex " + edge.getTargetVertexId() +
" on superstep " + getSuperstep());
- edgeValue.set(edgeValue.get() + (float) vertexValue);
- addEdge(targetVertexId, edgeValue);
- sendMsg(targetVertexId, new FloatWritable(edgeValue.get()));
+ addEdge(edge.getTargetVertexId(), newEdgeValue);
+ sendMessage(edge.getTargetVertexId(), newEdgeValue);
}
}
Modified: giraph/trunk/src/main/java/org/apache/giraph/examples/SimpleCombinerVertex.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/examples/SimpleCombinerVertex.java?rev=1365352&r1=1365351&r2=1365352&view=diff
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/examples/SimpleCombinerVertex.java (original)
+++ giraph/trunk/src/main/java/org/apache/giraph/examples/SimpleCombinerVertex.java Tue Jul 24 23:37:42 2012
@@ -18,15 +18,12 @@
package org.apache.giraph.examples;
-import java.util.Iterator;
-
+import org.apache.giraph.graph.EdgeListVertex;
import org.apache.hadoop.io.FloatWritable;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.log4j.Logger;
-import org.apache.giraph.graph.EdgeListVertex;
-
/**
* Test whether messages can go through a combiner.
*/
@@ -36,20 +33,20 @@ public class SimpleCombinerVertex extend
private static Logger LOG = Logger.getLogger(SimpleCombinerVertex.class);
@Override
- public void compute(Iterator<IntWritable> msgIterator) {
- if (getVertexId().equals(new LongWritable(2))) {
- sendMsg(new LongWritable(1), new IntWritable(101));
- sendMsg(new LongWritable(1), new IntWritable(102));
- sendMsg(new LongWritable(1), new IntWritable(103));
+ public void compute(Iterable<IntWritable> messages) {
+ if (getId().equals(new LongWritable(2))) {
+ sendMessage(new LongWritable(1), new IntWritable(101));
+ sendMessage(new LongWritable(1), new IntWritable(102));
+ sendMessage(new LongWritable(1), new IntWritable(103));
}
- if (!getVertexId().equals(new LongWritable(1))) {
+ if (!getId().equals(new LongWritable(1))) {
voteToHalt();
} else {
// Check the messages
int sum = 0;
int num = 0;
- while (msgIterator != null && msgIterator.hasNext()) {
- sum += msgIterator.next().get();
+ for (IntWritable message : messages) {
+ sum += message.get();
num++;
}
LOG.info("TestCombinerVertex: Received a sum of " + sum +
Modified: giraph/trunk/src/main/java/org/apache/giraph/examples/SimpleFailVertex.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/examples/SimpleFailVertex.java?rev=1365352&r1=1365351&r2=1365352&view=diff
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/examples/SimpleFailVertex.java (original)
+++ giraph/trunk/src/main/java/org/apache/giraph/examples/SimpleFailVertex.java Tue Jul 24 23:37:42 2012
@@ -24,8 +24,6 @@ import org.apache.hadoop.io.FloatWritabl
import org.apache.hadoop.io.LongWritable;
import org.apache.log4j.Logger;
-import java.util.Iterator;
-
/**
* Vertex to allow unit testing of failure detection
*/
@@ -37,18 +35,18 @@ public class SimpleFailVertex extends Ed
private static long SUPERSTEP = 0;
@Override
- public void compute(Iterator<DoubleWritable> msgIterator) {
+ public void compute(Iterable<DoubleWritable> messages) {
if (getSuperstep() >= 1) {
double sum = 0;
- while (msgIterator.hasNext()) {
- sum += msgIterator.next().get();
+ for (DoubleWritable message : messages) {
+ sum += message.get();
}
DoubleWritable vertexValue =
- new DoubleWritable((0.15f / getNumVertices()) + 0.85f * sum);
- setVertexValue(vertexValue);
+ new DoubleWritable((0.15f / getTotalNumVertices()) + 0.85f * sum);
+ setValue(vertexValue);
if (getSuperstep() < 30) {
if (getSuperstep() == 20) {
- if (getVertexId().get() == 10L) {
+ if (getId().get() == 10L) {
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
@@ -59,9 +57,9 @@ public class SimpleFailVertex extends Ed
return;
}
}
- long edges = getNumOutEdges();
- sendMsgToAllEdges(
- new DoubleWritable(getVertexValue().get() / edges));
+ long edges = getNumEdges();
+ sendMessageToAllEdges(
+ new DoubleWritable(getValue().get() / edges));
} else {
voteToHalt();
}
Modified: giraph/trunk/src/main/java/org/apache/giraph/examples/SimpleLongDoubleDoubleDoubleIdentityVertex.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/examples/SimpleLongDoubleDoubleDoubleIdentityVertex.java?rev=1365352&r1=1365351&r2=1365352&view=diff
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/examples/SimpleLongDoubleDoubleDoubleIdentityVertex.java (original)
+++ giraph/trunk/src/main/java/org/apache/giraph/examples/SimpleLongDoubleDoubleDoubleIdentityVertex.java Tue Jul 24 23:37:42 2012
@@ -27,6 +27,6 @@ import org.apache.hadoop.io.LongWritable
* lib.LongDoubleDoubleAdjacencyListVertexInputFormat
*/
-public class SimpleLongDoubleDoubleDoubleIdentityVertex extends
+public abstract class SimpleLongDoubleDoubleDoubleIdentityVertex extends
IdentityVertex<LongWritable, DoubleWritable,
DoubleWritable, DoubleWritable> { }
Modified: giraph/trunk/src/main/java/org/apache/giraph/examples/SimpleMasterComputeVertex.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/examples/SimpleMasterComputeVertex.java?rev=1365352&r1=1365351&r2=1365352&view=diff
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/examples/SimpleMasterComputeVertex.java (original)
+++ giraph/trunk/src/main/java/org/apache/giraph/examples/SimpleMasterComputeVertex.java Tue Jul 24 23:37:42 2012
@@ -28,7 +28,6 @@ import org.apache.log4j.Logger;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
-import java.util.Iterator;
/**
* Demonstrates a computation with a centralized part implemented via a
@@ -42,13 +41,13 @@ public class SimpleMasterComputeVertex e
Logger.getLogger(SimpleMasterComputeVertex.class);
@Override
- public void compute(Iterator<DoubleWritable> msgIterator) {
+ public void compute(Iterable<DoubleWritable> messages) {
DoubleOverwriteAggregator agg =
(DoubleOverwriteAggregator) getAggregator(SMC_AGG);
- double oldSum = getSuperstep() == 0 ? 0 : getVertexValue().get();
+ double oldSum = getSuperstep() == 0 ? 0 : getValue().get();
double newValue = agg.getAggregatedValue().get();
double newSum = oldSum + newValue;
- setVertexValue(new DoubleWritable(newSum));
+ setValue(new DoubleWritable(newSum));
SimpleMasterComputeWorkerContext workerContext =
(SimpleMasterComputeWorkerContext) getWorkerContext();
workerContext.setFinalSum(newSum);
Modified: giraph/trunk/src/main/java/org/apache/giraph/examples/SimpleMsgVertex.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/examples/SimpleMsgVertex.java?rev=1365352&r1=1365351&r2=1365352&view=diff
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/examples/SimpleMsgVertex.java (original)
+++ giraph/trunk/src/main/java/org/apache/giraph/examples/SimpleMsgVertex.java Tue Jul 24 23:37:42 2012
@@ -18,15 +18,12 @@
package org.apache.giraph.examples;
-import java.util.Iterator;
-
+import org.apache.giraph.graph.EdgeListVertex;
import org.apache.hadoop.io.FloatWritable;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.log4j.Logger;
-import org.apache.giraph.graph.EdgeListVertex;
-
/**
* Test whether messages can be sent and received by vertices.
*/
@@ -35,19 +32,19 @@ public class SimpleMsgVertex extends
/** Class logger */
private static Logger LOG = Logger.getLogger(SimpleMsgVertex.class);
@Override
- public void compute(Iterator<IntWritable> msgIterator) {
- if (getVertexId().equals(new LongWritable(2))) {
- sendMsg(new LongWritable(1), new IntWritable(101));
- sendMsg(new LongWritable(1), new IntWritable(102));
- sendMsg(new LongWritable(1), new IntWritable(103));
+ public void compute(Iterable<IntWritable> messages) {
+ if (getId().equals(new LongWritable(2))) {
+ sendMessage(new LongWritable(1), new IntWritable(101));
+ sendMessage(new LongWritable(1), new IntWritable(102));
+ sendMessage(new LongWritable(1), new IntWritable(103));
}
- if (!getVertexId().equals(new LongWritable(1))) {
+ if (!getId().equals(new LongWritable(1))) {
voteToHalt();
} else {
/* Check the messages */
int sum = 0;
- while (msgIterator != null && msgIterator.hasNext()) {
- sum += msgIterator.next().get();
+ for (IntWritable message : messages) {
+ sum += message.get();
}
LOG.info("TestMsgVertex: Received a sum of " + sum +
" (will stop on 306)");
Modified: giraph/trunk/src/main/java/org/apache/giraph/examples/SimpleMutateGraphVertex.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/examples/SimpleMutateGraphVertex.java?rev=1365352&r1=1365351&r2=1365352&view=diff
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/examples/SimpleMutateGraphVertex.java (original)
+++ giraph/trunk/src/main/java/org/apache/giraph/examples/SimpleMutateGraphVertex.java Tue Jul 24 23:37:42 2012
@@ -18,18 +18,16 @@
package org.apache.giraph.examples;
-import java.io.IOException;
-import java.util.Iterator;
-
+import org.apache.giraph.graph.Edge;
+import org.apache.giraph.graph.EdgeListVertex;
+import org.apache.giraph.graph.Vertex;
+import org.apache.giraph.graph.WorkerContext;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.FloatWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.log4j.Logger;
-import org.apache.giraph.graph.BasicVertex;
-import org.apache.giraph.graph.Edge;
-import org.apache.giraph.graph.EdgeListVertex;
-import org.apache.giraph.graph.WorkerContext;
+import java.io.IOException;
/**
* Vertex to allow unit testing of graph mutations.
@@ -55,7 +53,7 @@ public class SimpleMutateGraphVertex ext
}
@Override
- public void compute(Iterator<DoubleWritable> msgIterator)
+ public void compute(Iterable<DoubleWritable> messages)
throws IOException {
SimpleMutateGraphVertexWorkerContext workerContext =
(SimpleMutateGraphVertexWorkerContext) getWorkerContext();
@@ -65,77 +63,77 @@ public class SimpleMutateGraphVertex ext
// Send messages to vertices that are sure not to exist
// (creating them)
LongWritable destVertexId =
- new LongWritable(rangeVertexIdStart(1) + getVertexId().get());
- sendMsg(destVertexId, new DoubleWritable(0.0));
+ new LongWritable(rangeVertexIdStart(1) + getId().get());
+ sendMessage(destVertexId, new DoubleWritable(0.0));
} else if (getSuperstep() == 2) {
LOG.debug("Reached superstep " + getSuperstep());
} else if (getSuperstep() == 3) {
long vertexCount = workerContext.getVertexCount();
- if (vertexCount * 2 != getNumVertices()) {
+ if (vertexCount * 2 != getTotalNumVertices()) {
throw new IllegalStateException(
- "Impossible to have " + getNumVertices() +
+ "Impossible to have " + getTotalNumVertices() +
" vertices when should have " + vertexCount * 2 +
" on superstep " + getSuperstep());
}
long edgeCount = workerContext.getEdgeCount();
- if (edgeCount != getNumEdges()) {
+ if (edgeCount != getTotalNumEdges()) {
throw new IllegalStateException(
- "Impossible to have " + getNumEdges() +
+ "Impossible to have " + getTotalNumEdges() +
" edges when should have " + edgeCount +
" on superstep " + getSuperstep());
}
// Create vertices that are sure not to exist (doubling vertices)
LongWritable vertexIndex =
- new LongWritable(rangeVertexIdStart(3) + getVertexId().get());
- BasicVertex<LongWritable, DoubleWritable,
- FloatWritable, DoubleWritable> vertex =
- instantiateVertex(vertexIndex, null, null, null);
+ new LongWritable(rangeVertexIdStart(3) + getId().get());
+ Vertex<LongWritable, DoubleWritable,
+ FloatWritable, DoubleWritable> vertex =
+ instantiateVertex(vertexIndex, new DoubleWritable(0.0), null, null);
addVertexRequest(vertex);
// Add edges to those remote vertices as well
addEdgeRequest(vertexIndex,
new Edge<LongWritable, FloatWritable>(
- getVertexId(), new FloatWritable(0.0f)));
+ getId(), new FloatWritable(0.0f)));
} else if (getSuperstep() == 4) {
LOG.debug("Reached superstep " + getSuperstep());
} else if (getSuperstep() == 5) {
long vertexCount = workerContext.getVertexCount();
- if (vertexCount * 2 != getNumVertices()) {
+ if (vertexCount * 2 != getTotalNumVertices()) {
throw new IllegalStateException(
- "Impossible to have " + getNumVertices() +
+ "Impossible to have " + getTotalNumVertices() +
" when should have " + vertexCount * 2 +
" on superstep " + getSuperstep());
}
long edgeCount = workerContext.getEdgeCount();
- if (edgeCount + vertexCount != getNumEdges()) {
+ if (edgeCount + vertexCount != getTotalNumEdges()) {
throw new IllegalStateException(
- "Impossible to have " + getNumEdges() +
+ "Impossible to have " + getTotalNumEdges() +
" edges when should have " + edgeCount + vertexCount +
" on superstep " + getSuperstep());
}
// Remove the edges created in superstep 3
LongWritable vertexIndex =
- new LongWritable(rangeVertexIdStart(3) + getVertexId().get());
+ new LongWritable(rangeVertexIdStart(3) + getId().get());
workerContext.increaseEdgesRemoved();
- removeEdgeRequest(vertexIndex, getVertexId());
+ removeEdgeRequest(vertexIndex, getId());
} else if (getSuperstep() == 6) {
// Remove all the vertices created in superstep 3
- if (getVertexId().compareTo(
+ if (getId().compareTo(
new LongWritable(rangeVertexIdStart(3))) >= 0) {
- removeVertexRequest(getVertexId());
+ removeVertexRequest(getId());
}
} else if (getSuperstep() == 7) {
long origEdgeCount = workerContext.getOrigEdgeCount();
- if (origEdgeCount != getNumEdges()) {
+ if (origEdgeCount != getTotalNumEdges()) {
throw new IllegalStateException(
- "Impossible to have " + getNumEdges() +
+ "Impossible to have " + getTotalNumEdges() +
" edges when should have " + origEdgeCount +
" on superstep " + getSuperstep());
}
} else if (getSuperstep() == 8) {
long vertexCount = workerContext.getVertexCount();
- if (vertexCount / 2 != getNumVertices()) {
+ if (vertexCount / 2 != getTotalNumVertices()) {
throw new IllegalStateException(
- "Impossible to have " + getNumVertices() +
+ "Impossible to have " + getTotalNumVertices() +
" vertices when should have " + vertexCount / 2 +
" on superstep " + getSuperstep());
}
@@ -170,8 +168,8 @@ public class SimpleMutateGraphVertex ext
@Override
public void postSuperstep() {
- vertexCount = getNumVertices();
- edgeCount = getNumEdges();
+ vertexCount = getTotalNumVertices();
+ edgeCount = getTotalNumEdges();
if (getSuperstep() == 1) {
origEdgeCount = edgeCount;
}
Modified: giraph/trunk/src/main/java/org/apache/giraph/examples/SimplePageRankVertex.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/examples/SimplePageRankVertex.java?rev=1365352&r1=1365351&r2=1365352&view=diff
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/examples/SimplePageRankVertex.java (original)
+++ giraph/trunk/src/main/java/org/apache/giraph/examples/SimplePageRankVertex.java Tue Jul 24 23:37:42 2012
@@ -18,14 +18,12 @@
package org.apache.giraph.examples;
-import com.google.common.collect.Maps;
-
import org.apache.giraph.aggregators.DoubleMaxAggregator;
import org.apache.giraph.aggregators.DoubleMinAggregator;
import org.apache.giraph.aggregators.LongSumAggregator;
-import org.apache.giraph.graph.BasicVertex;
import org.apache.giraph.graph.BspUtils;
import org.apache.giraph.graph.LongDoubleFloatDoubleVertex;
+import org.apache.giraph.graph.Vertex;
import org.apache.giraph.graph.VertexReader;
import org.apache.giraph.graph.VertexWriter;
import org.apache.giraph.graph.WorkerContext;
@@ -40,8 +38,9 @@ import org.apache.hadoop.mapreduce.Recor
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.log4j.Logger;
+import com.google.common.collect.Maps;
+
import java.io.IOException;
-import java.util.Iterator;
import java.util.Map;
/**
@@ -58,30 +57,31 @@ public class SimplePageRankVertex extend
Logger.getLogger(SimplePageRankVertex.class);
@Override
- public void compute(Iterator<DoubleWritable> msgIterator) {
+ public void compute(Iterable<DoubleWritable> messages) {
LongSumAggregator sumAggreg = (LongSumAggregator) getAggregator("sum");
DoubleMinAggregator minAggreg = (DoubleMinAggregator) getAggregator("min");
DoubleMaxAggregator maxAggreg = (DoubleMaxAggregator) getAggregator("max");
+
if (getSuperstep() >= 1) {
double sum = 0;
- while (msgIterator.hasNext()) {
- sum += msgIterator.next().get();
+ for (DoubleWritable message : messages) {
+ sum += message.get();
}
DoubleWritable vertexValue =
- new DoubleWritable((0.15f / getNumVertices()) + 0.85f * sum);
- setVertexValue(vertexValue);
+ new DoubleWritable((0.15f / getTotalNumVertices()) + 0.85f * sum);
+ setValue(vertexValue);
maxAggreg.aggregate(vertexValue);
minAggreg.aggregate(vertexValue);
sumAggreg.aggregate(1L);
- LOG.info(getVertexId() + ": PageRank=" + vertexValue +
+ LOG.info(getId() + ": PageRank=" + vertexValue +
" max=" + maxAggreg.getAggregatedValue() +
" min=" + minAggreg.getAggregatedValue());
}
if (getSuperstep() < MAX_SUPERSTEPS) {
- long edges = getNumOutEdges();
- sendMsgToAllEdges(
- new DoubleWritable(getVertexValue().get() / edges));
+ long edges = getNumEdges();
+ sendMessageToAllEdges(
+ new DoubleWritable(getValue().get() / edges));
} else {
voteToHalt();
}
@@ -150,11 +150,11 @@ public class SimplePageRankVertex extend
if (getSuperstep() >= 3) {
LOG.info("aggregatedNumVertices=" +
sumAggreg.getAggregatedValue() +
- " NumVertices=" + getNumVertices());
- if (sumAggreg.getAggregatedValue().get() != getNumVertices()) {
+ " NumVertices=" + getTotalNumVertices());
+ if (sumAggreg.getAggregatedValue().get() != getTotalNumVertices()) {
throw new RuntimeException("wrong value of SumAggreg: " +
sumAggreg.getAggregatedValue() + ", should be: " +
- getNumVertices());
+ getTotalNumVertices());
}
DoubleWritable maxPagerank = maxAggreg.getAggregatedValue();
LOG.info("aggregatedMaxPageRank=" + maxPagerank.get());
@@ -194,27 +194,27 @@ public class SimplePageRankVertex extend
}
@Override
- public BasicVertex<LongWritable, DoubleWritable,
- FloatWritable, DoubleWritable>
+ public Vertex<LongWritable, DoubleWritable,
+ FloatWritable, DoubleWritable>
getCurrentVertex() throws IOException {
- BasicVertex<LongWritable, DoubleWritable, FloatWritable, DoubleWritable>
+ Vertex<LongWritable, DoubleWritable, FloatWritable, DoubleWritable>
vertex = BspUtils.createVertex(configuration);
LongWritable vertexId = new LongWritable(
(inputSplit.getSplitIndex() * totalRecords) + recordsRead);
DoubleWritable vertexValue = new DoubleWritable(vertexId.get() * 10d);
- long destVertexId =
+ long targetVertexId =
(vertexId.get() + 1) %
(inputSplit.getNumSplits() * totalRecords);
float edgeValue = vertexId.get() * 100f;
Map<LongWritable, FloatWritable> edges = Maps.newHashMap();
- edges.put(new LongWritable(destVertexId), new FloatWritable(edgeValue));
+ edges.put(new LongWritable(targetVertexId), new FloatWritable(edgeValue));
vertex.initialize(vertexId, vertexValue, edges, null);
++recordsRead;
if (LOG.isInfoEnabled()) {
- LOG.info("next: Return vertexId=" + vertex.getVertexId().get() +
- ", vertexValue=" + vertex.getVertexValue() +
- ", destinationId=" + destVertexId + ", edgeValue=" + edgeValue);
+ LOG.info("next: Return vertexId=" + vertex.getId().get() +
+ ", vertexValue=" + vertex.getValue() +
+ ", targetVertexId=" + targetVertexId + ", edgeValue=" + edgeValue);
}
return vertex;
}
@@ -252,11 +252,11 @@ public class SimplePageRankVertex extend
@Override
public void writeVertex(
- BasicVertex<LongWritable, DoubleWritable, FloatWritable, ?> vertex)
+ Vertex<LongWritable, DoubleWritable, FloatWritable, ?> vertex)
throws IOException, InterruptedException {
getRecordWriter().write(
- new Text(vertex.getVertexId().toString()),
- new Text(vertex.getVertexValue().toString()));
+ new Text(vertex.getId().toString()),
+ new Text(vertex.getValue().toString()));
}
}
Modified: giraph/trunk/src/main/java/org/apache/giraph/examples/SimpleShortestPathsVertex.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/examples/SimpleShortestPathsVertex.java?rev=1365352&r1=1365351&r2=1365352&view=diff
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/examples/SimpleShortestPathsVertex.java (original)
+++ giraph/trunk/src/main/java/org/apache/giraph/examples/SimpleShortestPathsVertex.java Tue Jul 24 23:37:42 2012
@@ -18,14 +18,13 @@
package org.apache.giraph.examples;
+import org.apache.giraph.graph.Edge;
import org.apache.giraph.graph.EdgeListVertex;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.FloatWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.log4j.Logger;
-import java.util.Iterator;
-
/**
* Demonstrates the basic Pregel shortest paths implementation.
*/
@@ -50,37 +49,33 @@ public class SimpleShortestPathsVertex e
* @return True if the source id
*/
private boolean isSource() {
- return getVertexId().get() ==
+ return getId().get() ==
getContext().getConfiguration().getLong(SOURCE_ID,
SOURCE_ID_DEFAULT);
}
@Override
- public void compute(Iterator<DoubleWritable> msgIterator) {
+ public void compute(Iterable<DoubleWritable> messages) {
if (getSuperstep() == 0) {
- setVertexValue(new DoubleWritable(Double.MAX_VALUE));
+ setValue(new DoubleWritable(Double.MAX_VALUE));
}
double minDist = isSource() ? 0d : Double.MAX_VALUE;
- while (msgIterator.hasNext()) {
- minDist = Math.min(minDist, msgIterator.next().get());
+ for (DoubleWritable message : messages) {
+ minDist = Math.min(minDist, message.get());
}
if (LOG.isDebugEnabled()) {
- LOG.debug("Vertex " + getVertexId() + " got minDist = " + minDist +
- " vertex value = " + getVertexValue());
+ LOG.debug("Vertex " + getId() + " got minDist = " + minDist +
+ " vertex value = " + getValue());
}
- if (minDist < getVertexValue().get()) {
- setVertexValue(new DoubleWritable(minDist));
- for (Iterator<LongWritable> edges = getOutEdgesIterator();
- edges.hasNext();) {
- LongWritable targetVertexId = edges.next();
- FloatWritable edgeValue = getEdgeValue(targetVertexId);
+ if (minDist < getValue().get()) {
+ setValue(new DoubleWritable(minDist));
+ for (Edge<LongWritable, FloatWritable> edge : getEdges()) {
+ double distance = minDist + edge.getValue().get();
if (LOG.isDebugEnabled()) {
- LOG.debug("Vertex " + getVertexId() + " sent to " +
- targetVertexId + " = " +
- (minDist + edgeValue.get()));
+ LOG.debug("Vertex " + getId() + " sent to " +
+ edge.getTargetVertexId() + " = " + distance);
}
- sendMsg(targetVertexId,
- new DoubleWritable(minDist + edgeValue.get()));
+ sendMessage(edge.getTargetVertexId(), new DoubleWritable(distance));
}
}
voteToHalt();
Modified: giraph/trunk/src/main/java/org/apache/giraph/examples/SimpleSuperstepVertex.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/examples/SimpleSuperstepVertex.java?rev=1365352&r1=1365351&r2=1365352&view=diff
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/examples/SimpleSuperstepVertex.java (original)
+++ giraph/trunk/src/main/java/org/apache/giraph/examples/SimpleSuperstepVertex.java Tue Jul 24 23:37:42 2012
@@ -18,10 +18,9 @@
package org.apache.giraph.examples;
-import com.google.common.collect.Maps;
-import org.apache.giraph.graph.BasicVertex;
import org.apache.giraph.graph.BspUtils;
import org.apache.giraph.graph.EdgeListVertex;
+import org.apache.giraph.graph.Vertex;
import org.apache.giraph.graph.VertexReader;
import org.apache.giraph.graph.VertexWriter;
import org.apache.giraph.lib.TextVertexOutputFormat;
@@ -35,8 +34,9 @@ import org.apache.hadoop.mapreduce.Recor
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.log4j.Logger;
+import com.google.common.collect.Maps;
+
import java.io.IOException;
-import java.util.Iterator;
import java.util.Map;
/**
@@ -46,7 +46,7 @@ import java.util.Map;
public class SimpleSuperstepVertex extends
EdgeListVertex<LongWritable, IntWritable, FloatWritable, IntWritable> {
@Override
- public void compute(Iterator<IntWritable> msgIterator) {
+ public void compute(Iterable<IntWritable> messages) {
if (getSuperstep() > 3) {
voteToHalt();
}
@@ -74,11 +74,10 @@ public class SimpleSuperstepVertex exten
}
@Override
- public BasicVertex<LongWritable, IntWritable, FloatWritable,
- IntWritable> getCurrentVertex()
+ public Vertex<LongWritable, IntWritable, FloatWritable,
+ IntWritable> getCurrentVertex()
throws IOException, InterruptedException {
- BasicVertex<LongWritable, IntWritable,
- FloatWritable, IntWritable> vertex =
+ Vertex<LongWritable, IntWritable, FloatWritable, IntWritable> vertex =
BspUtils.<LongWritable, IntWritable, FloatWritable,
IntWritable>createVertex(configuration);
long tmpId = reverseIdOrder ?
@@ -89,18 +88,18 @@ public class SimpleSuperstepVertex exten
IntWritable vertexValue =
new IntWritable((int) (vertexId.get() * 10));
Map<LongWritable, FloatWritable> edgeMap = Maps.newHashMap();
- long destVertexId =
+ long targetVertexId =
(vertexId.get() + 1) %
(inputSplit.getNumSplits() * totalRecords);
float edgeValue = vertexId.get() * 100f;
- edgeMap.put(new LongWritable(destVertexId),
+ edgeMap.put(new LongWritable(targetVertexId),
new FloatWritable(edgeValue));
vertex.initialize(vertexId, vertexValue, edgeMap, null);
++recordsRead;
if (LOG.isInfoEnabled()) {
- LOG.info("next: Return vertexId=" + vertex.getVertexId().get() +
- ", vertexValue=" + vertex.getVertexValue() +
- ", destinationId=" + destVertexId +
+ LOG.info("next: Return vertexId=" + vertex.getId().get() +
+ ", vertexValue=" + vertex.getValue() +
+ ", targetVertexId=" + targetVertexId +
", edgeValue=" + edgeValue);
}
return vertex;
@@ -137,11 +136,11 @@ public class SimpleSuperstepVertex exten
}
@Override
- public void writeVertex(BasicVertex<LongWritable, IntWritable,
- FloatWritable, ?> vertex) throws IOException, InterruptedException {
+ public void writeVertex(Vertex<LongWritable, IntWritable,
+ FloatWritable, ?> vertex) throws IOException, InterruptedException {
getRecordWriter().write(
- new Text(vertex.getVertexId().toString()),
- new Text(vertex.getVertexValue().toString()));
+ new Text(vertex.getId().toString()),
+ new Text(vertex.getValue().toString()));
}
}
Modified: giraph/trunk/src/main/java/org/apache/giraph/examples/SimpleTextVertexOutputFormat.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/examples/SimpleTextVertexOutputFormat.java?rev=1365352&r1=1365351&r2=1365352&view=diff
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/examples/SimpleTextVertexOutputFormat.java (original)
+++ giraph/trunk/src/main/java/org/apache/giraph/examples/SimpleTextVertexOutputFormat.java Tue Jul 24 23:37:42 2012
@@ -25,7 +25,7 @@ import org.apache.hadoop.io.LongWritable
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
-import org.apache.giraph.graph.BasicVertex;
+import org.apache.giraph.graph.Vertex;
import org.apache.giraph.graph.VertexWriter;
import org.apache.giraph.lib.TextVertexOutputFormat;
@@ -51,11 +51,11 @@ public class SimpleTextVertexOutputForma
@Override
public void writeVertex(
- BasicVertex<LongWritable, IntWritable, FloatWritable, ?> vertex)
+ Vertex<LongWritable, IntWritable, FloatWritable, ?> vertex)
throws IOException, InterruptedException {
getRecordWriter().write(
- new Text(vertex.getVertexId().toString()),
- new Text(vertex.getVertexValue().toString()));
+ new Text(vertex.getId().toString()),
+ new Text(vertex.getValue().toString()));
}
}
Modified: giraph/trunk/src/main/java/org/apache/giraph/examples/SimpleTriangleClosingVertex.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/examples/SimpleTriangleClosingVertex.java?rev=1365352&r1=1365351&r2=1365352&view=diff
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/examples/SimpleTriangleClosingVertex.java (original)
+++ giraph/trunk/src/main/java/org/apache/giraph/examples/SimpleTriangleClosingVertex.java Tue Jul 24 23:37:42 2012
@@ -18,17 +18,17 @@
package org.apache.giraph.examples;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.NullWritable;
-import org.apache.hadoop.io.IntWritable;
-import org.apache.hadoop.io.ArrayWritable;
+import org.apache.giraph.graph.Edge;
import org.apache.giraph.graph.EdgeListVertex;
+import org.apache.hadoop.io.ArrayWritable;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.Writable;
-import java.util.Iterator;
+import java.util.HashSet;
import java.util.Map;
-import java.util.TreeMap;
import java.util.Set;
-import java.util.HashSet;
+import java.util.TreeMap;
/**
* Demonstrates triangle closing in simple,
@@ -71,22 +71,20 @@ public class SimpleTriangleClosingVertex
private Set<Integer> recvSet = new HashSet<Integer>();
@Override
- public void compute(Iterator<IntWritable> msgIterator) {
+ public void compute(Iterable<IntWritable> messages) {
if (getSuperstep() == 0) {
// obtain list of all out-edges from THIS vertex
- Iterator<IntWritable> iterator = getOutEdgesIterator();
- while (iterator.hasNext()) {
- sendMsgToAllEdges(iterator.next());
+ for (Edge<IntWritable, NullWritable> edge : getEdges()) {
+ sendMessageToAllEdges(edge.getTargetVertexId());
}
} else {
- while (msgIterator.hasNext()) {
- IntWritable iw = msgIterator.next();
- int inId = iw.get();
+ for (IntWritable message : messages) {
+ int inId = message.get();
if (recvSet.contains(inId)) {
- int current = closeMap.get(iw) == null ? 0 : inId;
- closeMap.put(iw, current + 1);
+ int current = closeMap.get(message) == null ? 0 : inId;
+ closeMap.put(message, current + 1);
}
- if (inId != getVertexId().get()) {
+ if (inId != getId().get()) {
recvSet.add(inId);
}
}
@@ -97,7 +95,7 @@ public class SimpleTriangleClosingVertex
}
IntArrayWritable result = new IntArrayWritable();
result.set(temp);
- setVertexValue(result);
+ setValue(result);
}
voteToHalt();
}
Modified: giraph/trunk/src/main/java/org/apache/giraph/examples/SimpleVertexWithWorkerContext.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/examples/SimpleVertexWithWorkerContext.java?rev=1365352&r1=1365351&r2=1365352&view=diff
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/examples/SimpleVertexWithWorkerContext.java (original)
+++ giraph/trunk/src/main/java/org/apache/giraph/examples/SimpleVertexWithWorkerContext.java Tue Jul 24 23:37:42 2012
@@ -18,14 +18,10 @@
package org.apache.giraph.examples;
-import java.io.DataOutputStream;
-import java.io.IOException;
-import java.util.Iterator;
-
import org.apache.giraph.examples.SimpleSuperstepVertex.
- SimpleSuperstepVertexInputFormat;
-import org.apache.giraph.graph.GiraphJob;
+ SimpleSuperstepVertexInputFormat;
import org.apache.giraph.graph.EdgeListVertex;
+import org.apache.giraph.graph.GiraphJob;
import org.apache.giraph.graph.WorkerContext;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
@@ -38,6 +34,9 @@ import org.apache.hadoop.mapreduce.Mappe
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
+import java.io.DataOutputStream;
+import java.io.IOException;
+
/**
* Fully runnable example of how to
* emit worker data to HDFS during a graph
@@ -52,15 +51,14 @@ public class SimpleVertexWithWorkerConte
private static final int TESTLENGTH = 30;
@Override
- public void compute(Iterator<DoubleWritable> msgIterator)
- throws IOException {
+ public void compute(Iterable<DoubleWritable> messages) throws IOException {
long superstep = getSuperstep();
if (superstep < TESTLENGTH) {
EmitterWorkerContext emitter =
(EmitterWorkerContext) getWorkerContext();
- emitter.emit("vertexId=" + getVertexId() +
+ emitter.emit("vertexId=" + getId() +
" superstep=" + superstep + "\n");
} else {
voteToHalt();
Modified: giraph/trunk/src/main/java/org/apache/giraph/examples/VerifyMessage.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/examples/VerifyMessage.java?rev=1365352&r1=1365351&r2=1365352&view=diff
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/examples/VerifyMessage.java (original)
+++ giraph/trunk/src/main/java/org/apache/giraph/examples/VerifyMessage.java Tue Jul 24 23:37:42 2012
@@ -19,6 +19,7 @@
package org.apache.giraph.examples;
import org.apache.giraph.aggregators.LongSumAggregator;
+import org.apache.giraph.graph.Edge;
import org.apache.giraph.graph.EdgeListVertex;
import org.apache.giraph.graph.WorkerContext;
import org.apache.hadoop.io.FloatWritable;
@@ -30,7 +31,6 @@ import org.apache.log4j.Logger;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
-import java.util.Iterator;
/**
* An example that simply uses its id, value, and edges to compute new data
@@ -142,7 +142,7 @@ public class VerifyMessage {
}
@Override
- public void compute(Iterator<VerifiableMessage> msgIterator) {
+ public void compute(Iterable<VerifiableMessage> messages) {
LongSumAggregator sumAggregator = (LongSumAggregator)
getAggregator(LongSumAggregator.class.getName());
if (getSuperstep() > SUPERSTEPS) {
@@ -152,61 +152,57 @@ public class VerifyMessage {
if (LOG.isDebugEnabled()) {
LOG.debug("compute: " + sumAggregator);
}
- sumAggregator.aggregate(getVertexId().get());
+ sumAggregator.aggregate(getId().get());
if (LOG.isDebugEnabled()) {
LOG.debug("compute: sum = " +
sumAggregator.getAggregatedValue().get() +
- " for vertex " + getVertexId());
+ " for vertex " + getId());
}
float msgValue = 0.0f;
- while (msgIterator.hasNext()) {
- VerifiableMessage msg = msgIterator.next();
- msgValue += msg.value;
+ for (VerifiableMessage message : messages) {
+ msgValue += message.value;
if (LOG.isDebugEnabled()) {
- LOG.debug("compute: got msg = " + msg +
- " for vertex id " + getVertexId() +
- ", vertex value " + getVertexValue() +
+ LOG.debug("compute: got msg = " + message +
+ " for vertex id " + getId() +
+ ", vertex value " + getValue() +
" on superstep " + getSuperstep());
}
- if (msg.superstep != getSuperstep() - 1) {
+ if (message.superstep != getSuperstep() - 1) {
throw new IllegalStateException(
"compute: Impossible to not get a messsage from " +
"the previous superstep, current superstep = " +
getSuperstep());
}
- if ((msg.sourceVertexId != getVertexId().get() - 1) &&
- (getVertexId().get() != 0)) {
+ if ((message.sourceVertexId != getId().get() - 1) &&
+ (getId().get() != 0)) {
throw new IllegalStateException(
"compute: Impossible that this message didn't come " +
"from the previous vertex and came from " +
- msg.sourceVertexId);
+ message.sourceVertexId);
}
}
- int vertexValue = getVertexValue().get();
- setVertexValue(new IntWritable(vertexValue + (int) msgValue));
+ int vertexValue = getValue().get();
+ setValue(new IntWritable(vertexValue + (int) msgValue));
if (LOG.isDebugEnabled()) {
- LOG.debug("compute: vertex " + getVertexId() +
- " has value " + getVertexValue() +
+ LOG.debug("compute: vertex " + getId() +
+ " has value " + getValue() +
" on superstep " + getSuperstep());
}
- for (Iterator<LongWritable> edges = getOutEdgesIterator();
- edges.hasNext();) {
- LongWritable targetVertexId = edges.next();
- FloatWritable edgeValue = getEdgeValue(targetVertexId);
+ for (Edge<LongWritable, FloatWritable> edge : getEdges()) {
+ FloatWritable newEdgeValue = new FloatWritable(
+ edge.getValue().get() + (float) vertexValue);
if (LOG.isDebugEnabled()) {
- LOG.debug("compute: vertex " + getVertexId() +
- " sending edgeValue " + edgeValue +
+ LOG.debug("compute: vertex " + getId() +
+ " sending edgeValue " + edge.getValue() +
" vertexValue " + vertexValue +
- " total " +
- (edgeValue.get() + (float) vertexValue) +
- " to vertex " + targetVertexId +
+ " total " + newEdgeValue +
+ " to vertex " + edge.getTargetVertexId() +
" on superstep " + getSuperstep());
}
- edgeValue.set(edgeValue.get() + (float) vertexValue);
- addEdge(targetVertexId, edgeValue);
- sendMsg(targetVertexId,
+ addEdge(edge.getTargetVertexId(), newEdgeValue);
+ sendMessage(edge.getTargetVertexId(),
new VerifiableMessage(
- getSuperstep(), getVertexId().get(), edgeValue.get()));
+ getSuperstep(), getId().get(), newEdgeValue.get()));
}
}
}
Modified: giraph/trunk/src/main/java/org/apache/giraph/examples/VertexWithComponentTextOutputFormat.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/examples/VertexWithComponentTextOutputFormat.java?rev=1365352&r1=1365351&r2=1365352&view=diff
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/examples/VertexWithComponentTextOutputFormat.java (original)
+++ giraph/trunk/src/main/java/org/apache/giraph/examples/VertexWithComponentTextOutputFormat.java Tue Jul 24 23:37:42 2012
@@ -18,7 +18,7 @@
package org.apache.giraph.examples;
-import org.apache.giraph.graph.BasicVertex;
+import org.apache.giraph.graph.Vertex;
import org.apache.giraph.graph.VertexWriter;
import org.apache.giraph.lib.TextVertexOutputFormat;
import org.apache.hadoop.io.IntWritable;
@@ -63,13 +63,13 @@ public class VertexWithComponentTextOutp
}
@Override
- public void writeVertex(BasicVertex<IntWritable, IntWritable,
- NullWritable, ?> vertex) throws IOException,
+ public void writeVertex(Vertex<IntWritable, IntWritable,
+ NullWritable, ?> vertex) throws IOException,
InterruptedException {
StringBuilder output = new StringBuilder();
- output.append(vertex.getVertexId().get());
+ output.append(vertex.getId().get());
output.append('\t');
- output.append(vertex.getVertexValue().get());
+ output.append(vertex.getValue().get());
getRecordWriter().write(new Text(output.toString()), null);
}
}
Modified: giraph/trunk/src/main/java/org/apache/giraph/graph/BasicVertexResolver.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/graph/BasicVertexResolver.java?rev=1365352&r1=1365351&r2=1365352&view=diff
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/graph/BasicVertexResolver.java (original)
+++ giraph/trunk/src/main/java/org/apache/giraph/graph/BasicVertexResolver.java Tue Jul 24 23:37:42 2012
@@ -39,7 +39,7 @@ public interface BasicVertexResolver<I e
* excluding the normal case (a vertex already exists and has zero or more
* messages sent it to).
*
- * @param vertexId Vertex id (can be used for {@link BasicVertex}'s
+ * @param vertexId Vertex id (can be used for {@link Vertex}'s
* initialize())
* @param vertex Original vertex or null if none
* @param vertexChanges Changes that happened to this vertex or null if none
@@ -47,8 +47,8 @@ public interface BasicVertexResolver<I e
* @return Vertex to be returned, if null, and a vertex currently exists
* it will be removed
*/
- BasicVertex<I, V, E, M> resolve(I vertexId,
- BasicVertex<I, V, E, M> vertex,
+ Vertex<I, V, E, M> resolve(I vertexId,
+ Vertex<I, V, E, M> vertex,
VertexChanges<I, V, E, M> vertexChanges,
Iterable<M> messages);
@@ -57,5 +57,5 @@ public interface BasicVertexResolver<I e
*
* @return Newly instantiated vertex.
*/
- BasicVertex<I, V, E, M> instantiateVertex();
+ Vertex<I, V, E, M> instantiateVertex();
}
Modified: giraph/trunk/src/main/java/org/apache/giraph/graph/BspServiceMaster.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/graph/BspServiceMaster.java?rev=1365352&r1=1365351&r2=1365352&view=diff
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/graph/BspServiceMaster.java (original)
+++ giraph/trunk/src/main/java/org/apache/giraph/graph/BspServiceMaster.java Tue Jul 24 23:37:42 2012
@@ -18,12 +18,16 @@
package org.apache.giraph.graph;
-import net.iharder.Base64;
import org.apache.giraph.bsp.ApplicationState;
import org.apache.giraph.bsp.BspInputFormat;
import org.apache.giraph.bsp.CentralizedServiceMaster;
import org.apache.giraph.bsp.SuperstepState;
import org.apache.giraph.graph.GraphMapper.MapFunctions;
+import org.apache.giraph.graph.partition.MasterGraphPartitioner;
+import org.apache.giraph.graph.partition.PartitionOwner;
+import org.apache.giraph.graph.partition.PartitionStats;
+import org.apache.giraph.graph.partition.PartitionUtils;
+import org.apache.giraph.utils.WritableUtils;
import org.apache.giraph.zk.BspEvent;
import org.apache.giraph.zk.PredicateLock;
import org.apache.hadoop.fs.FSDataOutputStream;
@@ -48,6 +52,8 @@ import org.json.JSONArray;
import org.json.JSONException;
import org.json.JSONObject;
+import net.iharder.Base64;
+
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
@@ -65,12 +71,6 @@ import java.util.Map;
import java.util.Set;
import java.util.TreeSet;
-import org.apache.giraph.graph.partition.MasterGraphPartitioner;
-import org.apache.giraph.graph.partition.PartitionOwner;
-import org.apache.giraph.graph.partition.PartitionStats;
-import org.apache.giraph.graph.partition.PartitionUtils;
-import org.apache.giraph.utils.WritableUtils;
-
/**
* ZooKeeper-based implementation of {@link CentralizedService}.
*
@@ -1556,8 +1556,8 @@ public class BspServiceMaster<I extends
// The master.compute() should run logically before the workers, so
// increase the superstep counter it uses by one
graphState.setSuperstep(superstep + 1);
- graphState.setNumVertices(vertexCounter.getValue());
- graphState.setNumEdges(edgeCounter.getValue());
+ graphState.setTotalNumVertices(vertexCounter.getValue());
+ graphState.setTotalNumEdges(edgeCounter.getValue());
graphState.setContext(getContext());
graphState.setGraphMapper(getGraphMapper());
masterCompute.setGraphState(graphState);
Modified: giraph/trunk/src/main/java/org/apache/giraph/graph/BspServiceWorker.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/graph/BspServiceWorker.java?rev=1365352&r1=1365351&r2=1365352&view=diff
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/graph/BspServiceWorker.java (original)
+++ giraph/trunk/src/main/java/org/apache/giraph/graph/BspServiceWorker.java Tue Jul 24 23:37:42 2012
@@ -18,14 +18,12 @@
package org.apache.giraph.graph;
-import net.iharder.Base64;
-
import org.apache.giraph.bsp.ApplicationState;
import org.apache.giraph.bsp.CentralizedServiceWorker;
import org.apache.giraph.comm.NettyWorkerClientServer;
import org.apache.giraph.comm.RPCCommunications;
-import org.apache.giraph.comm.WorkerServer;
import org.apache.giraph.comm.WorkerClientServer;
+import org.apache.giraph.comm.WorkerServer;
import org.apache.giraph.graph.partition.Partition;
import org.apache.giraph.graph.partition.PartitionExchange;
import org.apache.giraph.graph.partition.PartitionOwner;
@@ -54,6 +52,8 @@ import org.json.JSONArray;
import org.json.JSONException;
import org.json.JSONObject;
+import net.iharder.Base64;
+
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
@@ -319,7 +319,7 @@ public class BspServiceWorker<I extends
for (Entry<PartitionOwner, Partition<I, V, E, M>> entry :
inputSplitCache.entrySet()) {
if (!entry.getValue().getVertices().isEmpty()) {
- commService.sendPartitionReq(entry.getKey().getWorkerInfo(),
+ commService.sendPartitionRequest(entry.getKey().getWorkerInfo(),
entry.getValue());
entry.getValue().getVertices().clear();
}
@@ -447,20 +447,20 @@ public class BspServiceWorker<I extends
long vertexCount = 0;
long edgeCount = 0;
while (vertexReader.nextVertex()) {
- BasicVertex<I, V, E, M> readerVertex =
+ Vertex<I, V, E, M> readerVertex =
vertexReader.getCurrentVertex();
- if (readerVertex.getVertexId() == null) {
+ if (readerVertex.getId() == null) {
throw new IllegalArgumentException(
"loadVertices: Vertex reader returned a vertex " +
"without an id! - " + readerVertex);
}
- if (readerVertex.getVertexValue() == null) {
- readerVertex.setVertexValue(
+ if (readerVertex.getValue() == null) {
+ readerVertex.setValue(
BspUtils.<V>createVertexValue(getConfiguration()));
}
PartitionOwner partitionOwner =
workerGraphPartitioner.getPartitionOwner(
- readerVertex.getVertexId());
+ readerVertex.getId());
Partition<I, V, E, M> partition =
inputSplitCache.get(partitionOwner);
if (partition == null) {
@@ -469,23 +469,23 @@ public class BspServiceWorker<I extends
partitionOwner.getPartitionId());
inputSplitCache.put(partitionOwner, partition);
}
- BasicVertex<I, V, E, M> oldVertex =
+ Vertex<I, V, E, M> oldVertex =
partition.putVertex(readerVertex);
if (oldVertex != null) {
LOG.warn("readVertices: Replacing vertex " + oldVertex +
" with " + readerVertex);
}
if (partition.getVertices().size() >= maxVerticesPerPartition) {
- commService.sendPartitionReq(partitionOwner.getWorkerInfo(),
+ commService.sendPartitionRequest(partitionOwner.getWorkerInfo(),
partition);
partition.getVertices().clear();
}
++vertexCount;
- edgeCount += readerVertex.getNumOutEdges();
+ edgeCount += readerVertex.getNumEdges();
getContext().progress();
++totalVerticesLoaded;
- totalEdgesLoaded += readerVertex.getNumOutEdges();
+ totalEdgesLoaded += readerVertex.getNumEdges();
// Update status every half a million vertices
if ((totalVerticesLoaded % 500000) == 0) {
String status = "readVerticesFromInputSplit: Loaded " +
@@ -519,9 +519,9 @@ public class BspServiceWorker<I extends
}
@Override
- public void assignMessagesToVertex(BasicVertex<I, V, E, M> vertex,
- Iterable<M> messageIterator) {
- vertex.putMessages(messageIterator);
+ public void assignMessagesToVertex(Vertex<I, V, E, M> vertex,
+ Iterable<M> messages) {
+ vertex.putMessages(messages);
}
@Override
@@ -674,7 +674,7 @@ public class BspServiceWorker<I extends
new ArrayList<PartitionStats>();
for (Partition<I, V, E, M> partition : getPartitionMap().values()) {
PartitionStats partitionStats =
- new PartitionStats(partition.getPartitionId(),
+ new PartitionStats(partition.getId(),
partition.getVertices().size(),
0,
partition.getEdgeCount());
@@ -1056,8 +1056,8 @@ public class BspServiceWorker<I extends
" - Attempt=" + getApplicationAttempt() +
", Superstep=" + getSuperstep());
getGraphMapper().getGraphState().
- setNumEdges(globalStats.getEdgeCount()).
- setNumVertices(globalStats.getVertexCount());
+ setTotalNumEdges(globalStats.getEdgeCount()).
+ setTotalNumVertices(globalStats.getVertexCount());
return globalStats.getHaltComputation();
}
@@ -1080,7 +1080,7 @@ public class BspServiceWorker<I extends
vertexOutputFormat.createVertexWriter(getContext());
vertexWriter.initialize(getContext());
for (Partition<I, V, E, M> partition : workerPartitionMap.values()) {
- for (BasicVertex<I, V, E, M> vertex : partition.getVertices()) {
+ for (Vertex<I, V, E, M> vertex : partition.getVertices()) {
vertexWriter.writeVertex(vertex);
}
}
@@ -1185,7 +1185,7 @@ public class BspServiceWorker<I extends
// <index 0 start pos><partition id>
// <index 1 start pos><partition id>
metadataOutput.writeLong(startPos);
- metadataOutput.writeInt(partition.getPartitionId());
+ metadataOutput.writeInt(partition.getId());
if (LOG.isDebugEnabled()) {
LOG.debug("storeCheckpoint: Vertex file starting " +
"offset = " + startPos + ", length = " +
@@ -1314,8 +1314,8 @@ public class BspServiceWorker<I extends
partitionId);
}
getGraphMapper().getGraphState().getWorkerCommunications().
- sendPartitionReq(workerPartitionList.getKey(),
- partition);
+ sendPartitionRequest(workerPartitionList.getKey(),
+ partition);
getPartitionMap().remove(partitionId);
}
}
@@ -1426,10 +1426,10 @@ public class BspServiceWorker<I extends
*/
private void movePartitionsToWorker(
WorkerServer<I, V, E, M> commService) {
- Map<Integer, Collection<BasicVertex<I, V, E, M>>> inPartitionVertexMap =
+ Map<Integer, Collection<Vertex<I, V, E, M>>> inPartitionVertexMap =
commService.getInPartitionVertexMap();
synchronized (inPartitionVertexMap) {
- for (Entry<Integer, Collection<BasicVertex<I, V, E, M>>> entry :
+ for (Entry<Integer, Collection<Vertex<I, V, E, M>>> entry :
inPartitionVertexMap.entrySet()) {
if (getPartitionMap().containsKey(entry.getKey())) {
throw new IllegalStateException(
@@ -1443,7 +1443,7 @@ public class BspServiceWorker<I extends
new Partition<I, V, E, M>(getConfiguration(),
entry.getKey());
synchronized (entry.getValue()) {
- for (BasicVertex<I, V, E, M> vertex : entry.getValue()) {
+ for (Vertex<I, V, E, M> vertex : entry.getValue()) {
if (tmpPartition.putVertex(vertex) != null) {
throw new IllegalStateException(
"moveVerticesToWorker: Vertex " + vertex +
@@ -1455,7 +1455,7 @@ public class BspServiceWorker<I extends
entry.getValue().size() +
" vertices for partition id " + entry.getKey());
}
- getPartitionMap().put(tmpPartition.getPartitionId(),
+ getPartitionMap().put(tmpPartition.getId(),
tmpPartition);
entry.getValue().clear();
}
@@ -1527,27 +1527,27 @@ public class BspServiceWorker<I extends
}
@Override
- public PartitionOwner getVertexPartitionOwner(I vertexIndex) {
- return workerGraphPartitioner.getPartitionOwner(vertexIndex);
+ public PartitionOwner getVertexPartitionOwner(I vertexId) {
+ return workerGraphPartitioner.getPartitionOwner(vertexId);
}
/**
* Get the partition for a vertex index.
*
- * @param vertexIndex Vertex index to search for the partition.
+ * @param vertexId Vertex id to search for the partition.
* @return Partition that owns this vertex.
*/
- public Partition<I, V, E, M> getPartition(I vertexIndex) {
- PartitionOwner partitionOwner = getVertexPartitionOwner(vertexIndex);
+ public Partition<I, V, E, M> getPartition(I vertexId) {
+ PartitionOwner partitionOwner = getVertexPartitionOwner(vertexId);
return workerPartitionMap.get(partitionOwner.getPartitionId());
}
@Override
- public BasicVertex<I, V, E, M> getVertex(I vertexIndex) {
- PartitionOwner partitionOwner = getVertexPartitionOwner(vertexIndex);
+ public Vertex<I, V, E, M> getVertex(I vertexId) {
+ PartitionOwner partitionOwner = getVertexPartitionOwner(vertexId);
if (workerPartitionMap.containsKey(partitionOwner.getPartitionId())) {
return workerPartitionMap.get(
- partitionOwner.getPartitionId()).getVertex(vertexIndex);
+ partitionOwner.getPartitionId()).getVertex(vertexId);
} else {
return null;
}
Modified: giraph/trunk/src/main/java/org/apache/giraph/graph/BspUtils.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/graph/BspUtils.java?rev=1365352&r1=1365351&r2=1365352&view=diff
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/graph/BspUtils.java (original)
+++ giraph/trunk/src/main/java/org/apache/giraph/graph/BspUtils.java Tue Jul 24 23:37:42 2012
@@ -356,7 +356,7 @@ public class BspUtils {
}
/**
- * Get the user's subclassed {@link BasicVertex}
+ * Get the user's subclassed {@link Vertex}
*
* @param <I> Vertex id
* @param <V> Vertex data
@@ -368,11 +368,11 @@ public class BspUtils {
@SuppressWarnings({ "rawtypes", "unchecked" })
public static <I extends WritableComparable, V extends Writable,
E extends Writable, M extends Writable>
- Class<? extends BasicVertex<I, V, E, M>> getVertexClass(Configuration conf) {
- return (Class<? extends BasicVertex<I, V, E, M>>)
+ Class<? extends Vertex<I, V, E, M>> getVertexClass(Configuration conf) {
+ return (Class<? extends Vertex<I, V, E, M>>)
conf.getClass(GiraphJob.VERTEX_CLASS,
null,
- BasicVertex.class);
+ Vertex.class);
}
/**
@@ -387,10 +387,10 @@ public class BspUtils {
*/
@SuppressWarnings("rawtypes")
public static <I extends WritableComparable, V extends Writable,
- E extends Writable, M extends Writable> BasicVertex<I, V, E, M>
+ E extends Writable, M extends Writable> Vertex<I, V, E, M>
createVertex(Configuration conf) {
- Class<? extends BasicVertex<I, V, E, M>> vertexClass = getVertexClass(conf);
- BasicVertex<I, V, E, M> vertex =
+ Class<? extends Vertex<I, V, E, M>> vertexClass = getVertexClass(conf);
+ Vertex<I, V, E, M> vertex =
ReflectionUtils.newInstance(vertexClass, conf);
return vertex;
}
@@ -404,8 +404,8 @@ public class BspUtils {
*/
@SuppressWarnings("unchecked")
public static <I extends Writable> Class<I>
- getVertexIndexClass(Configuration conf) {
- return (Class<I>) conf.getClass(GiraphJob.VERTEX_INDEX_CLASS,
+ getVertexIdClass(Configuration conf) {
+ return (Class<I>) conf.getClass(GiraphJob.VERTEX_ID_CLASS,
WritableComparable.class);
}
@@ -418,16 +418,16 @@ public class BspUtils {
*/
@SuppressWarnings("rawtypes")
public static <I extends WritableComparable>
- I createVertexIndex(Configuration conf) {
- Class<I> vertexClass = getVertexIndexClass(conf);
+ I createVertexId(Configuration conf) {
+ Class<I> vertexIdClass = getVertexIdClass(conf);
try {
- return vertexClass.newInstance();
+ return vertexIdClass.newInstance();
} catch (InstantiationException e) {
throw new IllegalArgumentException(
- "createVertexIndex: Failed to instantiate", e);
+ "createVertexId: Failed to instantiate", e);
} catch (IllegalAccessException e) {
throw new IllegalArgumentException(
- "createVertexIndex: Illegally accessed", e);
+ "createVertexId: Illegally accessed", e);
}
}
Modified: giraph/trunk/src/main/java/org/apache/giraph/graph/Edge.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/graph/Edge.java?rev=1365352&r1=1365351&r2=1365352&view=diff
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/graph/Edge.java (original)
+++ giraph/trunk/src/main/java/org/apache/giraph/graph/Edge.java Tue Jul 24 23:37:42 2012
@@ -18,17 +18,11 @@
package org.apache.giraph.graph;
-import org.apache.hadoop.conf.Configurable;
-import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-
/**
- * A complete edge, the destination vertex and the edge value. Can only be one
+ * A complete edge, the target vertex and the edge value. Can only be one
* edge with a destination vertex id per edge map.
*
* @param <I> Vertex index
@@ -36,13 +30,11 @@ import java.io.IOException;
*/
@SuppressWarnings("rawtypes")
public class Edge<I extends WritableComparable, E extends Writable>
- implements WritableComparable<Edge<I, E>>, Configurable {
- /** Destination vertex id */
- private I destVertexId = null;
+ implements Comparable<Edge<I, E>> {
+ /** Target vertex id */
+ private I targetVertexId = null;
/** Edge value */
- private E edgeValue = null;
- /** Configuration - Used to instantiate classes */
- private Configuration conf = null;
+ private E value = null;
/**
* Constructor for reflection
@@ -52,21 +44,21 @@ public class Edge<I extends WritableComp
/**
* Create the edge with final values
*
- * @param destVertexId Desination vertex id.
- * @param edgeValue Value of the edge.
+ * @param targetVertexId Destination vertex id.
+ * @param value Value of the edge.
*/
- public Edge(I destVertexId, E edgeValue) {
- this.destVertexId = destVertexId;
- this.edgeValue = edgeValue;
+ public Edge(I targetVertexId, E value) {
+ this.targetVertexId = targetVertexId;
+ this.value = value;
}
/**
- * Get the destination vertex index of this edge
+ * Get the target vertex index of this edge
*
- * @return Destination vertex index of this edge
+ * @return Target vertex index of this edge
*/
- public I getDestVertexId() {
- return destVertexId;
+ public I getTargetVertexId() {
+ return targetVertexId;
}
/**
@@ -74,70 +66,38 @@ public class Edge<I extends WritableComp
*
* @return Edge value of this edge
*/
- public E getEdgeValue() {
- return edgeValue;
+ public E getValue() {
+ return value;
}
/**
* Set the destination vertex index of this edge.
*
- * @param destVertexId new destination vertex
+ * @param targetVertexId new destination vertex
*/
- public void setDestVertexId(I destVertexId) {
- this.destVertexId = destVertexId;
+ public void setTargetVertexId(I targetVertexId) {
+ this.targetVertexId = targetVertexId;
}
/**
* Set the value for this edge.
*
- * @param edgeValue new edge value
+ * @param value new edge value
*/
- public void setEdgeValue(E edgeValue) {
- this.edgeValue = edgeValue;
+ public void setValue(E value) {
+ this.value = value;
}
@Override
public String toString() {
- return "(DestVertexIndex = " + destVertexId +
- ", edgeValue = " + edgeValue + ")";
- }
-
- @Override
- public void readFields(DataInput input) throws IOException {
- destVertexId = BspUtils.<I>createVertexIndex(getConf());
- destVertexId.readFields(input);
- edgeValue = BspUtils.<E>createEdgeValue(getConf());
- edgeValue.readFields(input);
- }
-
- @Override
- public void write(DataOutput output) throws IOException {
- if (destVertexId == null) {
- throw new IllegalStateException(
- "write: Null destination vertex index");
- }
- if (edgeValue == null) {
- throw new IllegalStateException(
- "write: Null edge value");
- }
- destVertexId.write(output);
- edgeValue.write(output);
- }
-
- @Override
- public Configuration getConf() {
- return conf;
- }
-
- @Override
- public void setConf(Configuration conf) {
- this.conf = conf;
+ return "(TargetVertexId = " + targetVertexId + ", " +
+ "value = " + value + ")";
}
@SuppressWarnings("unchecked")
@Override
public int compareTo(Edge<I, E> edge) {
- return destVertexId.compareTo(edge.getDestVertexId());
+ return targetVertexId.compareTo(edge.getTargetVertexId());
}
@Override
@@ -151,12 +111,11 @@ public class Edge<I extends WritableComp
Edge edge = (Edge) o;
- if (destVertexId != null ? !destVertexId.equals(edge.destVertexId) :
- edge.destVertexId != null) {
+ if (targetVertexId != null ? !targetVertexId.equals(edge.targetVertexId) :
+ edge.targetVertexId != null) {
return false;
}
- if (edgeValue != null ?
- !edgeValue.equals(edge.edgeValue) : edge.edgeValue != null) {
+ if (value != null ? !value.equals(edge.value) : edge.value != null) {
return false;
}
@@ -165,8 +124,8 @@ public class Edge<I extends WritableComp
@Override
public int hashCode() {
- int result = destVertexId != null ? destVertexId.hashCode() : 0;
- result = 31 * result + (edgeValue != null ? edgeValue.hashCode() : 0);
+ int result = targetVertexId != null ? targetVertexId.hashCode() : 0;
+ result = 31 * result + (value != null ? value.hashCode() : 0);
return result;
}
}
Modified: giraph/trunk/src/main/java/org/apache/giraph/graph/EdgeListVertex.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/graph/EdgeListVertex.java?rev=1365352&r1=1365351&r2=1365352&view=diff
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/graph/EdgeListVertex.java (original)
+++ giraph/trunk/src/main/java/org/apache/giraph/graph/EdgeListVertex.java Tue Jul 24 23:37:42 2012
@@ -18,7 +18,6 @@
package org.apache.giraph.graph;
-import org.apache.giraph.utils.ComparisonUtils;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.log4j.Logger;
@@ -29,9 +28,6 @@ import com.google.common.collect.Lists;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.Comparator;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
@@ -39,7 +35,7 @@ import java.util.Map;
/**
* User applications can subclass {@link EdgeListVertex}, which stores
* the outbound edges in an ArrayList (less memory as the cost of expensive
- * sorting and random-access lookup). Good for static graphs.
+ * random-access lookup). Good for static graphs.
*
* @param <I> Vertex index value
* @param <V> Vertex value
@@ -52,268 +48,134 @@ public abstract class EdgeListVertex<I e
extends MutableVertex<I, V, E, M> {
/** Class logger */
private static final Logger LOG = Logger.getLogger(EdgeListVertex.class);
- /** Vertex id */
- private I vertexId = null;
- /** Vertex value */
- private V vertexValue = null;
- /** List of the dest edge indices */
- private List<I> destEdgeIndexList;
- /** List of the dest edge values */
- private List<E> destEdgeValueList;
+ /** List of edges */
+ private List<Edge<I, E>> edgeList = Lists.newArrayList();
/** List of incoming messages from the previous superstep */
- private List<M> msgList;
+ private List<M> messageList = Lists.newArrayList();
@Override
- public void initialize(I vertexId, V vertexValue,
- Map<I, E> edges,
- Iterable<M> messages) {
- if (vertexId != null) {
- setVertexId(vertexId);
- }
- if (vertexValue != null) {
- setVertexValue(vertexValue);
- }
- if (edges != null && !edges.isEmpty()) {
- destEdgeIndexList = Lists.newArrayListWithCapacity(edges.size());
- destEdgeValueList = Lists.newArrayListWithCapacity(edges.size());
- List<I> sortedIndexList = new ArrayList<I>(edges.keySet());
- Collections.sort(sortedIndexList, new VertexIdComparator());
- for (I index : sortedIndexList) {
- destEdgeIndexList.add(index);
- destEdgeValueList.add(edges.get(index));
+ public void initialize(I id, V value, Map<I, E> edges, Iterable<M> messages) {
+ super.initialize(id, value);
+ if (edges != null) {
+ for (Map.Entry<I, E> edge : edges.entrySet()) {
+ edgeList.add(new Edge<I, E>(edge.getKey(), edge.getValue()));
}
- sortedIndexList.clear();
- } else {
- destEdgeIndexList = Lists.newArrayListWithCapacity(0);
- destEdgeValueList = Lists.newArrayListWithCapacity(0);
}
if (messages != null) {
- msgList = Lists.newArrayListWithCapacity(Iterables.size(messages));
- Iterables.<M>addAll(msgList, messages);
- } else {
- msgList = Lists.newArrayListWithCapacity(0);
+ Iterables.<M>addAll(messageList, messages);
}
}
@Override
- public int hashCode() {
- return vertexId.hashCode() * 37 + vertexValue.hashCode();
+ public Iterable<Edge<I, E>> getEdges() {
+ return edgeList;
}
@Override
- public boolean equals(Object other) {
- if (other instanceof EdgeListVertex) {
- @SuppressWarnings("unchecked")
- EdgeListVertex<I, V, E, M> otherVertex = (EdgeListVertex) other;
- if (!getVertexId().equals(otherVertex.getVertexId())) {
+ public final boolean addEdge(I targetVertexId, E value) {
+ for (Edge<I, E> edge : getEdges()) {
+ if (edge.getTargetVertexId().equals(targetVertexId)) {
+ LOG.warn("addEdge: Vertex=" + getId() +
+ ": already added an edge value for target vertex id " +
+ targetVertexId);
return false;
}
- if (!getVertexValue().equals(otherVertex.getVertexValue())) {
- return false;
- }
- if (!ComparisonUtils.equal(getMessages(),
- otherVertex.getMessages())) {
- return false;
- }
- return ComparisonUtils.equal(getOutEdgesIterator(),
- otherVertex.getOutEdgesIterator());
- }
- return false;
- }
-
- /**
- * Comparator for the vertex id
- */
- private class VertexIdComparator implements Comparator<I> {
- @SuppressWarnings("unchecked")
- @Override
- public int compare(I index1, I index2) {
- return index1.compareTo(index2);
}
+ edgeList.add(new Edge<I, E>(targetVertexId, value));
+ return true;
}
@Override
- public final boolean addEdge(I targetVertexId, E edgeValue) {
- int pos = Collections.binarySearch(destEdgeIndexList,
- targetVertexId,
- new VertexIdComparator());
- if (pos < 0) {
- destEdgeIndexList.add(-1 * (pos + 1), targetVertexId);
- destEdgeValueList.add(-1 * (pos + 1), edgeValue);
- return true;
- } else {
- LOG.warn("addEdge: Vertex=" + vertexId +
- ": already added an edge value for dest vertex id " +
- targetVertexId);
- return false;
- }
- }
-
- @Override
- public long getSuperstep() {
- return getGraphState().getSuperstep();
- }
-
- @Override
- public final void setVertexId(I vertexId) {
- this.vertexId = vertexId;
- }
-
- @Override
- public final I getVertexId() {
- return vertexId;
- }
-
- @Override
- public final V getVertexValue() {
- return vertexValue;
- }
-
- @Override
- public final void setVertexValue(V vertexValue) {
- this.vertexValue = vertexValue;
- }
-
- @Override
- public E getEdgeValue(I targetVertexId) {
- int pos = Collections.binarySearch(destEdgeIndexList,
- targetVertexId,
- new VertexIdComparator());
- if (pos < 0) {
- return null;
- } else {
- return destEdgeValueList.get(pos);
- }
+ public int getNumEdges() {
+ return edgeList.size();
}
@Override
- public boolean hasEdge(I targetVertexId) {
- int pos = Collections.binarySearch(destEdgeIndexList,
- targetVertexId,
- new VertexIdComparator());
- if (pos < 0) {
- return false;
+ public E removeEdge(I targetVertexId) {
+ for (Iterator<Edge<I, E>> edges = edgeList.iterator(); edges.hasNext();) {
+ Edge<I, E> edge = edges.next();
+ if (edge.getTargetVertexId().equals(targetVertexId)) {
+ E edgeValue = edge.getValue();
+ edges.remove();
+ return edgeValue;
+ }
}
- return true;
- }
-
- /**
- * Get an iterator to the edges on this vertex.
- *
- * @return A <em>sorted</em> iterator, as defined by the sort-order
- * of the vertex ids
- */
- @Override
- public Iterator<I> getOutEdgesIterator() {
- return destEdgeIndexList.iterator();
+ return null;
}
@Override
- public int getNumOutEdges() {
- return destEdgeIndexList.size();
+ void putMessages(Iterable<M> messages) {
+ messageList.clear();
+ Iterables.addAll(messageList, messages);
}
@Override
- public E removeEdge(I targetVertexId) {
- int pos = Collections.binarySearch(destEdgeIndexList,
- targetVertexId,
- new VertexIdComparator());
- if (pos < 0) {
- return null;
- } else {
- destEdgeIndexList.remove(pos);
- return destEdgeValueList.remove(pos);
- }
+ public Iterable<M> getMessages() {
+ return Iterables.unmodifiableIterable(messageList);
}
@Override
- public final void sendMsgToAllEdges(M msg) {
- if (msg == null) {
- throw new IllegalArgumentException(
- "sendMsgToAllEdges: Cannot send null message to all edges");
- }
- for (I index : destEdgeIndexList) {
- sendMsg(index, msg);
- }
+ public int getNumMessages() {
+ return messageList.size();
}
@Override
public final void readFields(DataInput in) throws IOException {
- vertexId = BspUtils.<I>createVertexIndex(getConf());
+ I vertexId = BspUtils.<I>createVertexId(getConf());
vertexId.readFields(in);
- boolean hasVertexValue = in.readBoolean();
- if (hasVertexValue) {
- vertexValue = BspUtils.<V>createVertexValue(getConf());
- vertexValue.readFields(in);
- }
- int edgeListCount = in.readInt();
- destEdgeIndexList = Lists.newArrayListWithCapacity(edgeListCount);
- destEdgeValueList = Lists.newArrayListWithCapacity(edgeListCount);
- for (int i = 0; i < edgeListCount; ++i) {
- I destVertexId = BspUtils.<I>createVertexIndex(getConf());
+ V vertexValue = BspUtils.<V>createVertexValue(getConf());
+ vertexValue.readFields(in);
+ super.initialize(vertexId, vertexValue);
+
+ int numEdges = in.readInt();
+ edgeList = Lists.newArrayListWithCapacity(numEdges);
+ for (int i = 0; i < numEdges; ++i) {
+ I targetVertexId = BspUtils.<I>createVertexId(getConf());
+ targetVertexId.readFields(in);
E edgeValue = BspUtils.<E>createEdgeValue(getConf());
- destVertexId.readFields(in);
edgeValue.readFields(in);
- destEdgeIndexList.add(destVertexId);
- destEdgeValueList.add(edgeValue);
- }
- int msgListSize = in.readInt();
- msgList = Lists.newArrayListWithCapacity(msgListSize);
- for (int i = 0; i < msgListSize; ++i) {
- M msg = BspUtils.<M>createMessageValue(getConf());
- msg.readFields(in);
- msgList.add(msg);
+ edgeList.add(new Edge<I, E>(targetVertexId, edgeValue));
}
- halt = in.readBoolean();
- }
- @Override
- public final void write(DataOutput out) throws IOException {
- vertexId.write(out);
- out.writeBoolean(vertexValue != null);
- if (vertexValue != null) {
- vertexValue.write(out);
- }
- out.writeInt(destEdgeIndexList.size());
- for (int i = 0; i < destEdgeIndexList.size(); ++i) {
- destEdgeIndexList.get(i).write(out);
- destEdgeValueList.get(i).write(out);
+ int numMessages = in.readInt();
+ messageList = Lists.newArrayListWithCapacity(numMessages);
+ for (int i = 0; i < numMessages; ++i) {
+ M message = BspUtils.<M>createMessageValue(getConf());
+ message.readFields(in);
+ messageList.add(message);
}
- out.writeInt(msgList.size());
- for (M msg : msgList) {
- msg.write(out);
+
+ boolean halt = in.readBoolean();
+ if (halt) {
+ voteToHalt();
+ } else {
+ wakeUp();
}
- out.writeBoolean(halt);
}
@Override
- void putMessages(Iterable<M> messages) {
- msgList.clear();
- for (M message : messages) {
- msgList.add(message);
+ public final void write(DataOutput out) throws IOException {
+ getId().write(out);
+ getValue().write(out);
+
+ out.writeInt(edgeList.size());
+ for (Edge<I, E> edge : edgeList) {
+ edge.getTargetVertexId().write(out);
+ edge.getValue().write(out);
}
- }
- @Override
- public Iterable<M> getMessages() {
- return Iterables.unmodifiableIterable(msgList);
- }
+ out.writeInt(messageList.size());
+ for (M message : messageList) {
+ message.write(out);
+ }
- @Override
- public int getNumMessages() {
- return msgList.size();
+ out.writeBoolean(isHalted());
}
@Override
void releaseResources() {
// Hint to GC to free the messages
- msgList.clear();
- }
-
- @Override
- public String toString() {
- return "Vertex(id=" + getVertexId() + ",value=" + getVertexValue() +
- ",#edges=" + getNumOutEdges() + ")";
+ messageList.clear();
}
}
Modified: giraph/trunk/src/main/java/org/apache/giraph/graph/GiraphJob.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/graph/GiraphJob.java?rev=1365352&r1=1365351&r2=1365352&view=diff
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/graph/GiraphJob.java (original)
+++ giraph/trunk/src/main/java/org/apache/giraph/graph/GiraphJob.java Tue Jul 24 23:37:42 2012
@@ -61,7 +61,7 @@ public class GiraphJob {
"giraph.graphPartitionerFactoryClass";
/** Vertex index class */
- public static final String VERTEX_INDEX_CLASS = "giraph.vertexIndexClass";
+ public static final String VERTEX_ID_CLASS = "giraph.vertexIdClass";
/** Vertex value class */
public static final String VERTEX_VALUE_CLASS = "giraph.vertexValueClass";
/** Edge value class */
@@ -451,7 +451,7 @@ public class GiraphJob {
* @param vertexClass Runs vertex computation
*/
public final void setVertexClass(Class<?> vertexClass) {
- getConfiguration().setClass(VERTEX_CLASS, vertexClass, BasicVertex.class);
+ getConfiguration().setClass(VERTEX_CLASS, vertexClass, Vertex.class);
}
/**