You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@giraph.apache.org by ap...@apache.org on 2013/03/07 06:37:42 UTC
[5/8] GIRAPH-528: Decouple vertex implementation from edge storage
(apresta)
http://git-wip-us.apache.org/repos/asf/giraph/blob/3f5009ae/giraph-core/src/main/java/org/apache/giraph/job/GiraphConfigurationValidator.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/job/GiraphConfigurationValidator.java b/giraph-core/src/main/java/org/apache/giraph/job/GiraphConfigurationValidator.java
index 16b0d48..b08c74d 100644
--- a/giraph-core/src/main/java/org/apache/giraph/job/GiraphConfigurationValidator.java
+++ b/giraph-core/src/main/java/org/apache/giraph/job/GiraphConfigurationValidator.java
@@ -18,7 +18,6 @@
package org.apache.giraph.job;
-import org.apache.giraph.bsp.BspUtils;
import org.apache.giraph.combiner.Combiner;
import org.apache.giraph.conf.GiraphConstants;
import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
@@ -28,8 +27,9 @@ import org.apache.giraph.io.EdgeInputFormat;
import org.apache.giraph.io.VertexInputFormat;
import org.apache.giraph.io.VertexOutputFormat;
import org.apache.giraph.utils.ReflectionUtils;
-import org.apache.giraph.vertex.MutableVertex;
-import org.apache.giraph.vertex.Vertex;
+import org.apache.giraph.edge.ByteArrayEdges;
+import org.apache.giraph.graph.Vertex;
+import org.apache.giraph.edge.VertexEdges;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
@@ -69,6 +69,8 @@ public class GiraphConfigurationValidator<I extends WritableComparable,
private static final int MSG_COMBINER_PARAM_INDEX = 1;
/** E param edge input format index in classList */
private static final int EDGE_PARAM_EDGE_INPUT_FORMAT_INDEX = 1;
+ /** E param vertex edges index in classList */
+ private static final int EDGE_PARAM_VERTEX_EDGES_INDEX = 1;
/** Vertex Index Type */
private Type vertexIndexType;
@@ -103,13 +105,14 @@ public class GiraphConfigurationValidator<I extends WritableComparable,
public void validateConfiguration() {
checkConfiguration();
Class<? extends Vertex<I, V, E, M>> vertexClass =
- BspUtils.<I, V, E, M>getVertexClass(conf);
+ conf.getVertexClass();
List<Class<?>> classList = ReflectionUtils.getTypeArguments(
Vertex.class, vertexClass);
vertexIndexType = classList.get(ID_PARAM_INDEX);
vertexValueType = classList.get(VALUE_PARAM_INDEX);
edgeValueType = classList.get(EDGE_PARAM_INDEX);
messageValueType = classList.get(MSG_PARAM_INDEX);
+ verifyVertexEdgesGenericTypes();
verifyVertexInputFormatGenericTypes();
verifyEdgeInputFormatGenericTypes();
verifyVertexOutputFormatGenericTypes();
@@ -146,11 +149,6 @@ public class GiraphConfigurationValidator<I extends WritableComparable,
GiraphConstants.VERTEX_INPUT_FORMAT_CLASS + " and " +
GiraphConstants.EDGE_INPUT_FORMAT_CLASS + " must be non-null");
}
- if (conf.getEdgeInputFormatClass() != null &&
- !(MutableVertex.class.isAssignableFrom(conf.getVertexClass()))) {
- throw new IllegalArgumentException("checkConfiguration: EdgeInputFormat" +
- " only works with mutable vertices");
- }
if (conf.getVertexResolverClass() == null) {
if (LOG.isInfoEnabled()) {
LOG.info("checkConfiguration: No class found for " +
@@ -158,12 +156,47 @@ public class GiraphConfigurationValidator<I extends WritableComparable,
DefaultVertexResolver.class.getCanonicalName());
}
}
+ if (conf.getVertexEdgesClass() == null) {
+ if (LOG.isInfoEnabled()) {
+ LOG.info("checkConfiguration: No class found for " +
+ GiraphConstants.VERTEX_EDGES_CLASS + ", defaulting to " +
+ ByteArrayEdges.class.getCanonicalName());
+ }
+ }
+ }
+
+ /** Verify matching generic types in VertexEdges. */
+ private void verifyVertexEdgesGenericTypes() {
+ Class<? extends VertexEdges<I, E>> vertexEdgesClass =
+ conf.getVertexEdgesClass();
+ List<Class<?>> classList = ReflectionUtils.getTypeArguments(
+ VertexEdges.class, vertexEdgesClass);
+ // VertexEdges implementations can be generic, in which case there are no
+ // types to check.
+ if (classList.isEmpty()) {
+ return;
+ }
+ if (classList.get(ID_PARAM_INDEX) != null &&
+ !vertexIndexType.equals(classList.get(ID_PARAM_INDEX))) {
+ throw new IllegalArgumentException(
+ "checkClassTypes: Vertex index types don't match, " +
+ "vertex - " + vertexIndexType +
+ ", vertex edges - " + classList.get(ID_PARAM_INDEX));
+ }
+ if (classList.get(EDGE_PARAM_VERTEX_EDGES_INDEX) != null &&
+ !edgeValueType.equals(classList.get(EDGE_PARAM_VERTEX_EDGES_INDEX))) {
+ throw new IllegalArgumentException(
+ "checkClassTypes: Edge value types don't match, " +
+ "vertex - " + edgeValueType +
+ ", vertex edges - " +
+ classList.get(EDGE_PARAM_VERTEX_EDGES_INDEX));
+ }
}
/** Verify matching generic types in VertexInputFormat. */
private void verifyVertexInputFormatGenericTypes() {
Class<? extends VertexInputFormat<I, V, E, M>> vertexInputFormatClass =
- BspUtils.<I, V, E, M>getVertexInputFormatClass(conf);
+ conf.getVertexInputFormatClass();
if (vertexInputFormatClass != null) {
List<Class<?>> classList =
ReflectionUtils.getTypeArguments(
@@ -198,7 +231,7 @@ public class GiraphConfigurationValidator<I extends WritableComparable,
/** Verify matching generic types in EdgeInputFormat. */
private void verifyEdgeInputFormatGenericTypes() {
Class<? extends EdgeInputFormat<I, E>> edgeInputFormatClass =
- BspUtils.<I, E>getEdgeInputFormatClass(conf);
+ conf.getEdgeInputFormatClass();
if (edgeInputFormatClass != null) {
List<Class<?>> classList =
ReflectionUtils.getTypeArguments(
@@ -227,7 +260,7 @@ public class GiraphConfigurationValidator<I extends WritableComparable,
/** If there is a combiner type, verify its generic params match the job. */
private void verifyVertexCombinerGenericTypes() {
Class<? extends Combiner<I, M>> vertexCombinerClass =
- BspUtils.<I, M>getCombinerClass(conf);
+ conf.getCombinerClass();
if (vertexCombinerClass != null) {
List<Class<?>> classList =
ReflectionUtils.getTypeArguments(
@@ -250,8 +283,7 @@ public class GiraphConfigurationValidator<I extends WritableComparable,
/** Verify that the output format's generic params match the job. */
private void verifyVertexOutputFormatGenericTypes() {
Class<? extends VertexOutputFormat<I, V, E>>
- vertexOutputFormatClass =
- BspUtils.<I, V, E>getVertexOutputFormatClass(conf);
+ vertexOutputFormatClass = conf.getVertexOutputFormatClass();
if (vertexOutputFormatClass != null) {
List<Class<?>> classList =
ReflectionUtils.getTypeArguments(
@@ -287,7 +319,7 @@ public class GiraphConfigurationValidator<I extends WritableComparable,
* validate the generic parameter types. */
private void verifyVertexResolverGenericTypes() {
Class<? extends VertexResolver<I, V, E, M>>
- vrClass = BspUtils.<I, V, E, M>getVertexResolverClass(conf);
+ vrClass = conf.getVertexResolverClass();
if (!DefaultVertexResolver.class.isAssignableFrom(vrClass)) {
return;
}
http://git-wip-us.apache.org/repos/asf/giraph/blob/3f5009ae/giraph-core/src/main/java/org/apache/giraph/partition/ByteArrayPartition.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/partition/ByteArrayPartition.java b/giraph-core/src/main/java/org/apache/giraph/partition/ByteArrayPartition.java
index 2260837..12aa6fb 100644
--- a/giraph-core/src/main/java/org/apache/giraph/partition/ByteArrayPartition.java
+++ b/giraph-core/src/main/java/org/apache/giraph/partition/ByteArrayPartition.java
@@ -21,7 +21,7 @@ import com.google.common.collect.MapMaker;
import com.google.common.primitives.Ints;
import org.apache.giraph.utils.UnsafeByteArrayInputStream;
import org.apache.giraph.utils.WritableUtils;
-import org.apache.giraph.vertex.Vertex;
+import org.apache.giraph.graph.Vertex;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.util.Progressable;
http://git-wip-us.apache.org/repos/asf/giraph/blob/3f5009ae/giraph-core/src/main/java/org/apache/giraph/partition/DiskBackedPartitionStore.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/partition/DiskBackedPartitionStore.java b/giraph-core/src/main/java/org/apache/giraph/partition/DiskBackedPartitionStore.java
index 585ab85..6bc9591 100644
--- a/giraph-core/src/main/java/org/apache/giraph/partition/DiskBackedPartitionStore.java
+++ b/giraph-core/src/main/java/org/apache/giraph/partition/DiskBackedPartitionStore.java
@@ -25,7 +25,7 @@ import com.google.common.hash.HashFunction;
import com.google.common.hash.Hashing;
import org.apache.giraph.conf.GiraphConstants;
import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
-import org.apache.giraph.vertex.Vertex;
+import org.apache.giraph.graph.Vertex;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapreduce.Mapper;
http://git-wip-us.apache.org/repos/asf/giraph/blob/3f5009ae/giraph-core/src/main/java/org/apache/giraph/partition/Partition.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/partition/Partition.java b/giraph-core/src/main/java/org/apache/giraph/partition/Partition.java
index 657c054..1ca0b61 100644
--- a/giraph-core/src/main/java/org/apache/giraph/partition/Partition.java
+++ b/giraph-core/src/main/java/org/apache/giraph/partition/Partition.java
@@ -19,7 +19,7 @@
package org.apache.giraph.partition;
import org.apache.giraph.conf.ImmutableClassesGiraphConfigurable;
-import org.apache.giraph.vertex.Vertex;
+import org.apache.giraph.graph.Vertex;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
http://git-wip-us.apache.org/repos/asf/giraph/blob/3f5009ae/giraph-core/src/main/java/org/apache/giraph/partition/SimplePartition.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/partition/SimplePartition.java b/giraph-core/src/main/java/org/apache/giraph/partition/SimplePartition.java
index cbf6bc3..ae8556f 100644
--- a/giraph-core/src/main/java/org/apache/giraph/partition/SimplePartition.java
+++ b/giraph-core/src/main/java/org/apache/giraph/partition/SimplePartition.java
@@ -19,7 +19,7 @@
package org.apache.giraph.partition;
import org.apache.giraph.conf.GiraphConstants;
-import org.apache.giraph.vertex.Vertex;
+import org.apache.giraph.graph.Vertex;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.util.Progressable;
http://git-wip-us.apache.org/repos/asf/giraph/blob/3f5009ae/giraph-core/src/main/java/org/apache/giraph/utils/ByteArrayEdges.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/utils/ByteArrayEdges.java b/giraph-core/src/main/java/org/apache/giraph/utils/ByteArrayEdges.java
deleted file mode 100644
index 1d8fc26..0000000
--- a/giraph-core/src/main/java/org/apache/giraph/utils/ByteArrayEdges.java
+++ /dev/null
@@ -1,290 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.giraph.utils;
-
-import com.google.common.base.Function;
-import com.google.common.collect.Iterables;
-import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
-import org.apache.giraph.graph.Edge;
-import org.apache.giraph.graph.MutableEdge;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.WritableComparable;
-
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-import java.util.Iterator;
-import java.util.LinkedList;
-import java.util.List;
-
-/**
- * A list of edges backed by a byte-array.
- * The same Edge object is reused when iterating over all edges,
- * unless an alternative iterable is requested.
- * It automatically optimizes for edges with no value,
- * and also supports shallow copy from another instance.
- *
- * @param <I> Vertex id
- * @param <E> Edge value
- */
-public class ByteArrayEdges<I extends WritableComparable, E extends Writable>
- implements Iterable<Edge<I, E>>, Writable {
- /** Representative edge object. */
- private MutableEdge<I, E> representativeEdge;
- /** Serialized edges. */
- private byte[] serializedEdges;
- /** Number of bytes used in serializedEdges. */
- private int serializedEdgesBytesUsed;
- /** Number of edges. */
- private int edgeCount;
- /** Configuration. */
- private ImmutableClassesGiraphConfiguration<I, ?, E, ?> configuration;
-
- /**
- * Constructor.
- * Depending on the configuration, instantiates a representative edge with
- * or without an edge value.
- *
- * @param conf Configuration
- */
- public ByteArrayEdges(ImmutableClassesGiraphConfiguration<I, ?, E, ?> conf) {
- configuration = conf;
- representativeEdge = configuration.createMutableEdge();
- ExtendedDataOutput extendedOutputStream =
- configuration.createExtendedDataOutput();
- serializedEdges = extendedOutputStream.getByteArray();
- serializedEdgesBytesUsed = extendedOutputStream.getPos();
- }
-
- /**
- * Constructor.
- * Takes another instance of {@link ByteArrayEdges} and makes a shallow
- * copy of it.
- *
- * @param edges {@link ByteArrayEdges} to copy
- */
- public ByteArrayEdges(ByteArrayEdges<I, E> edges) {
- representativeEdge = edges.representativeEdge;
- serializedEdges = edges.serializedEdges;
- serializedEdgesBytesUsed = edges.serializedEdgesBytesUsed;
- edgeCount = edges.edgeCount;
- configuration = edges.configuration;
- }
-
- /**
- * Append an edge to the serialized representation.
- *
- * @param edge Edge to append
- */
- public final void appendEdge(Edge<I, E> edge) {
- ExtendedDataOutput extendedDataOutput =
- configuration.createExtendedDataOutput(
- serializedEdges, serializedEdgesBytesUsed);
- try {
- WritableUtils.writeEdge(extendedDataOutput, edge);
- } catch (IOException e) {
- throw new IllegalStateException("append: Failed to write to the new " +
- "byte array");
- }
- serializedEdges = extendedDataOutput.getByteArray();
- serializedEdgesBytesUsed = extendedDataOutput.getPos();
- ++edgeCount;
- }
-
- /**
- * Set all the edges.
- * Note: when possible, use the constructor which takes another {@link
- * ByteArrayEdges} instead of a generic {@link Iterable}.
- *
- * @param edges Iterable of edges
- */
- public final void setEdges(Iterable<Edge<I, E>> edges) {
- ExtendedDataOutput extendedOutputStream =
- configuration.createExtendedDataOutput();
- if (edges != null) {
- for (Edge<I, E> edge : edges) {
- try {
- WritableUtils.writeEdge(extendedOutputStream, edge);
- } catch (IOException e) {
- throw new IllegalStateException("setEdges: Failed to serialize " +
- edge);
- }
- ++edgeCount;
- }
- }
- serializedEdges = extendedOutputStream.getByteArray();
- serializedEdgesBytesUsed = extendedOutputStream.getPos();
- }
-
- /**
- * Remove the first edge pointing to a target vertex.
- *
- * @param targetVertexId Target vertex id
- * @return True if one such edge was found and removed.
- */
- public final boolean removeFirstEdge(I targetVertexId) {
- // Note that this is very expensive (deserializes all edges).
- ByteArrayEdgeIterator iterator = new ByteArrayEdgeIterator();
- int foundStartOffset = 0;
- while (iterator.hasNext()) {
- Edge<I, E> edge = iterator.next();
- if (edge.getTargetVertexId().equals(targetVertexId)) {
- System.arraycopy(serializedEdges, iterator.extendedDataInput.getPos(),
- serializedEdges, foundStartOffset,
- serializedEdgesBytesUsed - iterator.extendedDataInput.getPos());
- serializedEdgesBytesUsed -=
- iterator.extendedDataInput.getPos() - foundStartOffset;
- --edgeCount;
- return true;
- }
- foundStartOffset = iterator.extendedDataInput.getPos();
- }
-
- return false;
- }
-
- /**
- * Remove all edges pointing to a target vertex.
- *
- * @param targetVertexId Target vertex id
- * @return The number of removed edges
- */
- public final int removeAllEdges(I targetVertexId) {
- // Note that this is very expensive (deserializes all edges).
- ByteArrayEdgeIterator iterator = new ByteArrayEdgeIterator();
- int removedCount = 0;
- List<Integer> foundStartOffsets = new LinkedList<Integer>();
- List<Integer> foundEndOffsets = new LinkedList<Integer>();
- int lastStartOffset = 0;
- while (iterator.hasNext()) {
- Edge<I, E> edge = iterator.next();
- if (edge.getTargetVertexId().equals(targetVertexId)) {
- foundStartOffsets.add(lastStartOffset);
- foundEndOffsets.add(iterator.extendedDataInput.getPos());
- ++removedCount;
- }
- lastStartOffset = iterator.extendedDataInput.getPos();
- }
- foundStartOffsets.add(serializedEdgesBytesUsed);
-
- Iterator<Integer> foundStartOffsetIter = foundStartOffsets.iterator();
- Integer foundStartOffset = foundStartOffsetIter.next();
- for (Integer foundEndOffset : foundEndOffsets) {
- Integer nextFoundStartOffset = foundStartOffsetIter.next();
- System.arraycopy(serializedEdges, foundEndOffset,
- serializedEdges, foundStartOffset,
- nextFoundStartOffset - foundEndOffset);
- serializedEdgesBytesUsed -= foundEndOffset - foundStartOffset;
- foundStartOffset = nextFoundStartOffset;
- }
-
- edgeCount -= removedCount;
- return removedCount;
- }
-
- public final int getNumEdges() {
- return edgeCount;
- }
-
- /**
- * Iterator that uses the representative edge (only one iterator allowed
- * at a time).
- */
- private final class ByteArrayEdgeIterator implements Iterator<Edge<I, E>> {
- /** Input for processing the bytes */
- private final ExtendedDataInput extendedDataInput;
-
- /** Constructor. */
- ByteArrayEdgeIterator() {
- extendedDataInput = configuration.createExtendedDataInput(
- serializedEdges, 0, serializedEdgesBytesUsed);
- }
-
- @Override
- public boolean hasNext() {
- return serializedEdges != null && extendedDataInput.available() > 0;
- }
-
- @Override
- public Edge<I, E> next() {
- try {
- WritableUtils.readEdge(extendedDataInput, representativeEdge);
- } catch (IOException e) {
- throw new IllegalStateException("next: Failed on pos " +
- extendedDataInput.getPos() + " edge " + representativeEdge);
- }
- return representativeEdge;
- }
-
- @Override
- public void remove() {
- throw new IllegalAccessError("remove: Not supported");
- }
- }
-
- @Override
- public final Iterator<Edge<I, E>> iterator() {
- return new ByteArrayEdgeIterator();
- }
-
- /**
- * Release and return the current representative edge.
- *
- * @return The released edge
- */
- private Edge<I, E> releaseCurrentEdge() {
- Edge<I, E> releasedEdge = representativeEdge;
- representativeEdge = configuration.createMutableEdge();
- return releasedEdge;
- }
-
- /**
- * Get an iterable wrapper that creates new Edge objects on the fly.
- *
- * @return Edge iteratable that creates new objects
- */
- public final Iterable<Edge<I, E>> copyEdgeIterable() {
- return Iterables.transform(this,
- new Function<Edge<I, E>, Edge<I, E>>() {
- @Override
- public Edge<I, E> apply(Edge<I, E> input) {
- return releaseCurrentEdge();
- }
- });
- }
-
- @Override
- public final void readFields(DataInput in) throws IOException {
- serializedEdgesBytesUsed = in.readInt();
- // Only create a new buffer if the old one isn't big enough
- if (serializedEdges == null ||
- serializedEdgesBytesUsed > serializedEdges.length) {
- serializedEdges = new byte[serializedEdgesBytesUsed];
- }
- in.readFully(serializedEdges, 0, serializedEdgesBytesUsed);
- edgeCount = in.readInt();
- }
-
- @Override
- public final void write(DataOutput out) throws IOException {
- out.writeInt(serializedEdgesBytesUsed);
- out.write(serializedEdges, 0, serializedEdgesBytesUsed);
- out.writeInt(edgeCount);
- }
-}
http://git-wip-us.apache.org/repos/asf/giraph/blob/3f5009ae/giraph-core/src/main/java/org/apache/giraph/utils/ByteArrayVertexIdData.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/utils/ByteArrayVertexIdData.java b/giraph-core/src/main/java/org/apache/giraph/utils/ByteArrayVertexIdData.java
index bd464d5..2cfa661 100644
--- a/giraph-core/src/main/java/org/apache/giraph/utils/ByteArrayVertexIdData.java
+++ b/giraph-core/src/main/java/org/apache/giraph/utils/ByteArrayVertexIdData.java
@@ -220,6 +220,17 @@ public abstract class ByteArrayVertexIdData<I extends WritableComparable, T>
public T getCurrentData() {
return data;
}
+
+ /**
+ * Release the current data object.
+ *
+ * @return Released data object
+ */
+ public T releaseCurrentData() {
+ T releasedData = data;
+ data = null;
+ return releasedData;
+ }
}
}
http://git-wip-us.apache.org/repos/asf/giraph/blob/3f5009ae/giraph-core/src/main/java/org/apache/giraph/utils/ByteArrayVertexIdEdges.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/utils/ByteArrayVertexIdEdges.java b/giraph-core/src/main/java/org/apache/giraph/utils/ByteArrayVertexIdEdges.java
index 1cfd21e..8a5fb01 100644
--- a/giraph-core/src/main/java/org/apache/giraph/utils/ByteArrayVertexIdEdges.java
+++ b/giraph-core/src/main/java/org/apache/giraph/utils/ByteArrayVertexIdEdges.java
@@ -19,7 +19,7 @@
package org.apache.giraph.utils;
import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
-import org.apache.giraph.graph.Edge;
+import org.apache.giraph.edge.Edge;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
@@ -76,9 +76,23 @@ public class ByteArrayVertexIdEdges<I extends WritableComparable,
* lifetime of the object is only until next() is called.
*/
public class VertexIdEdgeIterator extends VertexIdDataIterator {
+ /**
+ * Get the current edge.
+ *
+ * @return Current edge
+ */
public Edge<I, E> getCurrentEdge() {
return getCurrentData();
}
+
+ /**
+ * Release the current edge.
+ *
+ * @return Released edge
+ */
+ public Edge<I, E> releaseCurrentEdge() {
+ return releaseCurrentData();
+ }
}
}
http://git-wip-us.apache.org/repos/asf/giraph/blob/3f5009ae/giraph-core/src/main/java/org/apache/giraph/utils/ByteArrayVertexIdMessages.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/utils/ByteArrayVertexIdMessages.java b/giraph-core/src/main/java/org/apache/giraph/utils/ByteArrayVertexIdMessages.java
index fd06783..0280c58 100644
--- a/giraph-core/src/main/java/org/apache/giraph/utils/ByteArrayVertexIdMessages.java
+++ b/giraph-core/src/main/java/org/apache/giraph/utils/ByteArrayVertexIdMessages.java
@@ -104,6 +104,11 @@ public class ByteArrayVertexIdMessages<I extends WritableComparable,
* lifetime of the object is only until next() is called.
*/
public class VertexIdMessageIterator extends VertexIdDataIterator {
+ /**
+ * Get the current message.
+ *
+ * @return Current message
+ */
public M getCurrentMessage() {
return getCurrentData();
}
http://git-wip-us.apache.org/repos/asf/giraph/blob/3f5009ae/giraph-core/src/main/java/org/apache/giraph/utils/ComparisonUtils.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/utils/ComparisonUtils.java b/giraph-core/src/main/java/org/apache/giraph/utils/ComparisonUtils.java
deleted file mode 100644
index bb940ea..0000000
--- a/giraph-core/src/main/java/org/apache/giraph/utils/ComparisonUtils.java
+++ /dev/null
@@ -1,62 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.giraph.utils;
-
-import java.util.Iterator;
-
-/** Simple helper class for comparisons and equality checking */
-public class ComparisonUtils {
-
- /** Do not construct this object */
- private ComparisonUtils() { }
-
- /**
- * Compare elements, sort order and length
- *
- * @param <T> Type of iterable to compare.
- * @param first First iterable to compare.
- * @param second Second iterable to compare.
- * @return True if equal, false otherwise.
- */
- public static <T> boolean equal(Iterable<T> first, Iterable<T> second) {
- return equal(first.iterator(), second.iterator());
- }
-
- /**
- * Compare elements, sort order and length
- *
- * @param <T> Type of iterable to compare.
- * @param first First iterable to compare.
- * @param second Second iterable to compare.
- * @return True if equal, false otherwise.
- */
- public static <T> boolean equal(Iterator<T> first, Iterator<T> second) {
- while (first.hasNext() && second.hasNext()) {
- T message = first.next();
- T otherMessage = second.next();
- /* element-wise equality */
- if (!(message == null ? otherMessage == null :
- message.equals(otherMessage))) {
- return false;
- }
- }
- /* length must also be equal */
- return !(first.hasNext() || second.hasNext());
- }
-}
http://git-wip-us.apache.org/repos/asf/giraph/blob/3f5009ae/giraph-core/src/main/java/org/apache/giraph/utils/ConfigurationUtils.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/utils/ConfigurationUtils.java b/giraph-core/src/main/java/org/apache/giraph/utils/ConfigurationUtils.java
index bd2a9c4..41238d0 100644
--- a/giraph-core/src/main/java/org/apache/giraph/utils/ConfigurationUtils.java
+++ b/giraph-core/src/main/java/org/apache/giraph/utils/ConfigurationUtils.java
@@ -20,24 +20,25 @@ package org.apache.giraph.utils;
import com.google.common.base.Splitter;
import com.google.common.collect.Iterables;
import org.apache.commons.cli.BasicParser;
-import org.apache.commons.cli.HelpFormatter;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.CommandLineParser;
+import org.apache.commons.cli.HelpFormatter;
import org.apache.commons.cli.Options;
import org.apache.commons.cli.ParseException;
+import org.apache.giraph.Algorithm;
import org.apache.giraph.aggregators.AggregatorWriter;
import org.apache.giraph.combiner.Combiner;
import org.apache.giraph.conf.GiraphConfiguration;
-import org.apache.giraph.Algorithm;
import org.apache.giraph.conf.GiraphConstants;
-import org.apache.giraph.io.formats.GiraphFileInputFormat;
-import org.apache.giraph.job.GiraphConfigurationValidator;
import org.apache.giraph.io.EdgeInputFormat;
import org.apache.giraph.io.VertexInputFormat;
import org.apache.giraph.io.VertexOutputFormat;
+import org.apache.giraph.io.formats.GiraphFileInputFormat;
+import org.apache.giraph.job.GiraphConfigurationValidator;
import org.apache.giraph.master.MasterCompute;
import org.apache.giraph.partition.Partition;
-import org.apache.giraph.vertex.Vertex;
+import org.apache.giraph.graph.Vertex;
+import org.apache.giraph.edge.VertexEdges;
import org.apache.giraph.worker.WorkerContext;
import org.apache.hadoop.fs.Path;
import org.apache.log4j.Level;
@@ -73,6 +74,7 @@ public final class ConfigurationUtils {
OPTIONS.addOption("eip", "edgeInputPath", true, "Edge input path");
OPTIONS.addOption("op", "outputPath", true, "Vertex output path");
OPTIONS.addOption("c", "combiner", true, "Combiner class");
+ OPTIONS.addOption("ve", "vertexEdges", true, "Vertex edges class");
OPTIONS.addOption("wc", "workerContext", true, "WorkerContext class");
OPTIONS.addOption("aw", "aggregatorWriter", true, "AggregatorWriter class");
OPTIONS.addOption("mc", "masterCompute", true, "MasterCompute class");
@@ -193,6 +195,11 @@ public final class ConfigurationUtils {
(Class<? extends Combiner>)
Class.forName(cmd.getOptionValue("c")));
}
+ if (cmd.hasOption("ve")) {
+ giraphConfiguration.setVertexEdgesClass(
+ (Class<? extends VertexEdges>)
+ Class.forName(cmd.getOptionValue("ve")));
+ }
if (cmd.hasOption("wc")) {
giraphConfiguration.setWorkerContextClass(
(Class<? extends WorkerContext>)
http://git-wip-us.apache.org/repos/asf/giraph/blob/3f5009ae/giraph-core/src/main/java/org/apache/giraph/utils/EdgeComparator.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/utils/EdgeComparator.java b/giraph-core/src/main/java/org/apache/giraph/utils/EdgeComparator.java
new file mode 100644
index 0000000..df40f01
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/utils/EdgeComparator.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.utils;
+
+import com.google.common.collect.ComparisonChain;
+import org.apache.giraph.edge.Edge;
+import org.apache.hadoop.io.WritableComparable;
+
+import java.util.Comparator;
+
+/**
+ * Comparator for edges.
+ *
+ * @param <I> Vertex id
+ * @param <E> Edge value (needs to be WritableComparable)
+ */
+public class EdgeComparator<I extends WritableComparable,
+ E extends WritableComparable> implements Comparator<Edge<I, E>> {
+ @Override
+ public int compare(Edge<I, E> e1, Edge<I, E> e2) {
+ return compareEdges(e1, e2);
+ }
+
+ /**
+ * Compares two edges.
+ *
+ * @param e1 First edge
+ * @param e2 Second edge
+ * @param <I> Vertex id
+ * @param <E> Edge value (needs to be WritableComparable)
+ * @return A negative, zero, or positive value depending
+ */
+ public static <I extends WritableComparable, E extends WritableComparable>
+ int compareEdges(Edge<I, E> e1, Edge<I, E> e2) {
+ return ComparisonChain.start()
+ .compare(e1.getTargetVertexId(), e2.getTargetVertexId())
+ .compare(e1.getValue(), e2.getValue())
+ .result();
+ }
+
+ /**
+ * Indicates whether two edges are equal.
+ *
+ * @param e1 First edge
+ * @param e2 Second edge
+ * @param <I> Vertex id
+ * @param <E> Edge value (needs to be WritableComparable)
+ * @return Whether the two edges are equal
+ */
+ public static <I extends WritableComparable, E extends WritableComparable>
+ boolean equal(Edge<I, E> e1, Edge<I, E> e2) {
+ return compareEdges(e1, e2) == 0;
+ }
+}
http://git-wip-us.apache.org/repos/asf/giraph/blob/3f5009ae/giraph-core/src/main/java/org/apache/giraph/utils/EdgeIterables.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/utils/EdgeIterables.java b/giraph-core/src/main/java/org/apache/giraph/utils/EdgeIterables.java
index ab288fb..e945d7b 100644
--- a/giraph-core/src/main/java/org/apache/giraph/utils/EdgeIterables.java
+++ b/giraph-core/src/main/java/org/apache/giraph/utils/EdgeIterables.java
@@ -18,57 +18,91 @@
package org.apache.giraph.utils;
-import org.apache.giraph.graph.Edge;
-import org.apache.giraph.graph.EdgeFactory;
-import org.apache.hadoop.io.NullWritable;
-import org.apache.hadoop.io.Writable;
+import com.google.common.collect.Iterables;
+import org.apache.giraph.edge.Edge;
+import org.apache.giraph.edge.EdgeFactory;
+import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.io.WritableUtils;
-import com.google.common.base.Function;
-import com.google.common.collect.Iterables;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.Iterator;
/**
- * Utilities for converting between edge iterables and neighbor iterables.
+ * Utility methods for iterables of edges.
*/
public class EdgeIterables {
/** Utility classes shouldn't be instantiated. */
private EdgeIterables() { }
/**
- * Convert an edge iterable into a neighbor iterable.
+ * Compare two edge iterables element-wise. The edge value type needs
+ * to be Comparable.
+ *
+ * @param e1 First edge iterable
+ * @param e2 Second edge iterable
+ * @param <I> Vertex id
+ * @param <E> Edge value
+ * @return Whether the two iterables are equal element-wise
+ */
+ public static <I extends WritableComparable, E extends WritableComparable>
+ boolean equals(Iterable<Edge<I, E>> e1, Iterable<Edge<I, E>> e2) {
+ Iterator<Edge<I, E>> i1 = e1.iterator();
+ Iterator<Edge<I, E>> i2 = e2.iterator();
+ while (i1.hasNext()) {
+ if (!i2.hasNext()) {
+ return false;
+ }
+ if (!EdgeComparator.equal(i1.next(), i2.next())) {
+ return false;
+ }
+ }
+ return true;
+ }
+
+ /**
+ * Make a deep copy of an edge iterable and return it as an {@link
+ * ArrayList}.
+ * Note: this method is slow since it has to deserialize all serialize all
+ * the ids and values. It should only be used in unit tests.
*
- * @param edges Edge iterable.
- * @param <I> Vertex id type.
- * @param <E> Edge value type.
- * @return Neighbor iterable.
+ * @param edges Iterable of edges
+ * @param <I> Vertex id
+ * @param <E> Edge value
+ * @return A new list with copies of all the edges
*/
- public static
- <I extends WritableComparable, E extends Writable>
- Iterable<I> getNeighbors(Iterable<Edge<I, E>> edges) {
- return Iterables.transform(edges,
- new Function<Edge<I, E>, I>() {
- @Override
- public I apply(Edge<I, E> edge) {
- return edge == null ? null : edge.getTargetVertexId();
- }
- });
+ public static <I extends WritableComparable, E extends WritableComparable>
+ ArrayList<Edge<I, E>> copy(Iterable<Edge<I, E>> edges) {
+ Configuration conf = new Configuration();
+ ArrayList<Edge<I, E>> edgeList =
+ new ArrayList<Edge<I, E>>(Iterables.size(edges));
+ for (Edge<I, E> edge : edges) {
+ edgeList.add(EdgeFactory.create(
+ WritableUtils.clone(edge.getTargetVertexId(), conf),
+ WritableUtils.clone(edge.getValue(), conf)));
+ }
+ return edgeList;
}
/**
- * Convert a neighbor iterable into an edge iterable.
+ * Compare two edge iterables up to reordering. The edge value type needs
+ * to be Comparable.
*
- * @param neighbors Neighbor iterable.
- * @param <I> Vertex id type.
- * @return Edge iterable.
+ * @param e1 First edge iterable
+ * @param e2 Second edge iterable
+ * @param <I> Vertex id
+ * @param <E> Edge value
+ * @return Whether the two iterables are equal up to reordering
*/
- public static <I extends WritableComparable>
- Iterable<Edge<I, NullWritable>> getEdges(Iterable<I> neighbors) {
- return Iterables.transform(neighbors,
- new Function<I, Edge<I, NullWritable>>() {
- @Override
- public Edge<I, NullWritable> apply(I neighbor) {
- return EdgeFactory.create(neighbor);
- }
- });
+ public static <I extends WritableComparable, E extends WritableComparable>
+ boolean sameEdges(Iterable<Edge<I, E>> e1, Iterable<Edge<I, E>> e2) {
+ ArrayList<Edge<I, E>> edgeList1 = copy(e1);
+ ArrayList<Edge<I, E>> edgeList2 = copy(e2);
+ Comparator<Edge<I, E>> edgeComparator = new EdgeComparator<I, E>();
+ Collections.sort(edgeList1, edgeComparator);
+ Collections.sort(edgeList2, edgeComparator);
+ return equals(edgeList1, edgeList2);
}
}
http://git-wip-us.apache.org/repos/asf/giraph/blob/3f5009ae/giraph-core/src/main/java/org/apache/giraph/utils/EmptyIterable.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/utils/EmptyIterable.java b/giraph-core/src/main/java/org/apache/giraph/utils/EmptyIterable.java
index 382c39c..8be82b5 100644
--- a/giraph-core/src/main/java/org/apache/giraph/utils/EmptyIterable.java
+++ b/giraph-core/src/main/java/org/apache/giraph/utils/EmptyIterable.java
@@ -18,47 +18,32 @@
package org.apache.giraph.utils;
+import com.google.common.collect.Iterators;
+
import java.util.Iterator;
-import java.util.NoSuchElementException;
/**
- * Helper empty iterable when there are no messages.
+ * Singleton class for empty iterables.
*
- * @param <M> Message data
+ * @param <T> Element type
*/
-public class EmptyIterable<M> implements Iterable<M>, Iterator<M> {
+public class EmptyIterable<T> implements Iterable<T> {
/** Singleton empty iterable */
- private static final EmptyIterable<Object> EMPTY_ITERABLE =
- new EmptyIterable<Object>();
+ private static final EmptyIterable EMPTY_ITERABLE = new EmptyIterable();
/**
* Get the singleton empty iterable
*
- * @param <T> Type of the empty iterable
- * @return Empty singleton iterable
+ * @param <T> Element type
+ * @return Singleton empty iterable
*/
- public static <T> Iterable<T> emptyIterable() {
+ public static <T> Iterable<T> get() {
return (Iterable<T>) EMPTY_ITERABLE;
}
@Override
- public Iterator<M> iterator() {
- return this;
- }
-
- @Override
- public boolean hasNext() {
- return false;
- }
-
- @Override
- public M next() {
- throw new NoSuchElementException();
- }
-
- @Override
- public void remove() {
- throw new UnsupportedOperationException();
+ public Iterator<T> iterator() {
+ return Iterators.emptyIterator();
}
}
http://git-wip-us.apache.org/repos/asf/giraph/blob/3f5009ae/giraph-core/src/main/java/org/apache/giraph/utils/ExtendedByteArrayDataOutput.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/utils/ExtendedByteArrayDataOutput.java b/giraph-core/src/main/java/org/apache/giraph/utils/ExtendedByteArrayDataOutput.java
index 247130b..0ff366d 100644
--- a/giraph-core/src/main/java/org/apache/giraph/utils/ExtendedByteArrayDataOutput.java
+++ b/giraph-core/src/main/java/org/apache/giraph/utils/ExtendedByteArrayDataOutput.java
@@ -56,9 +56,8 @@ public class ExtendedByteArrayDataOutput extends ByteArrayOutputStream
* @param pos Position in the buffer to start writing from
*/
public ExtendedByteArrayDataOutput(byte[] buf, int pos) {
- this.buf = buf;
+ this(buf);
this.count = pos;
- dataOutput = new DataOutputStream(this);
}
/**
http://git-wip-us.apache.org/repos/asf/giraph/blob/3f5009ae/giraph-core/src/main/java/org/apache/giraph/utils/InternalVertexRunner.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/utils/InternalVertexRunner.java b/giraph-core/src/main/java/org/apache/giraph/utils/InternalVertexRunner.java
index 889798f..7848d1c 100644
--- a/giraph-core/src/main/java/org/apache/giraph/utils/InternalVertexRunner.java
+++ b/giraph-core/src/main/java/org/apache/giraph/utils/InternalVertexRunner.java
@@ -51,6 +51,7 @@ import java.util.concurrent.Executors;
*
* Heavily inspired from Apache Mahout's MahoutTestCase
*/
+@SuppressWarnings("unchecked")
public class InternalVertexRunner {
/** ZooKeeper port to use for tests */
public static final int LOCAL_ZOOKEEPER_PORT = 22182;
@@ -124,6 +125,7 @@ public class InternalVertexRunner {
GiraphJob job = new GiraphJob(classes.getVertexClass().getName());
GiraphConfiguration conf = job.getConfiguration();
conf.setVertexClass(classes.getVertexClass());
+ conf.setVertexEdgesClass(classes.getVertexEdgesClass());
if (classes.hasVertexInputFormat()) {
conf.setVertexInputFormatClass(classes.getVertexInputFormatClass());
}
http://git-wip-us.apache.org/repos/asf/giraph/blob/3f5009ae/giraph-core/src/main/java/org/apache/giraph/utils/ReflectionUtils.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/utils/ReflectionUtils.java b/giraph-core/src/main/java/org/apache/giraph/utils/ReflectionUtils.java
index ae2c556..d70eecb 100644
--- a/giraph-core/src/main/java/org/apache/giraph/utils/ReflectionUtils.java
+++ b/giraph-core/src/main/java/org/apache/giraph/utils/ReflectionUtils.java
@@ -84,9 +84,16 @@ public class ReflectionUtils {
// start walking up the inheritance hierarchy until we hit baseClass
while (! getClass(type).equals(baseClass)) {
if (type instanceof Class) {
- // there is no useful information for us in raw types,
- // so just keep going.
- type = ((Class<?>) type).getGenericSuperclass();
+ Type newType = ((Class<?>) type).getGenericSuperclass();
+ if (newType == null) {
+ // we have reached an interface, so we stop here
+ break;
+ } else {
+ // there is no useful information for us in raw types,
+ // so just keep going.
+ type = newType;
+ }
+
} else {
ParameterizedType parameterizedType = (ParameterizedType) type;
Class<?> rawType = (Class<?>) parameterizedType.getRawType();
@@ -161,7 +168,7 @@ public class ReflectionUtils {
}
/**
- * Instantiate classes that are ImmmutableClasssesGiraphConfigurable
+ * Instantiate classes that are ImmutableClassesGiraphConfigurable
*
* @param theClass Class to instantiate
* @param configuration Giraph configuration, may be null
http://git-wip-us.apache.org/repos/asf/giraph/blob/3f5009ae/giraph-core/src/main/java/org/apache/giraph/utils/UnmodifiableDoubleArrayIterator.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/utils/UnmodifiableDoubleArrayIterator.java b/giraph-core/src/main/java/org/apache/giraph/utils/UnmodifiableDoubleArrayIterator.java
deleted file mode 100644
index cdf662e..0000000
--- a/giraph-core/src/main/java/org/apache/giraph/utils/UnmodifiableDoubleArrayIterator.java
+++ /dev/null
@@ -1,53 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.giraph.utils;
-
-import org.apache.hadoop.io.DoubleWritable;
-
-import com.google.common.collect.UnmodifiableIterator;
-
-/**
- * {@link UnmodifiableIterator} over a primitive double array
- */
-public class UnmodifiableDoubleArrayIterator extends
- UnmodifiableIterator<DoubleWritable> {
- /** Array to iterate over */
- private final double[] doubleArray;
- /** Offset to array */
- private int offset;
-
- /**
- * Constructor with array to iterate over.
- * @param doubleArray Array to iterate over.
- */
- public UnmodifiableDoubleArrayIterator(double[] doubleArray) {
- this.doubleArray = doubleArray;
- offset = 0;
- }
-
- @Override
- public boolean hasNext() {
- return offset < doubleArray.length;
- }
-
- @Override
- public DoubleWritable next() {
- return new DoubleWritable(doubleArray[offset++]);
- }
-}
http://git-wip-us.apache.org/repos/asf/giraph/blob/3f5009ae/giraph-core/src/main/java/org/apache/giraph/utils/UnmodifiableIntArrayIterator.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/utils/UnmodifiableIntArrayIterator.java b/giraph-core/src/main/java/org/apache/giraph/utils/UnmodifiableIntArrayIterator.java
deleted file mode 100644
index c9ba0ef..0000000
--- a/giraph-core/src/main/java/org/apache/giraph/utils/UnmodifiableIntArrayIterator.java
+++ /dev/null
@@ -1,53 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.giraph.utils;
-
-import com.google.common.collect.UnmodifiableIterator;
-import org.apache.hadoop.io.IntWritable;
-
-/**
- * {@link UnmodifiableIterator} over a primitive int array
- */
-public class UnmodifiableIntArrayIterator extends
- UnmodifiableIterator<IntWritable> {
- /** Array to iterate over */
- private final int[] intArray;
- /** Offset to array */
- private int offset;
-
- /**
- * Constructor with array to iterate over.
- *
- * @param intArray Array to iterate over.
- */
- public UnmodifiableIntArrayIterator(int[] intArray) {
- this.intArray = intArray;
- offset = 0;
- }
-
- @Override
- public boolean hasNext() {
- return offset < intArray.length;
- }
-
- @Override
- public IntWritable next() {
- return new IntWritable(intArray[offset++]);
- }
-}
http://git-wip-us.apache.org/repos/asf/giraph/blob/3f5009ae/giraph-core/src/main/java/org/apache/giraph/utils/UnmodifiableLongArrayIterator.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/utils/UnmodifiableLongArrayIterator.java b/giraph-core/src/main/java/org/apache/giraph/utils/UnmodifiableLongArrayIterator.java
deleted file mode 100644
index c580f9c..0000000
--- a/giraph-core/src/main/java/org/apache/giraph/utils/UnmodifiableLongArrayIterator.java
+++ /dev/null
@@ -1,53 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.giraph.utils;
-
-import org.apache.hadoop.io.LongWritable;
-
-import com.google.common.collect.UnmodifiableIterator;
-
-/**
- * {@link UnmodifiableIterator} over a primitive long array
- */
-public class UnmodifiableLongArrayIterator extends
- UnmodifiableIterator<LongWritable> {
- /** Array to iterate over */
- private final long[] longArray;
- /** Offset to array */
- private int offset;
-
- /**
- * Constructor with array to iterate over.
- * @param longArray Array to iterate over.
- */
- public UnmodifiableLongArrayIterator(long[] longArray) {
- this.longArray = longArray;
- offset = 0;
- }
-
- @Override
- public boolean hasNext() {
- return offset < longArray.length;
- }
-
- @Override
- public LongWritable next() {
- return new LongWritable(longArray[offset++]);
- }
-}
http://git-wip-us.apache.org/repos/asf/giraph/blob/3f5009ae/giraph-core/src/main/java/org/apache/giraph/utils/UnmodifiableLongFloatEdgeArrayIterable.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/utils/UnmodifiableLongFloatEdgeArrayIterable.java b/giraph-core/src/main/java/org/apache/giraph/utils/UnmodifiableLongFloatEdgeArrayIterable.java
deleted file mode 100644
index b767058..0000000
--- a/giraph-core/src/main/java/org/apache/giraph/utils/UnmodifiableLongFloatEdgeArrayIterable.java
+++ /dev/null
@@ -1,73 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.giraph.utils;
-
-import org.apache.giraph.graph.Edge;
-import org.apache.giraph.graph.EdgeFactory;
-import org.apache.hadoop.io.FloatWritable;
-import org.apache.hadoop.io.LongWritable;
-
-import com.google.common.collect.UnmodifiableIterator;
-
-import java.util.Iterator;
-
-/**
- * {@link UnmodifiableIterator} over a pair of primitive long-float arrays.
- */
-public class UnmodifiableLongFloatEdgeArrayIterable extends
- UnmodifiableIterator<Edge<LongWritable, FloatWritable>> implements
- Iterable<Edge<LongWritable, FloatWritable>> {
- /** Array of IDs to iterate over */
- private final long[] longArray;
- /** Arrays of weights iterate over */
- private final float[] floatArray;
- /** Offset to array */
- private int offset;
-
- /**
- * Constructor with arrays to iterate over.
- * @param longArray Array of IDs to iterate over.
- * @param floatArray Array of weights to iterate over.
- */
- public UnmodifiableLongFloatEdgeArrayIterable(final long[] longArray,
- final float[] floatArray) {
- this.longArray = longArray;
- this.floatArray = floatArray;
- offset = 0;
- }
-
- @Override
- public boolean hasNext() {
- return offset < longArray.length;
- }
-
- @Override
- public Edge<LongWritable, FloatWritable> next() {
- Edge<LongWritable, FloatWritable> retval =
- EdgeFactory.create(new LongWritable(longArray[offset]),
- new FloatWritable(floatArray[offset]));
- offset++;
- return retval;
- }
-
- @Override
- public Iterator<Edge<LongWritable, FloatWritable>> iterator() {
- return this;
- }
-}
http://git-wip-us.apache.org/repos/asf/giraph/blob/3f5009ae/giraph-core/src/main/java/org/apache/giraph/utils/UnmodifiableLongNullEdgeArrayIterable.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/utils/UnmodifiableLongNullEdgeArrayIterable.java b/giraph-core/src/main/java/org/apache/giraph/utils/UnmodifiableLongNullEdgeArrayIterable.java
deleted file mode 100644
index 18f280a..0000000
--- a/giraph-core/src/main/java/org/apache/giraph/utils/UnmodifiableLongNullEdgeArrayIterable.java
+++ /dev/null
@@ -1,67 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.giraph.utils;
-
-import org.apache.giraph.graph.Edge;
-import org.apache.giraph.graph.EdgeFactory;
-import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.io.NullWritable;
-
-import com.google.common.collect.UnmodifiableIterator;
-
-import java.util.Iterator;
-
-/**
- * {@link UnmodifiableIterator} over a primitive long array with NullWritable
- * edges.
- */
-public class UnmodifiableLongNullEdgeArrayIterable extends
- UnmodifiableIterator<Edge<LongWritable, NullWritable>> implements
- Iterable<Edge<LongWritable, NullWritable>> {
- /** Arrays to iterate over */
- private final long[] longArray;
- /** Offset to array */
- private int offset;
-
- /**
- * Constructor with array to iterate over.
- * @param longArray Array to iterate over.
- */
- public UnmodifiableLongNullEdgeArrayIterable(final long[] longArray) {
- this.longArray = longArray;
- offset = 0;
- }
-
- @Override
- public Iterator<Edge<LongWritable, NullWritable>> iterator() {
- return this;
- }
-
- @Override
- public boolean hasNext() {
- return offset < longArray.length;
- }
-
- @Override
- public Edge<LongWritable, NullWritable> next() {
- Edge<LongWritable, NullWritable> retval =
- EdgeFactory.create(new LongWritable(longArray[offset]));
- offset++;
- return retval;
- }
-}
http://git-wip-us.apache.org/repos/asf/giraph/blob/3f5009ae/giraph-core/src/main/java/org/apache/giraph/utils/UnsafeByteArrayOutputStream.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/utils/UnsafeByteArrayOutputStream.java b/giraph-core/src/main/java/org/apache/giraph/utils/UnsafeByteArrayOutputStream.java
index da6d6cd..9ff1242 100644
--- a/giraph-core/src/main/java/org/apache/giraph/utils/UnsafeByteArrayOutputStream.java
+++ b/giraph-core/src/main/java/org/apache/giraph/utils/UnsafeByteArrayOutputStream.java
@@ -116,7 +116,7 @@ public class UnsafeByteArrayOutputStream extends OutputStream
* @param pos Position to write at the buffer
*/
public UnsafeByteArrayOutputStream(byte[] buf, int pos) {
- this.buf = buf;
+ this(buf);
this.pos = pos;
}
http://git-wip-us.apache.org/repos/asf/giraph/blob/3f5009ae/giraph-core/src/main/java/org/apache/giraph/utils/WritableUtils.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/utils/WritableUtils.java b/giraph-core/src/main/java/org/apache/giraph/utils/WritableUtils.java
index 129923d..6e7b87a 100644
--- a/giraph-core/src/main/java/org/apache/giraph/utils/WritableUtils.java
+++ b/giraph-core/src/main/java/org/apache/giraph/utils/WritableUtils.java
@@ -18,7 +18,7 @@
package org.apache.giraph.utils;
-import org.apache.giraph.graph.Edge;
+import org.apache.giraph.edge.Edge;
import org.apache.giraph.zk.ZooKeeperExt;
import org.apache.giraph.zk.ZooKeeperExt.PathStat;
import org.apache.hadoop.conf.Configuration;
http://git-wip-us.apache.org/repos/asf/giraph/blob/3f5009ae/giraph-core/src/main/java/org/apache/giraph/vertex/ByteArrayVertex.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/vertex/ByteArrayVertex.java b/giraph-core/src/main/java/org/apache/giraph/vertex/ByteArrayVertex.java
deleted file mode 100644
index 1e56b20..0000000
--- a/giraph-core/src/main/java/org/apache/giraph/vertex/ByteArrayVertex.java
+++ /dev/null
@@ -1,63 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.giraph.vertex;
-
-import org.apache.giraph.graph.Edge;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.WritableComparable;
-import org.apache.log4j.Logger;
-
-/**
- * A vertex whose edges are backed by a byte-array.
- *
- * @param <I> Vertex index value
- * @param <V> Vertex value
- * @param <E> Edge value
- * @param <M> Message data
- */
-public abstract class ByteArrayVertex<
- I extends WritableComparable,
- V extends Writable, E extends Writable, M extends Writable>
- extends ByteArrayVertexBase<I, V, E, M> {
- /** Class logger */
- private static final Logger LOG =
- Logger.getLogger(ByteArrayVertex.class);
-
- @Override
- public final boolean addEdge(Edge<I, E> edge) {
- // Note that this is very expensive (deserializes all edges
- // in an addEdge() request).
- // Hopefully the user set all the edges in setEdges().
- for (Edge<I, E> currentEdge : getEdges()) {
- if (currentEdge.getTargetVertexId().equals(edge.getTargetVertexId())) {
- LOG.warn("addEdge: Vertex=" + getId() +
- ": already added an edge value for target vertex id " +
- edge.getTargetVertexId());
- return false;
- }
- }
- appendEdge(edge);
- return true;
- }
-
- @Override
- public final int removeEdges(I targetVertexId) {
- return removeFirstEdge(targetVertexId) ? 1 : 0;
- }
-}
-
http://git-wip-us.apache.org/repos/asf/giraph/blob/3f5009ae/giraph-core/src/main/java/org/apache/giraph/vertex/ByteArrayVertexBase.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/vertex/ByteArrayVertexBase.java b/giraph-core/src/main/java/org/apache/giraph/vertex/ByteArrayVertexBase.java
deleted file mode 100644
index 26c3f62..0000000
--- a/giraph-core/src/main/java/org/apache/giraph/vertex/ByteArrayVertexBase.java
+++ /dev/null
@@ -1,126 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.giraph.vertex;
-
-import org.apache.giraph.graph.Edge;
-import org.apache.giraph.utils.ByteArrayEdges;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.WritableComparable;
-
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-
-/**
- * Common base class for byte-array backed vertices.
- *
- * @param <I> Vertex id
- * @param <V> Vertex value
- * @param <E> Edge value
- * @param <M> Message data
- */
-public abstract class ByteArrayVertexBase<
- I extends WritableComparable,
- V extends Writable, E extends Writable, M extends Writable>
- extends MutableVertex<I, V, E, M> {
- /** Serialized edge list. */
- private ByteArrayEdges<I, E> edges;
-
- /**
- * Append an edge to the serialized representation.
- *
- * @param edge Edge to append
- */
- protected void appendEdge(Edge<I, E> edge) {
- edges.appendEdge(edge);
- }
-
- /**
- * Remove the first edge pointing to a target vertex.
- *
- * @param targetVertexId Target vertex id
- * @return True if one such edge was found and removed.
- */
- protected boolean removeFirstEdge(I targetVertexId) {
- return edges.removeFirstEdge(targetVertexId);
- }
-
- /**
- * Remove all edges pointing to a target vertex.
- *
- * @param targetVertexId Target vertex id
- * @return The number of removed edges
- */
- protected int removeAllEdges(I targetVertexId) {
- return edges.removeAllEdges(targetVertexId);
- }
-
- @Override
- public final void setEdges(Iterable<Edge<I, E>> edges) {
- // If the edge iterable is backed by a byte-array,
- // we simply get a shallow copy of it.
- if (edges instanceof ByteArrayEdges) {
- this.edges = new ByteArrayEdges<I, E>((ByteArrayEdges<I, E>) edges);
- } else {
- this.edges = new ByteArrayEdges<I, E>(getConf());
- this.edges.setEdges(edges);
- }
- }
-
- @Override
- public final Iterable<Edge<I, E>> getEdges() {
- return edges;
- }
-
- @Override
- public final int getNumEdges() {
- return edges.getNumEdges();
- }
-
- @Override
- public final void readFields(DataInput in) throws IOException {
- I vertexId = getId();
- if (vertexId == null) {
- vertexId = getConf().createVertexId();
- }
- vertexId.readFields(in);
-
- V vertexValue = getValue();
- if (vertexValue == null) {
- vertexValue = getConf().createVertexValue();
- }
- vertexValue.readFields(in);
-
- initialize(vertexId, vertexValue);
-
- edges.readFields(in);
-
- readHaltBoolean(in);
- }
-
- @Override
- public final void write(DataOutput out) throws IOException {
- getId().write(out);
- getValue().write(out);
-
- edges.write(out);
-
- out.writeBoolean(isHalted());
- }
-}
http://git-wip-us.apache.org/repos/asf/giraph/blob/3f5009ae/giraph-core/src/main/java/org/apache/giraph/vertex/EdgeListVertex.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/vertex/EdgeListVertex.java b/giraph-core/src/main/java/org/apache/giraph/vertex/EdgeListVertex.java
deleted file mode 100644
index 882bbb8..0000000
--- a/giraph-core/src/main/java/org/apache/giraph/vertex/EdgeListVertex.java
+++ /dev/null
@@ -1,73 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.giraph.vertex;
-
-import org.apache.giraph.graph.Edge;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.WritableComparable;
-import org.apache.log4j.Logger;
-
-import java.util.Iterator;
-
-/**
- * User applications can subclass {@link EdgeListVertex}, which stores
- * the outbound edges in an ArrayList (less memory as the cost of expensive
- * random-access lookup). Good for static graphs. Not nearly as memory
- * efficient as using ByteArrayVertex + ByteArrayPartition
- * (probably about 10x more), but not bad when keeping vertices as objects in
- * memory (SimplePartition).
- *
- * @param <I> Vertex index value
- * @param <V> Vertex value
- * @param <E> Edge value
- * @param <M> Message value
- */
-@SuppressWarnings("rawtypes")
-public abstract class EdgeListVertex<I extends WritableComparable,
- V extends Writable, E extends Writable, M extends Writable>
- extends EdgeListVertexBase<I, V, E, M> {
- /** Class logger */
- private static final Logger LOG = Logger.getLogger(EdgeListVertex.class);
- @Override
- public final boolean addEdge(Edge<I, E> edge) {
- for (Edge<I, E> currentEdge : getEdges()) {
- if (currentEdge.getTargetVertexId().equals(edge.getTargetVertexId())) {
- LOG.warn("addEdge: Vertex=" + getId() +
- ": already added an edge value for target vertex id " +
- edge.getTargetVertexId());
- return false;
- }
- }
- appendEdge(edge);
- return true;
- }
-
- @Override
- public int removeEdges(I targetVertexId) {
- for (Iterator<Edge<I, E>> edges = getEdges().iterator(); edges.hasNext();) {
- Edge<I, E> edge = edges.next();
- if (edge.getTargetVertexId().equals(targetVertexId)) {
- edges.remove();
- return 1;
- }
- }
- return 0;
- }
-}
-
http://git-wip-us.apache.org/repos/asf/giraph/blob/3f5009ae/giraph-core/src/main/java/org/apache/giraph/vertex/EdgeListVertexBase.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/vertex/EdgeListVertexBase.java b/giraph-core/src/main/java/org/apache/giraph/vertex/EdgeListVertexBase.java
deleted file mode 100644
index ec04569..0000000
--- a/giraph-core/src/main/java/org/apache/giraph/vertex/EdgeListVertexBase.java
+++ /dev/null
@@ -1,109 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.giraph.vertex;
-
-import org.apache.giraph.graph.Edge;
-import org.apache.giraph.utils.WritableUtils;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.WritableComparable;
-import org.apache.log4j.Logger;
-
-import com.google.common.collect.Iterables;
-import com.google.common.collect.Lists;
-
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-import java.util.List;
-
-/**
- * Common base class for edge-list backed vertices.
- *
- * @param <I> Vertex id
- * @param <V> Vertex value
- * @param <E> Edge value
- * @param <M> Message data
- */
-@SuppressWarnings("rawtypes")
-public abstract class EdgeListVertexBase<I extends WritableComparable,
- V extends Writable, E extends Writable, M extends Writable>
- extends MutableVertex<I, V, E, M> {
- /** Class logger */
- private static final Logger LOG = Logger.getLogger(EdgeListVertexBase.class);
- /** List of edges */
- private List<Edge<I, E>> edgeList = Lists.newArrayList();
-
- /**
- * Append an edge to the list.
- *
- * @param edge Edge to append
- */
- protected void appendEdge(Edge<I, E> edge) {
- edgeList.add(edge);
- }
-
- @Override
- public void setEdges(Iterable<Edge<I, E>> edges) {
- edgeList.clear();
- Iterables.addAll(edgeList, edges);
- }
-
- @Override
- public Iterable<Edge<I, E>> getEdges() {
- return edgeList;
- }
-
- @Override
- public int getNumEdges() {
- return edgeList.size();
- }
-
- @Override
- public final void readFields(DataInput in) throws IOException {
- I vertexId = getConf().createVertexId();
- vertexId.readFields(in);
- V vertexValue = getConf().createVertexValue();
- vertexValue.readFields(in);
- initialize(vertexId, vertexValue);
-
- int numEdges = in.readInt();
- edgeList = Lists.newArrayListWithCapacity(numEdges);
- for (int i = 0; i < numEdges; ++i) {
- Edge<I, E> edge = getConf().createEdge();
- WritableUtils.readEdge(in, edge);
- edgeList.add(edge);
- }
-
- readHaltBoolean(in);
- }
-
- @Override
- public final void write(DataOutput out) throws IOException {
- getId().write(out);
- getValue().write(out);
-
- out.writeInt(edgeList.size());
- for (Edge<I, E> edge : edgeList) {
- edge.getTargetVertexId().write(out);
- edge.getValue().write(out);
- }
-
- out.writeBoolean(isHalted());
- }
-}
http://git-wip-us.apache.org/repos/asf/giraph/blob/3f5009ae/giraph-core/src/main/java/org/apache/giraph/vertex/HashMapVertex.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/vertex/HashMapVertex.java b/giraph-core/src/main/java/org/apache/giraph/vertex/HashMapVertex.java
deleted file mode 100644
index 2160c3b..0000000
--- a/giraph-core/src/main/java/org/apache/giraph/vertex/HashMapVertex.java
+++ /dev/null
@@ -1,163 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.giraph.vertex;
-
-import org.apache.giraph.graph.Edge;
-import org.apache.giraph.graph.EdgeFactory;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.WritableComparable;
-import org.apache.log4j.Logger;
-
-import com.google.common.base.Function;
-import com.google.common.collect.Iterables;
-import com.google.common.collect.Maps;
-
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-import java.util.HashMap;
-import java.util.Map;
-
-/**
- * User applications can subclass {@link HashMapVertex}, which stores
- * the outbound edges in a HashMap, for efficient edge random-access. Note
- * that {@link EdgeListVertex} is much more memory efficient for static graphs.
- * User applications which need to implement their own
- * in-memory data structures should subclass {@link MutableVertex}.
- *
- * Package access will prevent users from accessing internal methods.
- *
- * @param <I> Vertex index value
- * @param <V> Vertex value
- * @param <E> Edge value
- * @param <M> Message value
- */
-@SuppressWarnings("rawtypes")
-public abstract class HashMapVertex<I extends WritableComparable,
- V extends Writable, E extends Writable, M extends Writable>
- extends MutableVertex<I, V, E, M> {
- /** Class logger */
- private static final Logger LOG = Logger.getLogger(HashMapVertex.class);
- /** Map of target vertices and their edge values */
- protected Map<I, E> edgeMap = new HashMap<I, E>();
-
- @Override
- public void setEdges(Iterable<Edge<I, E>> edges) {
- edgeMap.clear();
- for (Edge<I, E> edge : edges) {
- edgeMap.put(edge.getTargetVertexId(), edge.getValue());
- }
- }
-
- @Override
- public boolean addEdge(Edge<I, E> edge) {
- if (edgeMap.put(edge.getTargetVertexId(), edge.getValue()) != null) {
- if (LOG.isDebugEnabled()) {
- LOG.debug("addEdge: Vertex=" + getId() +
- ": already added an edge value for target vertex id " +
- edge.getTargetVertexId());
- }
- return false;
- } else {
- return true;
- }
- }
-
- @Override
- public boolean hasEdge(I targetVertexId) {
- return edgeMap.containsKey(targetVertexId);
- }
-
- /**
- * Get an iterator to the edges on this vertex.
- *
- * @return A <em>sorted</em> iterator, as defined by the sort-order
- * of the vertex ids
- */
- @Override
- public Iterable<Edge<I, E>> getEdges() {
- return Iterables.transform(edgeMap.entrySet(),
- new Function<Map.Entry<I, E>, Edge<I, E>>() {
-
- @Override
- public Edge<I, E> apply(Map.Entry<I, E> edge) {
- return EdgeFactory.create(edge.getKey(), edge.getValue());
- }
- });
- }
-
- @Override
- public E getEdgeValue(I targetVertexId) {
- return edgeMap.get(targetVertexId);
- }
-
- @Override
- public int getNumEdges() {
- return edgeMap.size();
- }
-
- @Override
- public int removeEdges(I targetVertexId) {
- return edgeMap.remove(targetVertexId) != null ? 1 : 0;
- }
-
- @Override
- public final void sendMessageToAllEdges(M message) {
- for (I targetVertexId : edgeMap.keySet()) {
- sendMessage(targetVertexId, message);
- }
- }
-
- @Override
- public final void readFields(DataInput in) throws IOException {
- I vertexId = getConf().createVertexId();
- vertexId.readFields(in);
- V vertexValue = getConf().createVertexValue();
- vertexValue.readFields(in);
- initialize(vertexId, vertexValue);
-
- int numEdges = in.readInt();
- edgeMap = Maps.newHashMapWithExpectedSize(numEdges);
- for (int i = 0; i < numEdges; ++i) {
- I targetVertexId = getConf().createVertexId();
- targetVertexId.readFields(in);
- E edgeValue = getConf().createEdgeValue();
- edgeValue.readFields(in);
- edgeMap.put(targetVertexId, edgeValue);
- }
-
- readHaltBoolean(in);
- }
-
- @Override
- public final void write(DataOutput out) throws IOException {
- getId().write(out);
- getValue().write(out);
-
- out.writeInt(edgeMap.size());
- for (Map.Entry<I, E> edge : edgeMap.entrySet()) {
- edge.getKey().write(out);
- edge.getValue().write(out);
- }
-
- out.writeBoolean(isHalted());
- }
-
-}
-
http://git-wip-us.apache.org/repos/asf/giraph/blob/3f5009ae/giraph-core/src/main/java/org/apache/giraph/vertex/IntIntNullIntVertex.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/vertex/IntIntNullIntVertex.java b/giraph-core/src/main/java/org/apache/giraph/vertex/IntIntNullIntVertex.java
deleted file mode 100644
index a2090e8..0000000
--- a/giraph-core/src/main/java/org/apache/giraph/vertex/IntIntNullIntVertex.java
+++ /dev/null
@@ -1,100 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.giraph.vertex;
-
-import com.google.common.collect.Iterables;
-import org.apache.giraph.utils.UnmodifiableIntArrayIterator;
-import org.apache.hadoop.io.IntWritable;
-
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-import java.util.Iterator;
-
-/**
- * Simple implementation of {@link Vertex} using an int as id, value and
- * message. Edges are immutable and unweighted. This class aims to be as
- * memory efficient as possible.
- */
-public abstract class IntIntNullIntVertex extends
- SimpleVertex<IntWritable, IntWritable, IntWritable> {
- /** Int array of neighbor vertex ids */
- private int[] neighbors;
-
- @Override
- public void setNeighbors(Iterable<IntWritable> neighbors) {
- this.neighbors =
- new int[(neighbors != null) ? Iterables.size(neighbors) : 0];
- int n = 0;
- if (neighbors != null) {
- for (IntWritable neighbor : neighbors) {
- this.neighbors[n++] = neighbor.get();
- }
- }
- }
-
- @Override
- public Iterable<IntWritable> getNeighbors() {
- return new Iterable<IntWritable>() {
- @Override
- public Iterator<IntWritable> iterator() {
- return new UnmodifiableIntArrayIterator(neighbors);
- }
- };
- }
-
- @Override
- public boolean hasEdge(IntWritable targetVertexId) {
- for (int neighbor : neighbors) {
- if (neighbor == targetVertexId.get()) {
- return true;
- }
- }
- return false;
- }
-
- @Override
- public int getNumEdges() {
- return neighbors.length;
- }
-
- @Override
- public void write(final DataOutput out) throws IOException {
- out.writeInt(getId().get());
- out.writeInt(getValue().get());
- out.writeInt(neighbors.length);
- for (int n = 0; n < neighbors.length; n++) {
- out.writeInt(neighbors[n]);
- }
- out.writeBoolean(isHalted());
- }
-
- @Override
- public void readFields(DataInput in) throws IOException {
- int id = in.readInt();
- int value = in.readInt();
- initialize(new IntWritable(id), new IntWritable(value));
- int numEdges = in.readInt();
- neighbors = new int[numEdges];
- for (int n = 0; n < numEdges; n++) {
- neighbors[n] = in.readInt();
- }
- readHaltBoolean(in);
- }
-}
http://git-wip-us.apache.org/repos/asf/giraph/blob/3f5009ae/giraph-core/src/main/java/org/apache/giraph/vertex/IntNullNullNullVertex.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/vertex/IntNullNullNullVertex.java b/giraph-core/src/main/java/org/apache/giraph/vertex/IntNullNullNullVertex.java
deleted file mode 100644
index f36f6db..0000000
--- a/giraph-core/src/main/java/org/apache/giraph/vertex/IntNullNullNullVertex.java
+++ /dev/null
@@ -1,61 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.giraph.vertex;
-
-import org.apache.giraph.graph.Edge;
-import org.apache.hadoop.io.IntWritable;
-import org.apache.hadoop.io.NullWritable;
-
-import com.google.common.collect.ImmutableList;
-
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-
-/**
- * A vertex with no value, edges, or messages. Just an ID, nothing more.
- */
-public abstract class IntNullNullNullVertex extends Vertex<IntWritable,
- NullWritable, NullWritable, NullWritable> {
- @Override
- public void setEdges(Iterable<Edge<IntWritable, NullWritable>> edges) { }
-
- @Override
- public Iterable<Edge<IntWritable, NullWritable>> getEdges() {
- return ImmutableList.of();
- }
-
- @Override
- public void write(DataOutput out) throws IOException {
- getId().write(out);
- out.writeBoolean(isHalted());
- }
-
- @Override
- public void readFields(DataInput in) throws IOException {
- int id = in.readInt();
- initialize(new IntWritable(id), NullWritable.get());
- boolean halt = in.readBoolean();
- if (halt) {
- voteToHalt();
- } else {
- wakeUp();
- }
- }
-}