You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@giraph.apache.org by ap...@apache.org on 2013/03/07 06:37:41 UTC
[6/8] GIRAPH-528: Decouple vertex implementation from edge storage
(apresta)
http://git-wip-us.apache.org/repos/asf/giraph/blob/3f5009ae/giraph-core/src/main/java/org/apache/giraph/edge/StrictRandomAccessVertexEdges.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/edge/StrictRandomAccessVertexEdges.java b/giraph-core/src/main/java/org/apache/giraph/edge/StrictRandomAccessVertexEdges.java
new file mode 100644
index 0000000..36381a7
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/edge/StrictRandomAccessVertexEdges.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.edge;
+
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+
+/**
+ * Interface for {@link VertexEdges} implementations that provide efficient
+ * random access to the edges given the target vertex id.
+ * This version is for strict graphs (i.e. assumes no parallel edges).
+ *
+ * @param <I> Vertex id
+ * @param <E> Edge value
+ */
+public interface StrictRandomAccessVertexEdges<I extends WritableComparable,
+ E extends Writable> extends VertexEdges<I, E> {
+ /**
+ * Return the edge value for the given target vertex id (or null if there
+ * is no edge pointing to it).
+ *
+ * @param targetVertexId Target vertex id
+ * @return Edge value
+ */
+ E getEdgeValue(I targetVertexId);
+}
http://git-wip-us.apache.org/repos/asf/giraph/blob/3f5009ae/giraph-core/src/main/java/org/apache/giraph/edge/VertexEdges.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/edge/VertexEdges.java b/giraph-core/src/main/java/org/apache/giraph/edge/VertexEdges.java
new file mode 100644
index 0000000..bb885b7
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/edge/VertexEdges.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.edge;
+
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+
+/**
+ * Interface for data structures that store out-edges for a vertex.
+ *
+ * @param <I> Vertex id
+ * @param <E> Edge value
+ */
+public interface VertexEdges<I extends WritableComparable, E extends Writable>
+ extends Iterable<Edge<I, E>>, Writable {
+ /**
+ * Initialize the data structure and set the edges from an iterable.
+ * This method (or one of the two alternatives) must be called
+ * after instantiation, unless readFields() is called.
+ * Note: whether parallel edges are allowed or not depends on the
+ * implementation.
+ *
+ * @param edges Iterable of edges
+ */
+ void initialize(Iterable<Edge<I, E>> edges);
+
+ /**
+ * Initialize the data structure with the specified initial capacity.
+ * This method (or one of the two alternatives) must be called
+ * after instantiation, unless readFields() is called.
+ *
+ * @param capacity Initial capacity
+ */
+ void initialize(int capacity);
+
+ /**
+ * Initialize the data structure with the default initial capacity.
+ * This method (or one of the two alternatives) must be called
+ * after instantiation, unless readFields() is called.
+ *
+ */
+ void initialize();
+
+ /**
+ * Add an edge.
+ * Note: whether parallel edges are allowed or not depends on the
+ * implementation.
+ *
+ * @param edge Edge to add
+ */
+ void add(Edge<I, E> edge);
+
+ /**
+ * Remove all edges to the given target vertex.
+ * Note: the implementation will vary depending on whether parallel edges
+ * are allowed or not.
+ *
+ * @param targetVertexId Target vertex id
+ */
+ void remove(I targetVertexId);
+
+ /**
+ * Return the number of edges.
+ *
+ * @return Number of edges
+ */
+ int size();
+}
http://git-wip-us.apache.org/repos/asf/giraph/blob/3f5009ae/giraph-core/src/main/java/org/apache/giraph/edge/package-info.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/edge/package-info.java b/giraph-core/src/main/java/org/apache/giraph/edge/package-info.java
new file mode 100644
index 0000000..2281509
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/edge/package-info.java
@@ -0,0 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/**
+ * Package of Vertex implementations.
+ */
+package org.apache.giraph.edge;
http://git-wip-us.apache.org/repos/asf/giraph/blob/3f5009ae/giraph-core/src/main/java/org/apache/giraph/graph/ComputeCallable.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/graph/ComputeCallable.java b/giraph-core/src/main/java/org/apache/giraph/graph/ComputeCallable.java
index c7aff7c..439ee5b 100644
--- a/giraph-core/src/main/java/org/apache/giraph/graph/ComputeCallable.java
+++ b/giraph-core/src/main/java/org/apache/giraph/graph/ComputeCallable.java
@@ -34,7 +34,6 @@ import org.apache.giraph.time.Time;
import org.apache.giraph.time.Times;
import org.apache.giraph.utils.MemoryUtils;
import org.apache.giraph.utils.TimedLogger;
-import org.apache.giraph.vertex.Vertex;
import org.apache.giraph.worker.WorkerContext;
import org.apache.giraph.worker.WorkerThreadAggregatorUsage;
import org.apache.hadoop.io.Writable;
@@ -212,8 +211,7 @@ public class ComputeCallable<I extends WritableComparable, V extends Writable,
// Make sure every vertex has this thread's
// graphState before computing
vertex.setGraphState(graphState);
- Iterable<M> messages =
- messageStore.getVertexMessages(vertex.getId());
+ Iterable<M> messages = messageStore.getVertexMessages(vertex.getId());
if (vertex.isHalted() && !Iterables.isEmpty(messages)) {
vertex.wakeUp();
}
http://git-wip-us.apache.org/repos/asf/giraph/blob/3f5009ae/giraph-core/src/main/java/org/apache/giraph/graph/DefaultEdge.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/graph/DefaultEdge.java b/giraph-core/src/main/java/org/apache/giraph/graph/DefaultEdge.java
deleted file mode 100644
index 039f0d7..0000000
--- a/giraph-core/src/main/java/org/apache/giraph/graph/DefaultEdge.java
+++ /dev/null
@@ -1,107 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.giraph.graph;
-
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.WritableComparable;
-
-import com.google.common.base.Objects;
-
-/**
- * A complete edge, the target vertex and the edge value. Can only be one
- * edge with a destination vertex id per edge map.
- *
- * @param <I> Vertex index
- * @param <E> Edge value
- */
-@SuppressWarnings("rawtypes")
-public class DefaultEdge<I extends WritableComparable, E extends Writable>
- implements MutableEdge<I, E> {
- /** Target vertex id */
- private I targetVertexId = null;
- /** Edge value */
- private E value = null;
-
- /**
- * Constructor for reflection
- */
- public DefaultEdge() { }
-
- /**
- * Create the edge with final values. Don't call, use EdgeFactory instead.
- *
- * @param targetVertexId Desination vertex id.
- * @param value Value of the edge.
- */
- DefaultEdge(I targetVertexId, E value) {
- this.targetVertexId = targetVertexId;
- this.value = value;
- }
-
- @Override
- public I getTargetVertexId() {
- return targetVertexId;
- }
-
- @Override
- public E getValue() {
- return value;
- }
-
- @Override
- public void setTargetVertexId(I targetVertexId) {
- this.targetVertexId = targetVertexId;
- }
-
- @Override
- public void setValue(E value) {
- this.value = value;
- }
-
- @Override
- public String toString() {
- return "(TargetVertexId = " + targetVertexId + ", " +
- "value = " + value + ")";
- }
-
- @SuppressWarnings("unchecked")
- @Override
- public int compareTo(Edge<I, E> edge) {
- return targetVertexId.compareTo(edge.getTargetVertexId());
- }
-
- @Override
- public boolean equals(Object o) {
- if (this == o) {
- return true;
- }
- if (o == null || getClass() != o.getClass()) {
- return false;
- }
-
- DefaultEdge edge = (DefaultEdge) o;
- return Objects.equal(targetVertexId, edge.targetVertexId) &&
- Objects.equal(value, edge.value);
- }
-
- @Override
- public int hashCode() {
- return Objects.hashCode(targetVertexId, value);
- }
-}
http://git-wip-us.apache.org/repos/asf/giraph/blob/3f5009ae/giraph-core/src/main/java/org/apache/giraph/graph/DefaultVertexResolver.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/graph/DefaultVertexResolver.java b/giraph-core/src/main/java/org/apache/giraph/graph/DefaultVertexResolver.java
index c88b2b9..52df38d 100644
--- a/giraph-core/src/main/java/org/apache/giraph/graph/DefaultVertexResolver.java
+++ b/giraph-core/src/main/java/org/apache/giraph/graph/DefaultVertexResolver.java
@@ -20,8 +20,7 @@ package org.apache.giraph.graph;
import org.apache.giraph.conf.ImmutableClassesGiraphConfigurable;
import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
-import org.apache.giraph.vertex.MutableVertex;
-import org.apache.giraph.vertex.Vertex;
+import org.apache.giraph.edge.Edge;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.log4j.Logger;
@@ -87,9 +86,8 @@ public class DefaultVertexResolver<I extends WritableComparable,
return;
}
if (hasEdgeRemovals(vertexChanges)) {
- MutableVertex<I, V, E, M> mv = (MutableVertex<I, V, E, M>) vertex;
for (I removedDestVertex : vertexChanges.getRemovedEdgeList()) {
- mv.removeEdges(removedDestVertex);
+ vertex.removeEdges(removedDestVertex);
}
}
}
@@ -156,9 +154,8 @@ public class DefaultVertexResolver<I extends WritableComparable,
return;
}
if (hasEdgeAdditions(vertexChanges)) {
- MutableVertex<I, V, E, M> mv = (MutableVertex<I, V, E, M>) vertex;
for (Edge<I, E> edge : vertexChanges.getAddedEdgeList()) {
- mv.addEdge(edge);
+ vertex.addEdge(edge);
}
}
}
http://git-wip-us.apache.org/repos/asf/giraph/blob/3f5009ae/giraph-core/src/main/java/org/apache/giraph/graph/Edge.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/graph/Edge.java b/giraph-core/src/main/java/org/apache/giraph/graph/Edge.java
deleted file mode 100644
index 185e3c3..0000000
--- a/giraph-core/src/main/java/org/apache/giraph/graph/Edge.java
+++ /dev/null
@@ -1,46 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.giraph.graph;
-
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.WritableComparable;
-
-/**
- * A complete edge, the target vertex and the edge value. Can only be one
- * edge with a destination vertex id per edge map.
- *
- * @param <I> Vertex index
- * @param <E> Edge value
- */
-public interface Edge<I extends WritableComparable, E extends Writable>
- extends Comparable<Edge<I, E>> {
- /**
- * Get the target vertex index of this edge
- *
- * @return Target vertex index of this edge
- */
- I getTargetVertexId();
-
- /**
- * Get the edge value of the edge
- *
- * @return Edge value of this edge
- */
- E getValue();
-}
http://git-wip-us.apache.org/repos/asf/giraph/blob/3f5009ae/giraph-core/src/main/java/org/apache/giraph/graph/EdgeFactory.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/graph/EdgeFactory.java b/giraph-core/src/main/java/org/apache/giraph/graph/EdgeFactory.java
deleted file mode 100644
index a3e6efb..0000000
--- a/giraph-core/src/main/java/org/apache/giraph/graph/EdgeFactory.java
+++ /dev/null
@@ -1,89 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.giraph.graph;
-
-import org.apache.hadoop.io.NullWritable;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.WritableComparable;
-
-/**
- * Factory for creating Edges
- */
-public class EdgeFactory {
- /** Do not construct */
- private EdgeFactory() { }
-
- /**
- * Create an edge pointing to a given ID with a value
- *
- * @param id target ID
- * @param value edge value
- * @param <I> Vertex ID type
- * @param <E> Edge Value type
- * @return Edge pointing to ID with value
- */
- public static <I extends WritableComparable,
- E extends Writable>
- Edge<I, E> create(I id, E value) {
- return createMutable(id, value);
- }
-
- /**
- * Create an edge pointing to a given ID without a value
- *
- * @param id target ID
- * @param <I> Vertex ID type
- * @return Edge pointing to ID without a value
- */
- public static <I extends WritableComparable>
- Edge<I, NullWritable> create(I id) {
- return createMutable(id);
- }
-
- /**
- * Create a mutable edge pointing to a given ID with a value
- *
- * @param id target ID
- * @param value edge value
- * @param <I> Vertex ID type
- * @param <E> Edge Value type
- * @return Edge pointing to ID with value
- */
- public static <I extends WritableComparable,
- E extends Writable>
- MutableEdge<I, E> createMutable(I id, E value) {
- if (value instanceof NullWritable) {
- return (MutableEdge<I, E>) createMutable(id);
- } else {
- return new DefaultEdge<I, E>(id, value);
- }
- }
-
- /**
- * Create a mutable edge pointing to a given ID with a value
- *
- * @param id target ID
- * @param <I> Vertex ID type
- * @return Edge pointing to ID with value
- */
- public static <I extends WritableComparable>
- MutableEdge<I, NullWritable> createMutable(I id) {
- return new EdgeNoValue<I>(id);
- }
-}
http://git-wip-us.apache.org/repos/asf/giraph/blob/3f5009ae/giraph-core/src/main/java/org/apache/giraph/graph/EdgeNoValue.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/graph/EdgeNoValue.java b/giraph-core/src/main/java/org/apache/giraph/graph/EdgeNoValue.java
deleted file mode 100644
index 4ac6759..0000000
--- a/giraph-core/src/main/java/org/apache/giraph/graph/EdgeNoValue.java
+++ /dev/null
@@ -1,90 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.giraph.graph;
-
-import org.apache.hadoop.io.NullWritable;
-import org.apache.hadoop.io.WritableComparable;
-
-import com.google.common.base.Objects;
-
-/**
- * An edge that has no value.
- *
- * @param <I> Vertex ID
- */
-public class EdgeNoValue<I extends WritableComparable>
- implements MutableEdge<I, NullWritable> {
- /** Target vertex id */
- private I targetVertexId = null;
-
- /** Empty constructor */
- EdgeNoValue() { }
-
- /**
- * Constructor with target vertex ID. Don't call, use EdgeFactory instead.
- *
- * @param targetVertexId vertex ID
- */
- EdgeNoValue(I targetVertexId) {
- this.targetVertexId = targetVertexId;
- }
-
- @Override
- public void setTargetVertexId(I targetVertexId) {
- this.targetVertexId = targetVertexId;
- }
-
- @Override
- public void setValue(NullWritable value) {
- // do nothing
- }
-
- @Override
- public I getTargetVertexId() {
- return targetVertexId;
- }
-
- @Override
- public NullWritable getValue() {
- return NullWritable.get();
- }
-
- @Override
- public boolean equals(Object o) {
- if (this == o) {
- return true;
- }
- if (o == null || getClass() != o.getClass()) {
- return false;
- }
-
- EdgeNoValue edge = (EdgeNoValue) o;
- return Objects.equal(targetVertexId, edge.targetVertexId);
- }
-
- @Override
- public int hashCode() {
- return Objects.hashCode(targetVertexId);
- }
-
- @Override
- public int compareTo(Edge<I, NullWritable> o) {
- return targetVertexId.compareTo(o.getTargetVertexId());
- }
-}
http://git-wip-us.apache.org/repos/asf/giraph/blob/3f5009ae/giraph-core/src/main/java/org/apache/giraph/graph/EdgeStore.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/graph/EdgeStore.java b/giraph-core/src/main/java/org/apache/giraph/graph/EdgeStore.java
deleted file mode 100644
index 6210367..0000000
--- a/giraph-core/src/main/java/org/apache/giraph/graph/EdgeStore.java
+++ /dev/null
@@ -1,176 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.giraph.graph;
-
-import com.google.common.collect.MapMaker;
-import org.apache.giraph.bsp.CentralizedServiceWorker;
-import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
-import org.apache.giraph.partition.Partition;
-import org.apache.giraph.utils.ByteArrayEdges;
-import org.apache.giraph.utils.ByteArrayVertexIdEdges;
-import org.apache.giraph.vertex.Vertex;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.WritableComparable;
-import org.apache.hadoop.util.Progressable;
-import org.apache.log4j.Logger;
-
-import java.util.Map;
-import java.util.concurrent.ConcurrentMap;
-
-/**
- * Collects incoming edges for vertices owned by this worker.
- * Note: the current implementation is simply a bridge between
- * incoming requests and vertices. In the future, EdgeStore may become an
- * interface allowing for alternative, pluggable implementations of edge
- * storage without having to extend Vertex.
- *
- * @param <I> Vertex id
- * @param <V> Vertex value
- * @param <E> Edge value
- * @param <M> Message data
- */
-public class EdgeStore<I extends WritableComparable,
- V extends Writable, E extends Writable, M extends Writable> {
- /** Class logger */
- private static final Logger LOG = Logger.getLogger(EdgeStore.class);
- /** Service worker. */
- private CentralizedServiceWorker<I, V, E, M> service;
- /** Giraph configuration. */
- private ImmutableClassesGiraphConfiguration<I, V, E, M> configuration;
- /** Progressable to report progress. */
- private Progressable progressable;
- /** Map used to temporarily store incoming edges. */
- private ConcurrentMap<Integer,
- ConcurrentMap<I, ByteArrayEdges<I, E>>> transientEdges;
- /**
- * Whether we should reuse edge objects (cached to avoid expensive calls
- * to the configuration).
- */
- private boolean reuseIncomingEdgeObjects;
-
- /**
- * Constructor.
- *
- * @param service Service worker
- * @param configuration Configuration
- * @param progressable Progressable
- */
- public EdgeStore(
- CentralizedServiceWorker<I, V, E, M> service,
- ImmutableClassesGiraphConfiguration<I, V, E, M> configuration,
- Progressable progressable) {
- this.service = service;
- this.configuration = configuration;
- this.progressable = progressable;
- reuseIncomingEdgeObjects = configuration.reuseIncomingEdgeObjects();
- transientEdges = new MapMaker().concurrencyLevel(
- configuration.getNettyServerExecutionConcurrency()).makeMap();
- }
-
- /**
- * Add edges belonging to a given partition on this worker.
- * Note: This method is thread-safe.
- *
- * @param partitionId Partition id for the incoming edges.
- * @param edges Incoming edges
- */
- public void addPartitionEdges(
- int partitionId, ByteArrayVertexIdEdges<I, E> edges) {
- ConcurrentMap<I, ByteArrayEdges<I, E>> partitionEdges =
- transientEdges.get(partitionId);
- if (partitionEdges == null) {
- ConcurrentMap<I, ByteArrayEdges<I, E>> newPartitionEdges =
- new MapMaker().concurrencyLevel(
- configuration.getNettyServerExecutionConcurrency()).makeMap();
- partitionEdges = transientEdges.putIfAbsent(partitionId,
- newPartitionEdges);
- if (partitionEdges == null) {
- partitionEdges = newPartitionEdges;
- }
- }
- ByteArrayVertexIdEdges<I, E>.VertexIdEdgeIterator vertexIdEdgeIterator =
- edges.getVertexIdEdgeIterator();
- while (vertexIdEdgeIterator.hasNext()) {
- vertexIdEdgeIterator.next();
- I vertexId = vertexIdEdgeIterator.getCurrentVertexId();
- Edge<I, E> edge = vertexIdEdgeIterator.getCurrentEdge();
- ByteArrayEdges<I, E> vertexEdges = partitionEdges.get(vertexId);
- if (vertexEdges == null) {
- ByteArrayEdges<I, E> newVertexEdges =
- new ByteArrayEdges<I, E>(configuration);
- vertexEdges = partitionEdges.putIfAbsent(vertexId, newVertexEdges);
- if (vertexEdges == null) {
- vertexEdges = newVertexEdges;
- // Since we had to use the vertex id as a new key in the map,
- // we need to release the object.
- vertexIdEdgeIterator.releaseCurrentVertexId();
- }
- }
- synchronized (vertexEdges) {
- vertexEdges.appendEdge(edge);
- }
- }
- }
-
- /**
- * Move all edges from temporary storage to their source vertices.
- * Note: this method is not thread-safe.
- */
- public void moveEdgesToVertices() {
- if (LOG.isInfoEnabled()) {
- LOG.info("moveEdgesToVertices: Moving incoming edges to vertices.");
- }
- for (Map.Entry<Integer, ConcurrentMap<I,
- ByteArrayEdges<I, E>>> partitionEdges : transientEdges.entrySet()) {
- Partition<I, V, E, M> partition =
- service.getPartitionStore().getPartition(partitionEdges.getKey());
- for (I vertexId : partitionEdges.getValue().keySet()) {
- // Depending on whether the vertex implementation keeps references to
- // the Edge objects or not, we may be able to reuse objects when
- // iterating.
- Iterable<Edge<I, E>> edgesIterable = reuseIncomingEdgeObjects ?
- partitionEdges.getValue().remove(vertexId) :
- partitionEdges.getValue().remove(vertexId).copyEdgeIterable();
- Vertex<I, V, E, M> vertex = partition.getVertex(vertexId);
- // If the source vertex doesn't exist, create it. Otherwise,
- // just set the edges.
- if (vertex == null) {
- vertex = configuration.createVertex();
- vertex.initialize(vertexId, configuration.createVertexValue(),
- edgesIterable);
- partition.putVertex(vertex);
- } else {
- vertex.setEdges(edgesIterable);
- // Some Partition implementations (e.g. ByteArrayPartition) require
- // us to put back the vertex after modifying it.
- partition.saveVertex(vertex);
- }
- progressable.progress();
- }
- // Some PartitionStore implementations (e.g. DiskBackedPartitionStore)
- // require us to put back the partition after modifying it.
- service.getPartitionStore().putPartition(partition);
- }
- if (LOG.isInfoEnabled()) {
- LOG.info("moveEdgesToVertices: Finished moving incoming edges to " +
- "vertices.");
- }
- transientEdges.clear();
- }
-}
http://git-wip-us.apache.org/repos/asf/giraph/blob/3f5009ae/giraph-core/src/main/java/org/apache/giraph/graph/GiraphTransferRegulator.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/graph/GiraphTransferRegulator.java b/giraph-core/src/main/java/org/apache/giraph/graph/GiraphTransferRegulator.java
index 726c21e..3c2286d 100644
--- a/giraph-core/src/main/java/org/apache/giraph/graph/GiraphTransferRegulator.java
+++ b/giraph-core/src/main/java/org/apache/giraph/graph/GiraphTransferRegulator.java
@@ -17,7 +17,6 @@
*/
package org.apache.giraph.graph;
-import org.apache.giraph.vertex.Vertex;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
http://git-wip-us.apache.org/repos/asf/giraph/blob/3f5009ae/giraph-core/src/main/java/org/apache/giraph/graph/GraphTaskManager.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/graph/GraphTaskManager.java b/giraph-core/src/main/java/org/apache/giraph/graph/GraphTaskManager.java
index 20fa5c5..e74c59a 100644
--- a/giraph-core/src/main/java/org/apache/giraph/graph/GraphTaskManager.java
+++ b/giraph-core/src/main/java/org/apache/giraph/graph/GraphTaskManager.java
@@ -18,8 +18,9 @@
package org.apache.giraph.graph;
+import com.google.common.collect.Lists;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.apache.giraph.bsp.BspService;
-import org.apache.giraph.bsp.BspUtils;
import org.apache.giraph.bsp.CentralizedServiceMaster;
import org.apache.giraph.bsp.CentralizedServiceWorker;
import org.apache.giraph.comm.messages.MessageStoreByPartition;
@@ -41,7 +42,6 @@ import org.apache.giraph.time.Time;
import org.apache.giraph.utils.MemoryUtils;
import org.apache.giraph.utils.ProgressableUtils;
import org.apache.giraph.utils.ReflectionUtils;
-import org.apache.giraph.vertex.Vertex;
import org.apache.giraph.worker.BspServiceWorker;
import org.apache.giraph.worker.WorkerAggregatorUsage;
import org.apache.giraph.worker.WorkerContext;
@@ -57,9 +57,6 @@ import org.apache.log4j.Level;
import org.apache.log4j.Logger;
import org.apache.log4j.PatternLayout;
-import com.google.common.collect.Lists;
-import com.google.common.util.concurrent.ThreadFactoryBuilder;
-
import java.io.IOException;
import java.lang.reflect.Type;
import java.net.URL;
@@ -433,8 +430,10 @@ public class GraphTaskManager<I extends WritableComparable, V extends Writable,
* @param conf the Configuration object for this job run.
*/
public void determineClassTypes(Configuration conf) {
+ ImmutableClassesGiraphConfiguration giraphConf =
+ new ImmutableClassesGiraphConfiguration(conf);
Class<? extends Vertex<I, V, E, M>> vertexClass =
- BspUtils.<I, V, E, M>getVertexClass(conf);
+ giraphConf.getVertexClass();
List<Class<?>> classList = ReflectionUtils.<Vertex>getTypeArguments(
Vertex.class, vertexClass);
Type vertexIndexType = classList.get(0);
http://git-wip-us.apache.org/repos/asf/giraph/blob/3f5009ae/giraph-core/src/main/java/org/apache/giraph/graph/MutableEdge.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/graph/MutableEdge.java b/giraph-core/src/main/java/org/apache/giraph/graph/MutableEdge.java
deleted file mode 100644
index 52e4c47..0000000
--- a/giraph-core/src/main/java/org/apache/giraph/graph/MutableEdge.java
+++ /dev/null
@@ -1,47 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.giraph.graph;
-
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.WritableComparable;
-
-/**
- * A complete edge, the target vertex and the edge value. Can only be one
- * edge with a destination vertex id per edge map. This edge can be mutated,
- * that is you can set it's target vertex ID and edge value.
- *
- * @param <I> Vertex index
- * @param <E> Edge value
- */
-public interface MutableEdge<I extends WritableComparable, E extends Writable>
- extends Edge<I, E> {
- /**
- * Set the destination vertex index of this edge.
- *
- * @param targetVertexId new destination vertex
- */
- void setTargetVertexId(I targetVertexId);
-
- /**
- * Set the value for this edge.
- *
- * @param value new edge value
- */
- void setValue(E value);
-}
http://git-wip-us.apache.org/repos/asf/giraph/blob/3f5009ae/giraph-core/src/main/java/org/apache/giraph/graph/ReverseEdgeDuplicator.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/graph/ReverseEdgeDuplicator.java b/giraph-core/src/main/java/org/apache/giraph/graph/ReverseEdgeDuplicator.java
deleted file mode 100644
index 4415cc2..0000000
--- a/giraph-core/src/main/java/org/apache/giraph/graph/ReverseEdgeDuplicator.java
+++ /dev/null
@@ -1,115 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.giraph.graph;
-
-import org.apache.giraph.io.EdgeReader;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.WritableComparable;
-import org.apache.hadoop.mapreduce.InputSplit;
-import org.apache.hadoop.mapreduce.TaskAttemptContext;
-
-import java.io.IOException;
-
-/**
- * An EdgeReader that creates the opposite direction edge for each edge read.
- * Used to create an undirected graph from a directed input.
- * This class is a decorator around any other EdgeReader.
- *
- * @param <I> Vertex ID
- * @param <E> Edge Value
- */
-public class ReverseEdgeDuplicator<I extends WritableComparable,
- E extends Writable> implements EdgeReader<I, E> {
- /** The underlying EdgeReader to wrap */
- private final EdgeReader<I, E> baseReader;
-
- /** Whether the reverse edge stored currently is valid */
- private boolean haveReverseEdge = true;
- /** Reverse of the edge last read */
- private Edge<I, E> reverseEdge;
- /** Reverse source of last edge, in other words last edge's target */
- private I reverseSourceId;
-
- /**
- * Constructor
- * @param baseReader EdgeReader to wrap
- */
- public ReverseEdgeDuplicator(EdgeReader<I, E> baseReader) {
- this.baseReader = baseReader;
- }
-
- /**
- * Get wrapped EdgeReader
- * @return EdgeReader
- */
- public EdgeReader<I, E> getBaseReader() {
- return baseReader;
- }
-
- @Override
- public void initialize(InputSplit inputSplit, TaskAttemptContext context)
- throws IOException, InterruptedException {
- baseReader.initialize(inputSplit, context);
- haveReverseEdge = true;
- }
-
- @Override
- public boolean nextEdge() throws IOException, InterruptedException {
- boolean result = true;
- if (haveReverseEdge) {
- result = baseReader.nextEdge();
- haveReverseEdge = false;
- } else {
- Edge<I, E> currentEdge = baseReader.getCurrentEdge();
- reverseSourceId = currentEdge.getTargetVertexId();
- reverseEdge = EdgeFactory.create(baseReader.getCurrentSourceId(),
- currentEdge.getValue());
- haveReverseEdge = true;
- }
- return result;
- }
-
- @Override
- public I getCurrentSourceId() throws IOException, InterruptedException {
- if (haveReverseEdge) {
- return reverseSourceId;
- } else {
- return baseReader.getCurrentSourceId();
- }
- }
-
- @Override
- public Edge<I, E> getCurrentEdge() throws IOException, InterruptedException {
- if (haveReverseEdge) {
- return reverseEdge;
- } else {
- return baseReader.getCurrentEdge();
- }
- }
-
- @Override
- public void close() throws IOException {
- baseReader.close();
- }
-
- @Override
- public float getProgress() throws IOException, InterruptedException {
- return baseReader.getProgress();
- }
-}
http://git-wip-us.apache.org/repos/asf/giraph/blob/3f5009ae/giraph-core/src/main/java/org/apache/giraph/graph/Vertex.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/graph/Vertex.java b/giraph-core/src/main/java/org/apache/giraph/graph/Vertex.java
new file mode 100644
index 0000000..c8abab2
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/graph/Vertex.java
@@ -0,0 +1,480 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.graph;
+
+import com.google.common.collect.UnmodifiableIterator;
+import org.apache.giraph.conf.ImmutableClassesGiraphConfigurable;
+import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
+import org.apache.giraph.edge.Edge;
+import org.apache.giraph.edge.MultiRandomAccessVertexEdges;
+import org.apache.giraph.edge.StrictRandomAccessVertexEdges;
+import org.apache.giraph.edge.VertexEdges;
+import org.apache.giraph.partition.PartitionContext;
+import org.apache.giraph.worker.WorkerAggregatorUsage;
+import org.apache.giraph.worker.WorkerContext;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapreduce.Mapper;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.Iterator;
+
+/**
+ * Basic abstract class for writing a BSP application for computation.
+ *
+ * @param <I> Vertex id
+ * @param <V> Vertex data
+ * @param <E> Edge data
+ * @param <M> Message data
+ */
+public abstract class Vertex<I extends WritableComparable,
+ V extends Writable, E extends Writable, M extends Writable>
+ implements WorkerAggregatorUsage, Writable,
+ ImmutableClassesGiraphConfigurable<I, V, E, M> {
+ /** Vertex id. */
+ private I id;
+ /** Vertex value. */
+ private V value;
+ /** Outgoing edges. */
+ private VertexEdges<I, E> edges;
+ /** If true, do not do anymore computation on this vertex. */
+ private boolean halt;
+ /** Global graph state **/
+ private GraphState<I, V, E, M> graphState;
+ /** Configuration */
+ private ImmutableClassesGiraphConfiguration<I, V, E, M> conf;
+
+ /**
+ * Initialize id, value, and edges.
+ * This method (or the alternative form initialize(id, value)) must be called
+ * after instantiation, unless readFields() is called.
+ *
+ * @param id Vertex id
+ * @param value Vertex value
+ * @param edges Iterable of edges
+ */
+ public void initialize(I id, V value, Iterable<Edge<I, E>> edges) {
+ this.id = id;
+ this.value = value;
+ setEdges(edges);
+ }
+
+ /**
+ * Initialize id and value. Vertex edges will be empty.
+ * This method (or the alternative form initialize(id, value, edges))
+ * must be called after instantiation, unless readFields() is called.
+ *
+ * @param id Vertex id
+ * @param value Vertex value
+ */
+ public void initialize(I id, V value) {
+ this.id = id;
+ this.value = value;
+ this.edges = conf.createAndInitializeVertexEdges(0);
+ }
+
+ /**
+ * Set the outgoing edges for this vertex.
+ *
+ * @param edges Iterable of edges
+ */
+ public void setEdges(Iterable<Edge<I, E>> edges) {
+ // If the iterable is actually an instance of VertexEdges,
+ // we simply take the reference.
+ // Otherwise, we initialize a new VertexEdges.
+ if (edges instanceof VertexEdges) {
+ this.edges = (VertexEdges<I, E>) edges;
+ } else {
+ this.edges = conf.createAndInitializeVertexEdges(edges);
+ }
+ }
+
+ /**
+ * Must be defined by user to do computation on a single Vertex.
+ *
+ * @param messages Messages that were sent to this vertex in the previous
+ * superstep. Each message is only guaranteed to have
+ * a life expectancy as long as next() is not called.
+ * @throws IOException
+ */
+ public abstract void compute(Iterable<M> messages) throws IOException;
+
+ /**
+ * Retrieves the current superstep.
+ *
+ * @return Current superstep
+ */
+ public long getSuperstep() {
+ return graphState.getSuperstep();
+ }
+
+ /**
+ * Get the vertex id.
+ *
+ * @return My vertex id.
+ */
+ public I getId() {
+ return id;
+ }
+
+ /**
+ * Get the vertex value (data stored with vertex)
+ *
+ * @return Vertex value
+ */
+ public V getValue() {
+ return value;
+ }
+
+ /**
+ * Set the vertex data (immediately visible in the computation)
+ *
+ * @param value Vertex data to be set
+ */
+ public void setValue(V value) {
+ this.value = value;
+ }
+
+ /**
+ * Get the total (all workers) number of vertices that
+ * existed in the previous superstep.
+ *
+ * @return Total number of vertices (-1 if first superstep)
+ */
+ public long getTotalNumVertices() {
+ return graphState.getTotalNumVertices();
+ }
+
+ /**
+ * Get the total (all workers) number of edges that
+ * existed in the previous superstep.
+ *
+ * @return Total number of edges (-1 if first superstep)
+ */
+ public long getTotalNumEdges() {
+ return graphState.getTotalNumEdges();
+ }
+
+ /**
+ * Get a read-only view of the out-edges of this vertex.
+ * Note: edge objects returned by this iterable may be invalidated as soon
+ * as the next element is requested. Thus, keeping a reference to an edge
+ * almost always leads to undesired behavior.
+ *
+ * @return the out edges (sort order determined by subclass implementation).
+ */
+ public Iterable<Edge<I, E>> getEdges() {
+ return edges;
+ }
+
+ /**
+ * Get the number of outgoing edges on this vertex.
+ *
+ * @return the total number of outbound edges from this vertex
+ */
+ public int getNumEdges() {
+ return edges.size();
+ }
+
+ /**
+ * Return the value of the first edge with the given target vertex id,
+ * or null if there is no such edge.
+ * Note: edge value objects returned by this method may be invalidated by
+ * the next call. Thus, keeping a reference to an edge value almost always
+ * leads to undesired behavior.
+ *
+ * @param targetVertexId Target vertex id
+ * @return Edge value (or null if missing)
+ */
+ public E getEdgeValue(I targetVertexId) {
+ if (edges instanceof StrictRandomAccessVertexEdges) {
+ return ((StrictRandomAccessVertexEdges<I, E>) edges)
+ .getEdgeValue(targetVertexId);
+ } else {
+ for (Edge<I, E> edge : edges) {
+ if (edge.getTargetVertexId().equals(targetVertexId)) {
+ return edge.getValue();
+ }
+ }
+ return null;
+ }
+ }
+
+ /**
+ * Get an iterable over the values of all edges with the given target
+ * vertex id. This only makes sense for multigraphs (i.e. graphs with
+ * parallel edges).
+ * Note: edge value objects returned by this method may be invalidated as
+ * soon as the next element is requested. Thus, keeping a reference to an
+ * edge value almost always leads to undesired behavior.
+ *
+ * @param targetVertexId Target vertex id
+ * @return Iterable of edge values
+ */
+ public Iterable<E> getAllEdgeValues(final I targetVertexId) {
+ if (edges instanceof MultiRandomAccessVertexEdges) {
+ return ((MultiRandomAccessVertexEdges<I, E>) edges)
+ .getAllEdgeValues(targetVertexId);
+ } else {
+ return new Iterable<E>() {
+ @Override
+ public Iterator<E> iterator() {
+ return new UnmodifiableIterator<E>() {
+ /** Iterator over all edges. */
+ private Iterator<Edge<I, E>> edgeIterator = edges.iterator();
+ /** Last matching edge found. */
+ private Edge<I, E> currentEdge;
+
+ @Override
+ public boolean hasNext() {
+ while (edgeIterator.hasNext()) {
+ currentEdge = edgeIterator.next();
+ if (currentEdge.getTargetVertexId().equals(targetVertexId)) {
+ return true;
+ }
+ }
+ return false;
+ }
+
+ @Override
+ public E next() {
+ return currentEdge.getValue();
+ }
+ };
+ }
+ };
+ }
+ }
+
+ /**
+ * Send a message to a vertex id. The message should not be mutated after
+ * this method returns or else undefined results could occur.
+ *
+ * @param id Vertex id to send the message to
+ * @param message Message data to send. Note that after the message is sent,
+ * the user should not modify the object.
+ */
+ public void sendMessage(I id, M message) {
+ if (graphState.getWorkerClientRequestProcessor().
+ sendMessageRequest(id, message)) {
+ graphState.getGraphTaskManager().notifySentMessages();
+ }
+ }
+
+ /**
+ * Send a message to all edges.
+ *
+ * @param message Message sent to all edges.
+ */
+ public void sendMessageToAllEdges(M message) {
+ for (Edge<I, E> edge : getEdges()) {
+ sendMessage(edge.getTargetVertexId(), message);
+ }
+ }
+
+ /**
+ * After this is called, the compute() code will no longer be called for
+ * this vertex unless a message is sent to it. Then the compute() code
+ * will be called once again until this function is called. The
+ * application finishes only when all vertices vote to halt.
+ */
+ public void voteToHalt() {
+ halt = true;
+ }
+
+ /**
+ * Re-activate vertex if halted.
+ */
+ public void wakeUp() {
+ halt = false;
+ }
+
+ /**
+ * Is this vertex done?
+ *
+ * @return True if halted, false otherwise.
+ */
+ public boolean isHalted() {
+ return halt;
+ }
+
+ /**
+ * Add an edge for this vertex (happens immediately)
+ *
+ * @param edge Edge to add
+ */
+ public void addEdge(Edge<I, E> edge) {
+ edges.add(edge);
+ }
+
+ /**
+ * Removes all edges pointing to the given vertex id.
+ *
+ * @param targetVertexId the target vertex id
+ */
+ public void removeEdges(I targetVertexId) {
+ edges.remove(targetVertexId);
+ }
+
+ /**
+ * Sends a request to create a vertex that will be available during the
+ * next superstep.
+ *
+ * @param id Vertex id
+ * @param value Vertex value
+ * @param edges Initial edges
+ */
+ public void addVertexRequest(I id, V value, VertexEdges<I, E> edges)
+ throws IOException {
+ Vertex<I, V, E, M> vertex = conf.createVertex();
+ vertex.initialize(id, value, edges);
+ graphState.getWorkerClientRequestProcessor().addVertexRequest(vertex);
+ }
+
+ /**
+ * Sends a request to create a vertex that will be available during the
+ * next superstep.
+ *
+ * @param id Vertex id
+ * @param value Vertex value
+ */
+ public void addVertexRequest(I id, V value) throws IOException {
+ addVertexRequest(id, value, conf.createVertexEdges());
+ }
+
+ /**
+ * Request to remove a vertex from the graph
+ * (applied just prior to the next superstep).
+ *
+ * @param vertexId Id of the vertex to be removed.
+ */
+ public void removeVertexRequest(I vertexId) throws IOException {
+ graphState.getWorkerClientRequestProcessor().
+ removeVertexRequest(vertexId);
+ }
+
+ /**
+ * Request to add an edge of a vertex in the graph
+ * (processed just prior to the next superstep)
+ *
+ * @param sourceVertexId Source vertex id of edge
+ * @param edge Edge to add
+ */
+ public void addEdgeRequest(I sourceVertexId, Edge<I, E> edge)
+ throws IOException {
+ graphState.getWorkerClientRequestProcessor().
+ addEdgeRequest(sourceVertexId, edge);
+ }
+
+ /**
+ * Request to remove all edges from a given source vertex to a given target
+ * vertex (processed just prior to the next superstep).
+ *
+ * @param sourceVertexId Source vertex id
+ * @param targetVertexId Target vertex id
+ */
+ public void removeEdgesRequest(I sourceVertexId, I targetVertexId)
+ throws IOException {
+ graphState.getWorkerClientRequestProcessor().
+ removeEdgesRequest(sourceVertexId, targetVertexId);
+ }
+
+ /**
+ * Set the graph state for all workers
+ *
+ * @param graphState Graph state for all workers
+ */
+ public void setGraphState(GraphState<I, V, E, M> graphState) {
+ this.graphState = graphState;
+ }
+
+ /**
+ * Get the mapper context
+ *
+ * @return Mapper context
+ */
+ public Mapper.Context getContext() {
+ return graphState.getContext();
+ }
+
+ /**
+ * Get the partition context
+ *
+ * @return Partition context
+ */
+ public PartitionContext getPartitionContext() {
+ return graphState.getPartitionContext();
+ }
+
+ /**
+ * Get the worker context
+ *
+ * @return WorkerContext context
+ */
+ public WorkerContext getWorkerContext() {
+ return graphState.getGraphTaskManager().getWorkerContext();
+ }
+
+ @Override
+ public <A extends Writable> void aggregate(String name, A value) {
+ graphState.getWorkerAggregatorUsage().aggregate(name, value);
+ }
+
+ @Override
+ public <A extends Writable> A getAggregatedValue(String name) {
+ return graphState.getWorkerAggregatorUsage().getAggregatedValue(name);
+ }
+
+ @Override
+ public void readFields(DataInput in) throws IOException {
+ id = conf.createVertexId();
+ id.readFields(in);
+ value = conf.createVertexValue();
+ value.readFields(in);
+ edges = conf.createVertexEdges();
+ edges.readFields(in);
+ halt = in.readBoolean();
+ }
+
+ @Override
+ public void write(DataOutput out) throws IOException {
+ id.write(out);
+ value.write(out);
+ edges.write(out);
+ out.writeBoolean(halt);
+ }
+
+ @Override
+ public ImmutableClassesGiraphConfiguration<I, V, E, M> getConf() {
+ return conf;
+ }
+
+ @Override
+ public void setConf(ImmutableClassesGiraphConfiguration<I, V, E, M> conf) {
+ this.conf = conf;
+ }
+
+ @Override
+ public String toString() {
+ return "Vertex(id=" + getId() + ",value=" + getValue() +
+ ",#edges=" + getNumEdges() + ")";
+ }
+}
http://git-wip-us.apache.org/repos/asf/giraph/blob/3f5009ae/giraph-core/src/main/java/org/apache/giraph/graph/VertexChanges.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/graph/VertexChanges.java b/giraph-core/src/main/java/org/apache/giraph/graph/VertexChanges.java
index ef61dbb..9474636 100644
--- a/giraph-core/src/main/java/org/apache/giraph/graph/VertexChanges.java
+++ b/giraph-core/src/main/java/org/apache/giraph/graph/VertexChanges.java
@@ -18,7 +18,7 @@
package org.apache.giraph.graph;
-import org.apache.giraph.vertex.Vertex;
+import org.apache.giraph.edge.Edge;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
http://git-wip-us.apache.org/repos/asf/giraph/blob/3f5009ae/giraph-core/src/main/java/org/apache/giraph/graph/VertexMutations.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/graph/VertexMutations.java b/giraph-core/src/main/java/org/apache/giraph/graph/VertexMutations.java
index fa33341..ea50f25 100644
--- a/giraph-core/src/main/java/org/apache/giraph/graph/VertexMutations.java
+++ b/giraph-core/src/main/java/org/apache/giraph/graph/VertexMutations.java
@@ -20,8 +20,8 @@ package org.apache.giraph.graph;
import org.apache.giraph.conf.ImmutableClassesGiraphConfigurable;
import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
+import org.apache.giraph.edge.Edge;
import org.apache.giraph.utils.WritableUtils;
-import org.apache.giraph.vertex.Vertex;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.json.JSONException;
http://git-wip-us.apache.org/repos/asf/giraph/blob/3f5009ae/giraph-core/src/main/java/org/apache/giraph/graph/VertexResolver.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/graph/VertexResolver.java b/giraph-core/src/main/java/org/apache/giraph/graph/VertexResolver.java
index 4a36706..1fc0ddc 100644
--- a/giraph-core/src/main/java/org/apache/giraph/graph/VertexResolver.java
+++ b/giraph-core/src/main/java/org/apache/giraph/graph/VertexResolver.java
@@ -18,7 +18,6 @@
package org.apache.giraph.graph;
-import org.apache.giraph.vertex.Vertex;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
http://git-wip-us.apache.org/repos/asf/giraph/blob/3f5009ae/giraph-core/src/main/java/org/apache/giraph/io/EdgeReader.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/io/EdgeReader.java b/giraph-core/src/main/java/org/apache/giraph/io/EdgeReader.java
index ed6fad1..f6dccdc 100644
--- a/giraph-core/src/main/java/org/apache/giraph/io/EdgeReader.java
+++ b/giraph-core/src/main/java/org/apache/giraph/io/EdgeReader.java
@@ -18,7 +18,7 @@
package org.apache.giraph.io;
-import org.apache.giraph.graph.Edge;
+import org.apache.giraph.edge.Edge;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapreduce.InputSplit;
http://git-wip-us.apache.org/repos/asf/giraph/blob/3f5009ae/giraph-core/src/main/java/org/apache/giraph/io/ReverseEdgeDuplicator.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/io/ReverseEdgeDuplicator.java b/giraph-core/src/main/java/org/apache/giraph/io/ReverseEdgeDuplicator.java
new file mode 100644
index 0000000..e85931f
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/io/ReverseEdgeDuplicator.java
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.io;
+
+import org.apache.giraph.edge.Edge;
+import org.apache.giraph.edge.EdgeFactory;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+
+import java.io.IOException;
+
+/**
+ * An EdgeReader that creates the opposite direction edge for each edge read.
+ * Used to create an undirected graph from a directed input.
+ * This class is a decorator around any other EdgeReader.
+ *
+ * @param <I> Vertex ID
+ * @param <E> Edge Value
+ */
+public class ReverseEdgeDuplicator<I extends WritableComparable,
+ E extends Writable> implements EdgeReader<I, E> {
+ /** The underlying EdgeReader to wrap */
+ private final EdgeReader<I, E> baseReader;
+
+ /** Whether the reverse edge stored currently is valid */
+ private boolean haveReverseEdge = true;
+ /** Reverse of the edge last read */
+ private Edge<I, E> reverseEdge;
+ /** Reverse source of last edge, in other words last edge's target */
+ private I reverseSourceId;
+
+ /**
+ * Constructor
+ * @param baseReader EdgeReader to wrap
+ */
+ public ReverseEdgeDuplicator(EdgeReader<I, E> baseReader) {
+ this.baseReader = baseReader;
+ }
+
+ /**
+ * Get wrapped EdgeReader
+ * @return EdgeReader
+ */
+ public EdgeReader<I, E> getBaseReader() {
+ return baseReader;
+ }
+
+ @Override
+ public void initialize(InputSplit inputSplit, TaskAttemptContext context)
+ throws IOException, InterruptedException {
+ baseReader.initialize(inputSplit, context);
+ haveReverseEdge = true;
+ }
+
+ @Override
+ public boolean nextEdge() throws IOException, InterruptedException {
+ boolean result = true;
+ if (haveReverseEdge) {
+ result = baseReader.nextEdge();
+ haveReverseEdge = false;
+ } else {
+ Edge<I, E> currentEdge = baseReader.getCurrentEdge();
+ reverseSourceId = currentEdge.getTargetVertexId();
+ reverseEdge = EdgeFactory.create(baseReader.getCurrentSourceId(),
+ currentEdge.getValue());
+ haveReverseEdge = true;
+ }
+ return result;
+ }
+
+ @Override
+ public I getCurrentSourceId() throws IOException, InterruptedException {
+ if (haveReverseEdge) {
+ return reverseSourceId;
+ } else {
+ return baseReader.getCurrentSourceId();
+ }
+ }
+
+ @Override
+ public Edge<I, E> getCurrentEdge() throws IOException, InterruptedException {
+ if (haveReverseEdge) {
+ return reverseEdge;
+ } else {
+ return baseReader.getCurrentEdge();
+ }
+ }
+
+ @Override
+ public void close() throws IOException {
+ baseReader.close();
+ }
+
+ @Override
+ public float getProgress() throws IOException, InterruptedException {
+ return baseReader.getProgress();
+ }
+}
http://git-wip-us.apache.org/repos/asf/giraph/blob/3f5009ae/giraph-core/src/main/java/org/apache/giraph/io/VertexReader.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/io/VertexReader.java b/giraph-core/src/main/java/org/apache/giraph/io/VertexReader.java
index 1b1c896..3487cee 100644
--- a/giraph-core/src/main/java/org/apache/giraph/io/VertexReader.java
+++ b/giraph-core/src/main/java/org/apache/giraph/io/VertexReader.java
@@ -18,7 +18,7 @@
package org.apache.giraph.io;
-import org.apache.giraph.vertex.Vertex;
+import org.apache.giraph.graph.Vertex;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapreduce.InputSplit;
http://git-wip-us.apache.org/repos/asf/giraph/blob/3f5009ae/giraph-core/src/main/java/org/apache/giraph/io/VertexValueReader.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/io/VertexValueReader.java b/giraph-core/src/main/java/org/apache/giraph/io/VertexValueReader.java
index 923ca5c..3ccb0fd 100644
--- a/giraph-core/src/main/java/org/apache/giraph/io/VertexValueReader.java
+++ b/giraph-core/src/main/java/org/apache/giraph/io/VertexValueReader.java
@@ -19,7 +19,7 @@
package org.apache.giraph.io;
import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
-import org.apache.giraph.vertex.Vertex;
+import org.apache.giraph.graph.Vertex;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapreduce.InputSplit;
http://git-wip-us.apache.org/repos/asf/giraph/blob/3f5009ae/giraph-core/src/main/java/org/apache/giraph/io/VertexWriter.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/io/VertexWriter.java b/giraph-core/src/main/java/org/apache/giraph/io/VertexWriter.java
index 82a19bb..38c5548 100644
--- a/giraph-core/src/main/java/org/apache/giraph/io/VertexWriter.java
+++ b/giraph-core/src/main/java/org/apache/giraph/io/VertexWriter.java
@@ -20,7 +20,7 @@ package org.apache.giraph.io;
import java.io.IOException;
-import org.apache.giraph.vertex.Vertex;
+import org.apache.giraph.graph.Vertex;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
http://git-wip-us.apache.org/repos/asf/giraph/blob/3f5009ae/giraph-core/src/main/java/org/apache/giraph/io/formats/AdjacencyListTextVertexInputFormat.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/io/formats/AdjacencyListTextVertexInputFormat.java b/giraph-core/src/main/java/org/apache/giraph/io/formats/AdjacencyListTextVertexInputFormat.java
index 5092352..8fe0db6 100644
--- a/giraph-core/src/main/java/org/apache/giraph/io/formats/AdjacencyListTextVertexInputFormat.java
+++ b/giraph-core/src/main/java/org/apache/giraph/io/formats/AdjacencyListTextVertexInputFormat.java
@@ -17,7 +17,8 @@
*/
package org.apache.giraph.io.formats;
-import org.apache.giraph.graph.Edge;
+import com.google.common.collect.Lists;
+import org.apache.giraph.edge.Edge;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
@@ -25,8 +26,6 @@ import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
-import com.google.common.collect.Lists;
-
import java.io.IOException;
import java.util.List;
http://git-wip-us.apache.org/repos/asf/giraph/blob/3f5009ae/giraph-core/src/main/java/org/apache/giraph/io/formats/AdjacencyListTextVertexOutputFormat.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/io/formats/AdjacencyListTextVertexOutputFormat.java b/giraph-core/src/main/java/org/apache/giraph/io/formats/AdjacencyListTextVertexOutputFormat.java
index 934663e..5815403 100644
--- a/giraph-core/src/main/java/org/apache/giraph/io/formats/AdjacencyListTextVertexOutputFormat.java
+++ b/giraph-core/src/main/java/org/apache/giraph/io/formats/AdjacencyListTextVertexOutputFormat.java
@@ -17,8 +17,8 @@
*/
package org.apache.giraph.io.formats;
-import org.apache.giraph.graph.Edge;
-import org.apache.giraph.vertex.Vertex;
+import org.apache.giraph.edge.Edge;
+import org.apache.giraph.graph.Vertex;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
http://git-wip-us.apache.org/repos/asf/giraph/blob/3f5009ae/giraph-core/src/main/java/org/apache/giraph/io/formats/IdWithValueTextOutputFormat.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/io/formats/IdWithValueTextOutputFormat.java b/giraph-core/src/main/java/org/apache/giraph/io/formats/IdWithValueTextOutputFormat.java
index 352f054..fe4a1d5 100644
--- a/giraph-core/src/main/java/org/apache/giraph/io/formats/IdWithValueTextOutputFormat.java
+++ b/giraph-core/src/main/java/org/apache/giraph/io/formats/IdWithValueTextOutputFormat.java
@@ -19,7 +19,7 @@
package org.apache.giraph.io.formats;
-import org.apache.giraph.vertex.Vertex;
+import org.apache.giraph.graph.Vertex;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
http://git-wip-us.apache.org/repos/asf/giraph/blob/3f5009ae/giraph-core/src/main/java/org/apache/giraph/io/formats/IntIntNullIntTextInputFormat.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/io/formats/IntIntNullIntTextInputFormat.java b/giraph-core/src/main/java/org/apache/giraph/io/formats/IntIntNullIntTextInputFormat.java
index b00e495..28539f5 100644
--- a/giraph-core/src/main/java/org/apache/giraph/io/formats/IntIntNullIntTextInputFormat.java
+++ b/giraph-core/src/main/java/org/apache/giraph/io/formats/IntIntNullIntTextInputFormat.java
@@ -18,16 +18,15 @@
package org.apache.giraph.io.formats;
-import org.apache.giraph.graph.Edge;
-import org.apache.giraph.graph.EdgeFactory;
+import com.google.common.collect.Lists;
+import org.apache.giraph.edge.Edge;
+import org.apache.giraph.edge.EdgeFactory;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
-import com.google.common.collect.Lists;
-
import java.io.IOException;
import java.util.List;
import java.util.regex.Pattern;
http://git-wip-us.apache.org/repos/asf/giraph/blob/3f5009ae/giraph-core/src/main/java/org/apache/giraph/io/formats/IntNullNullNullTextInputFormat.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/io/formats/IntNullNullNullTextInputFormat.java b/giraph-core/src/main/java/org/apache/giraph/io/formats/IntNullNullNullTextInputFormat.java
index dda3f2f..4950d21 100644
--- a/giraph-core/src/main/java/org/apache/giraph/io/formats/IntNullNullNullTextInputFormat.java
+++ b/giraph-core/src/main/java/org/apache/giraph/io/formats/IntNullNullNullTextInputFormat.java
@@ -17,15 +17,14 @@
*/
package org.apache.giraph.io.formats;
-import org.apache.giraph.graph.Edge;
+import com.google.common.collect.ImmutableList;
+import org.apache.giraph.edge.Edge;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
-import com.google.common.collect.ImmutableList;
-
import java.io.IOException;
/**
http://git-wip-us.apache.org/repos/asf/giraph/blob/3f5009ae/giraph-core/src/main/java/org/apache/giraph/io/formats/IntNullReverseTextEdgeInputFormat.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/io/formats/IntNullReverseTextEdgeInputFormat.java b/giraph-core/src/main/java/org/apache/giraph/io/formats/IntNullReverseTextEdgeInputFormat.java
index 1e3b643..0270348 100644
--- a/giraph-core/src/main/java/org/apache/giraph/io/formats/IntNullReverseTextEdgeInputFormat.java
+++ b/giraph-core/src/main/java/org/apache/giraph/io/formats/IntNullReverseTextEdgeInputFormat.java
@@ -18,8 +18,8 @@
package org.apache.giraph.io.formats;
-import org.apache.giraph.graph.ReverseEdgeDuplicator;
import org.apache.giraph.io.EdgeReader;
+import org.apache.giraph.io.ReverseEdgeDuplicator;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.InputSplit;
http://git-wip-us.apache.org/repos/asf/giraph/blob/3f5009ae/giraph-core/src/main/java/org/apache/giraph/io/formats/JsonBase64VertexInputFormat.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/io/formats/JsonBase64VertexInputFormat.java b/giraph-core/src/main/java/org/apache/giraph/io/formats/JsonBase64VertexInputFormat.java
index 21ca427..6eaf7dc 100644
--- a/giraph-core/src/main/java/org/apache/giraph/io/formats/JsonBase64VertexInputFormat.java
+++ b/giraph-core/src/main/java/org/apache/giraph/io/formats/JsonBase64VertexInputFormat.java
@@ -18,8 +18,10 @@
package org.apache.giraph.io.formats;
-import org.apache.giraph.graph.Edge;
-import org.apache.giraph.graph.EdgeFactory;
+import com.google.common.collect.Lists;
+import net.iharder.Base64;
+import org.apache.giraph.edge.Edge;
+import org.apache.giraph.edge.EdgeFactory;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
@@ -29,9 +31,6 @@ import org.json.JSONArray;
import org.json.JSONException;
import org.json.JSONObject;
-import com.google.common.collect.Lists;
-import net.iharder.Base64;
-
import java.io.ByteArrayInputStream;
import java.io.DataInput;
import java.io.DataInputStream;
http://git-wip-us.apache.org/repos/asf/giraph/blob/3f5009ae/giraph-core/src/main/java/org/apache/giraph/io/formats/JsonBase64VertexOutputFormat.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/io/formats/JsonBase64VertexOutputFormat.java b/giraph-core/src/main/java/org/apache/giraph/io/formats/JsonBase64VertexOutputFormat.java
index 0599742..7d8fcf6 100644
--- a/giraph-core/src/main/java/org/apache/giraph/io/formats/JsonBase64VertexOutputFormat.java
+++ b/giraph-core/src/main/java/org/apache/giraph/io/formats/JsonBase64VertexOutputFormat.java
@@ -18,8 +18,9 @@
package org.apache.giraph.io.formats;
-import org.apache.giraph.graph.Edge;
-import org.apache.giraph.vertex.Vertex;
+import net.iharder.Base64;
+import org.apache.giraph.edge.Edge;
+import org.apache.giraph.graph.Vertex;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
@@ -28,8 +29,6 @@ import org.json.JSONArray;
import org.json.JSONException;
import org.json.JSONObject;
-import net.iharder.Base64;
-
import java.io.ByteArrayOutputStream;
import java.io.DataOutput;
import java.io.DataOutputStream;
http://git-wip-us.apache.org/repos/asf/giraph/blob/3f5009ae/giraph-core/src/main/java/org/apache/giraph/io/formats/JsonLongDoubleFloatDoubleVertexInputFormat.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/io/formats/JsonLongDoubleFloatDoubleVertexInputFormat.java b/giraph-core/src/main/java/org/apache/giraph/io/formats/JsonLongDoubleFloatDoubleVertexInputFormat.java
index 2df20f1..2ac2dad 100644
--- a/giraph-core/src/main/java/org/apache/giraph/io/formats/JsonLongDoubleFloatDoubleVertexInputFormat.java
+++ b/giraph-core/src/main/java/org/apache/giraph/io/formats/JsonLongDoubleFloatDoubleVertexInputFormat.java
@@ -17,9 +17,10 @@
*/
package org.apache.giraph.io.formats;
-import org.apache.giraph.graph.Edge;
-import org.apache.giraph.graph.EdgeFactory;
-import org.apache.giraph.vertex.Vertex;
+import com.google.common.collect.Lists;
+import org.apache.giraph.edge.Edge;
+import org.apache.giraph.edge.EdgeFactory;
+import org.apache.giraph.graph.Vertex;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.FloatWritable;
import org.apache.hadoop.io.LongWritable;
@@ -29,8 +30,6 @@ import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.json.JSONArray;
import org.json.JSONException;
-import com.google.common.collect.Lists;
-
import java.io.IOException;
import java.util.List;
http://git-wip-us.apache.org/repos/asf/giraph/blob/3f5009ae/giraph-core/src/main/java/org/apache/giraph/io/formats/JsonLongDoubleFloatDoubleVertexOutputFormat.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/io/formats/JsonLongDoubleFloatDoubleVertexOutputFormat.java b/giraph-core/src/main/java/org/apache/giraph/io/formats/JsonLongDoubleFloatDoubleVertexOutputFormat.java
index 9a751ae..d0a3305 100644
--- a/giraph-core/src/main/java/org/apache/giraph/io/formats/JsonLongDoubleFloatDoubleVertexOutputFormat.java
+++ b/giraph-core/src/main/java/org/apache/giraph/io/formats/JsonLongDoubleFloatDoubleVertexOutputFormat.java
@@ -18,8 +18,8 @@
package org.apache.giraph.io.formats;
-import org.apache.giraph.graph.Edge;
-import org.apache.giraph.vertex.Vertex;
+import org.apache.giraph.edge.Edge;
+import org.apache.giraph.graph.Vertex;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.FloatWritable;
import org.apache.hadoop.io.LongWritable;
http://git-wip-us.apache.org/repos/asf/giraph/blob/3f5009ae/giraph-core/src/main/java/org/apache/giraph/io/formats/LongDoubleDoubleAdjacencyListVertexInputFormat.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/io/formats/LongDoubleDoubleAdjacencyListVertexInputFormat.java b/giraph-core/src/main/java/org/apache/giraph/io/formats/LongDoubleDoubleAdjacencyListVertexInputFormat.java
index 4e35201..09fb991 100644
--- a/giraph-core/src/main/java/org/apache/giraph/io/formats/LongDoubleDoubleAdjacencyListVertexInputFormat.java
+++ b/giraph-core/src/main/java/org/apache/giraph/io/formats/LongDoubleDoubleAdjacencyListVertexInputFormat.java
@@ -17,8 +17,8 @@
*/
package org.apache.giraph.io.formats;
-import org.apache.giraph.graph.Edge;
-import org.apache.giraph.graph.EdgeFactory;
+import org.apache.giraph.edge.Edge;
+import org.apache.giraph.edge.EdgeFactory;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Writable;
http://git-wip-us.apache.org/repos/asf/giraph/blob/3f5009ae/giraph-core/src/main/java/org/apache/giraph/io/formats/PseudoRandomEdgeInputFormat.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/io/formats/PseudoRandomEdgeInputFormat.java b/giraph-core/src/main/java/org/apache/giraph/io/formats/PseudoRandomEdgeInputFormat.java
index 2024863..cd454e3 100644
--- a/giraph-core/src/main/java/org/apache/giraph/io/formats/PseudoRandomEdgeInputFormat.java
+++ b/giraph-core/src/main/java/org/apache/giraph/io/formats/PseudoRandomEdgeInputFormat.java
@@ -18,12 +18,13 @@
package org.apache.giraph.io.formats;
+import com.google.common.collect.Sets;
import org.apache.giraph.bsp.BspInputSplit;
import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
-import org.apache.giraph.graph.Edge;
-import org.apache.giraph.graph.EdgeFactory;
import org.apache.giraph.io.EdgeInputFormat;
import org.apache.giraph.io.EdgeReader;
+import org.apache.giraph.edge.Edge;
+import org.apache.giraph.edge.EdgeFactory;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.mapreduce.InputSplit;
@@ -31,8 +32,6 @@ import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.log4j.Logger;
-import com.google.common.collect.Sets;
-
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
@@ -91,7 +90,7 @@ public class PseudoRandomEdgeInputFormat
/** Aggregate vertices (all input splits). */
private long aggregateVertices = -1;
/** Edges per vertex. */
- private long edgesPerVertex = -1;
+ private int edgesPerVertex = -1;
/** BspInputSplit (used only for index). */
private BspInputSplit bspInputSplit;
/** Saved configuration */
@@ -129,7 +128,7 @@ public class PseudoRandomEdgeInputFormat
"initialize: Got " + inputSplit.getClass() +
" instead of " + BspInputSplit.class);
}
- edgesPerVertex = configuration.getLong(
+ edgesPerVertex = configuration.getInt(
PseudoRandomInputFormatConstants.EDGES_PER_VERTEX, 0);
if (edgesPerVertex <= 0) {
throw new IllegalArgumentException(
@@ -184,8 +183,8 @@ public class PseudoRandomEdgeInputFormat
"" + destVertexId + ")");
}
return EdgeFactory.create(
- destVertexId,
- new DoubleWritable(random.nextDouble()));
+ destVertexId,
+ new DoubleWritable(random.nextDouble()));
}
@Override
http://git-wip-us.apache.org/repos/asf/giraph/blob/3f5009ae/giraph-core/src/main/java/org/apache/giraph/io/formats/PseudoRandomVertexInputFormat.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/io/formats/PseudoRandomVertexInputFormat.java b/giraph-core/src/main/java/org/apache/giraph/io/formats/PseudoRandomVertexInputFormat.java
index 4da8f9d..40a20e1 100644
--- a/giraph-core/src/main/java/org/apache/giraph/io/formats/PseudoRandomVertexInputFormat.java
+++ b/giraph-core/src/main/java/org/apache/giraph/io/formats/PseudoRandomVertexInputFormat.java
@@ -18,15 +18,14 @@
package org.apache.giraph.io.formats;
-import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import org.apache.giraph.bsp.BspInputSplit;
import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
-import org.apache.giraph.graph.Edge;
-import org.apache.giraph.graph.EdgeFactory;
import org.apache.giraph.io.VertexInputFormat;
import org.apache.giraph.io.VertexReader;
-import org.apache.giraph.vertex.Vertex;
+import org.apache.giraph.edge.EdgeFactory;
+import org.apache.giraph.graph.Vertex;
+import org.apache.giraph.edge.VertexEdges;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Writable;
@@ -88,7 +87,7 @@ public class PseudoRandomVertexInputFormat<M extends Writable> extends
/** Aggregate vertices (all input splits). */
private long aggregateVertices = -1;
/** Edges per vertex. */
- private long edgesPerVertex = -1;
+ private int edgesPerVertex = -1;
/** BspInputSplit (used only for index). */
private BspInputSplit bspInputSplit;
/** Saved configuration */
@@ -132,7 +131,7 @@ public class PseudoRandomVertexInputFormat<M extends Writable> extends
"initialize: Got " + inputSplit.getClass() +
" instead of " + BspInputSplit.class);
}
- edgesPerVertex = configuration.getLong(
+ edgesPerVertex = configuration.getInt(
PseudoRandomInputFormatConstants.EDGES_PER_VERTEX, 0);
if (edgesPerVertex <= 0) {
throw new IllegalArgumentException(
@@ -161,8 +160,10 @@ public class PseudoRandomVertexInputFormat<M extends Writable> extends
// same.
Random rand = new Random(vertexId);
DoubleWritable vertexValue = new DoubleWritable(rand.nextDouble());
- List<Edge<LongWritable, DoubleWritable>> edges =
- Lists.newArrayListWithCapacity((int) edgesPerVertex);
+ // In order to save memory and avoid copying, we add directly to a
+ // VertexEdges instance.
+ VertexEdges<LongWritable, DoubleWritable> edges =
+ configuration.createAndInitializeVertexEdges(edgesPerVertex);
Set<LongWritable> destVertices = Sets.newHashSet();
for (long i = 0; i < edgesPerVertex; ++i) {
LongWritable destVertexId = new LongWritable();
http://git-wip-us.apache.org/repos/asf/giraph/blob/3f5009ae/giraph-core/src/main/java/org/apache/giraph/io/formats/SequenceFileVertexInputFormat.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/io/formats/SequenceFileVertexInputFormat.java b/giraph-core/src/main/java/org/apache/giraph/io/formats/SequenceFileVertexInputFormat.java
index 6a5813b..1071196 100644
--- a/giraph-core/src/main/java/org/apache/giraph/io/formats/SequenceFileVertexInputFormat.java
+++ b/giraph-core/src/main/java/org/apache/giraph/io/formats/SequenceFileVertexInputFormat.java
@@ -17,7 +17,7 @@
*/
package org.apache.giraph.io.formats;
-import org.apache.giraph.vertex.Vertex;
+import org.apache.giraph.graph.Vertex;
import org.apache.giraph.io.VertexInputFormat;
import org.apache.giraph.io.VertexReader;
import org.apache.hadoop.io.Writable;
http://git-wip-us.apache.org/repos/asf/giraph/blob/3f5009ae/giraph-core/src/main/java/org/apache/giraph/io/formats/SequenceFileVertexOutputFormat.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/io/formats/SequenceFileVertexOutputFormat.java b/giraph-core/src/main/java/org/apache/giraph/io/formats/SequenceFileVertexOutputFormat.java
index 0538db9..468e6bd 100644
--- a/giraph-core/src/main/java/org/apache/giraph/io/formats/SequenceFileVertexOutputFormat.java
+++ b/giraph-core/src/main/java/org/apache/giraph/io/formats/SequenceFileVertexOutputFormat.java
@@ -19,7 +19,7 @@ package org.apache.giraph.io.formats;
import org.apache.giraph.io.VertexOutputFormat;
import org.apache.giraph.io.VertexWriter;
-import org.apache.giraph.vertex.Vertex;
+import org.apache.giraph.graph.Vertex;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapreduce.JobContext;
http://git-wip-us.apache.org/repos/asf/giraph/blob/3f5009ae/giraph-core/src/main/java/org/apache/giraph/io/formats/TextDoubleDoubleAdjacencyListVertexInputFormat.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/io/formats/TextDoubleDoubleAdjacencyListVertexInputFormat.java b/giraph-core/src/main/java/org/apache/giraph/io/formats/TextDoubleDoubleAdjacencyListVertexInputFormat.java
index 36d00db..f7da40f 100644
--- a/giraph-core/src/main/java/org/apache/giraph/io/formats/TextDoubleDoubleAdjacencyListVertexInputFormat.java
+++ b/giraph-core/src/main/java/org/apache/giraph/io/formats/TextDoubleDoubleAdjacencyListVertexInputFormat.java
@@ -17,8 +17,8 @@
*/
package org.apache.giraph.io.formats;
-import org.apache.giraph.graph.Edge;
-import org.apache.giraph.graph.EdgeFactory;
+import org.apache.giraph.edge.Edge;
+import org.apache.giraph.edge.EdgeFactory;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
http://git-wip-us.apache.org/repos/asf/giraph/blob/3f5009ae/giraph-core/src/main/java/org/apache/giraph/io/formats/TextEdgeInputFormat.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/io/formats/TextEdgeInputFormat.java b/giraph-core/src/main/java/org/apache/giraph/io/formats/TextEdgeInputFormat.java
index c9f5df1..0aae894 100644
--- a/giraph-core/src/main/java/org/apache/giraph/io/formats/TextEdgeInputFormat.java
+++ b/giraph-core/src/main/java/org/apache/giraph/io/formats/TextEdgeInputFormat.java
@@ -19,10 +19,10 @@
package org.apache.giraph.io.formats;
import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
-import org.apache.giraph.graph.Edge;
-import org.apache.giraph.graph.EdgeFactory;
import org.apache.giraph.io.EdgeInputFormat;
import org.apache.giraph.io.EdgeReader;
+import org.apache.giraph.edge.Edge;
+import org.apache.giraph.edge.EdgeFactory;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
http://git-wip-us.apache.org/repos/asf/giraph/blob/3f5009ae/giraph-core/src/main/java/org/apache/giraph/io/formats/TextVertexInputFormat.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/io/formats/TextVertexInputFormat.java b/giraph-core/src/main/java/org/apache/giraph/io/formats/TextVertexInputFormat.java
index e359f66..898e57f 100644
--- a/giraph-core/src/main/java/org/apache/giraph/io/formats/TextVertexInputFormat.java
+++ b/giraph-core/src/main/java/org/apache/giraph/io/formats/TextVertexInputFormat.java
@@ -19,10 +19,10 @@
package org.apache.giraph.io.formats;
import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
-import org.apache.giraph.graph.Edge;
import org.apache.giraph.io.VertexInputFormat;
import org.apache.giraph.io.VertexReader;
-import org.apache.giraph.vertex.Vertex;
+import org.apache.giraph.edge.Edge;
+import org.apache.giraph.graph.Vertex;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
http://git-wip-us.apache.org/repos/asf/giraph/blob/3f5009ae/giraph-core/src/main/java/org/apache/giraph/io/formats/TextVertexOutputFormat.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/io/formats/TextVertexOutputFormat.java b/giraph-core/src/main/java/org/apache/giraph/io/formats/TextVertexOutputFormat.java
index 9f1fe1f..ad96cfe 100644
--- a/giraph-core/src/main/java/org/apache/giraph/io/formats/TextVertexOutputFormat.java
+++ b/giraph-core/src/main/java/org/apache/giraph/io/formats/TextVertexOutputFormat.java
@@ -18,7 +18,7 @@
package org.apache.giraph.io.formats;
-import org.apache.giraph.vertex.Vertex;
+import org.apache.giraph.graph.Vertex;
import org.apache.giraph.io.VertexOutputFormat;
import org.apache.giraph.io.VertexWriter;
import org.apache.hadoop.io.Text;