You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@giraph.apache.org by cl...@apache.org on 2013/08/28 15:13:51 UTC
git commit: updated refs/heads/trunk to 0959a87
Updated Branches:
refs/heads/trunk a2e2f0213 -> 0959a87e6
GIRAPH-727
Project: http://git-wip-us.apache.org/repos/asf/giraph/repo
Commit: http://git-wip-us.apache.org/repos/asf/giraph/commit/0959a87e
Tree: http://git-wip-us.apache.org/repos/asf/giraph/tree/0959a87e
Diff: http://git-wip-us.apache.org/repos/asf/giraph/diff/0959a87e
Branch: refs/heads/trunk
Commit: 0959a87e638e1965ac4920181cbb249f38fb8abb
Parents: a2e2f02
Author: Claudio Martella <cl...@apache.org>
Authored: Wed Aug 28 14:17:48 2013 +0200
Committer: Claudio Martella <cl...@apache.org>
Committed: Wed Aug 28 14:17:48 2013 +0200
----------------------------------------------------------------------
CHANGELOG | 2 +
.../formats/WattsStrogatzVertexInputFormat.java | 214 +++++++++++++++++++
2 files changed, 216 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/giraph/blob/0959a87e/CHANGELOG
----------------------------------------------------------------------
diff --git a/CHANGELOG b/CHANGELOG
index 6b3ec12..b0da51c 100644
--- a/CHANGELOG
+++ b/CHANGELOG
@@ -1,6 +1,8 @@
Giraph Change Log
Release 1.1.0 - unreleased
+ GIRAPH-727: Support for Watts Strogatz VertexInputFormat (claudio)
+
GIRAPH-746: Track and log versions of dependencies (nitay)
GIRAPH-751: Build error: convertEdgeToLine in two different classes have the
http://git-wip-us.apache.org/repos/asf/giraph/blob/0959a87e/giraph-core/src/main/java/org/apache/giraph/io/formats/WattsStrogatzVertexInputFormat.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/io/formats/WattsStrogatzVertexInputFormat.java b/giraph-core/src/main/java/org/apache/giraph/io/formats/WattsStrogatzVertexInputFormat.java
new file mode 100644
index 0000000..8148791
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/io/formats/WattsStrogatzVertexInputFormat.java
@@ -0,0 +1,214 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.io.formats;
+
+import it.unimi.dsi.fastutil.longs.LongOpenHashSet;
+import it.unimi.dsi.fastutil.longs.LongSet;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Random;
+
+import org.apache.giraph.bsp.BspInputSplit;
+import org.apache.giraph.edge.Edge;
+import org.apache.giraph.edge.OutEdges;
+import org.apache.giraph.edge.ReusableEdge;
+import org.apache.giraph.graph.Vertex;
+import org.apache.giraph.io.VertexInputFormat;
+import org.apache.giraph.io.VertexReader;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.DoubleWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+
+/**
+ * Generates a random Watts-Strogatz graph by re-wiring a ring lattice.
+ * The resulting graph is a random graph with high clustering coefficient
+ * and low average path length. The graph has these two characteristics that
+ * are typical of small-world scale-free graphs, however the degree
+ * distribution is more similar to a random graph.
+ * It supports a seed for pseudo-random generation.
+ */
+public class WattsStrogatzVertexInputFormat extends
+ VertexInputFormat<LongWritable, DoubleWritable, DoubleWritable> {
+ /** The number of vertices in the graph */
+ private static final String AGGREGATE_VERTICES =
+ "wattsStrogatz.aggregateVertices";
+ /** The number of outgoing edges per vertex */
+ private static final String EDGES_PER_VERTEX =
+ "wattsStrogatz.edgesPerVertex";
+ /** The probability to re-wire an outgoing edge from the ring lattice */
+ private static final String BETA =
+ "wattsStrogatz.beta";
+ /** The seed to generate random values for pseudo-randomness */
+ private static final String SEED =
+ "wattsStrogatz.seed";
+
+ @Override
+ public void checkInputSpecs(Configuration conf) { }
+
+ @Override
+ public final List<InputSplit> getSplits(final JobContext context,
+ final int minSplitCountHint) throws IOException, InterruptedException {
+ return PseudoRandomUtils.getSplits(minSplitCountHint);
+ }
+
+ @Override
+ public VertexReader<LongWritable, DoubleWritable, DoubleWritable>
+ createVertexReader(InputSplit split,
+ TaskAttemptContext context) throws IOException {
+ return new WattsStrogatzVertexReader();
+ }
+
+ /**
+ * Vertex reader used to generate the graph
+ */
+ private static class WattsStrogatzVertexReader extends
+ VertexReader<LongWritable, DoubleWritable, DoubleWritable> {
+ /** the re-wiring probability */
+ private float beta = 0;
+ /** The total number of vertices */
+ private long aggregateVertices = 0;
+ /** The starting vertex id for this split */
+ private long startingVertexId = -1;
+ /** The number of vertices read so far */
+ private long verticesRead = 0;
+ /** The total number of vertices in the split */
+ private long totalSplitVertices = -1;
+ /** the total number of outgoing edges per vertex */
+ private int edgesPerVertex = -1;
+ /** The target ids of the outgoing edges */
+ private final LongSet destVertices = new LongOpenHashSet();
+ /** The random values generator */
+ private Random rnd;
+ /** The reusable edge */
+ private ReusableEdge<LongWritable, DoubleWritable> reusableEdge = null;
+
+ /**
+ * Default constructor
+ */
+ public WattsStrogatzVertexReader() { }
+
+ @Override
+ public void initialize(InputSplit inputSplit,
+ TaskAttemptContext context) throws IOException {
+ beta = getConf().getFloat(
+ BETA, 0.0f);
+ aggregateVertices = getConf().getLong(
+ AGGREGATE_VERTICES, 0);
+ BspInputSplit bspInputSplit = (BspInputSplit) inputSplit;
+ long extraVertices = aggregateVertices % bspInputSplit.getNumSplits();
+ totalSplitVertices = aggregateVertices / bspInputSplit.getNumSplits();
+ if (bspInputSplit.getSplitIndex() < extraVertices) {
+ ++totalSplitVertices;
+ }
+ startingVertexId = bspInputSplit.getSplitIndex() *
+ (aggregateVertices / bspInputSplit.getNumSplits()) +
+ Math.min(bspInputSplit.getSplitIndex(), extraVertices);
+ edgesPerVertex = getConf().getInt(
+ EDGES_PER_VERTEX, 0);
+ if (getConf().reuseEdgeObjects()) {
+ reusableEdge = getConf().createReusableEdge();
+ }
+ int seed = getConf().getInt(SEED, -1);
+ if (seed != -1) {
+ rnd = new Random(seed);
+ } else {
+ rnd = new Random();
+ }
+ }
+
+ @Override
+ public boolean nextVertex() throws IOException, InterruptedException {
+ return totalSplitVertices > verticesRead;
+ }
+
+ /**
+ * Return a long value uniformly distributed between 0 (inclusive) and n.
+ *
+ * @param n the upper bound for the random long value
+ * @return the random value
+ */
+ private long nextLong(long n) {
+ long bits;
+ long val;
+ do {
+ bits = (rnd.nextLong() << 1) >>> 1;
+ val = bits % n;
+ } while (bits - val + (n - 1) < 0L);
+ return val;
+ }
+
+ /**
+ * Get a destination id that is not already in the neighborhood and
+ * that is not the vertex itself (no self-loops). For the second condition
+ * it expects destVertices to contain the own id already.
+ *
+ * @return the destination vertex id
+ */
+ private long getRandomDestination() {
+ long randomId;
+ do {
+ randomId = nextLong(aggregateVertices);
+ } while (!destVertices.add(randomId));
+ return randomId;
+ }
+
+ @Override
+ public Vertex<LongWritable, DoubleWritable, DoubleWritable>
+ getCurrentVertex() throws IOException, InterruptedException {
+ Vertex<LongWritable, DoubleWritable, DoubleWritable> vertex =
+ getConf().createVertex();
+ long vertexId = startingVertexId + verticesRead;
+ OutEdges<LongWritable, DoubleWritable> edges =
+ getConf().createOutEdges();
+ edges.initialize(edgesPerVertex);
+ destVertices.clear();
+ destVertices.add(vertexId);
+ long destVertexId = vertexId - edgesPerVertex / 2;
+ if (destVertexId < 0) {
+ destVertexId = aggregateVertices + destVertexId;
+ }
+ for (int i = 0; i < edgesPerVertex + 1; ++i) {
+ if (destVertexId != vertexId) {
+ Edge<LongWritable, DoubleWritable> edge =
+ (reusableEdge == null) ? getConf().createEdge() : reusableEdge;
+ edge.getTargetVertexId().set(
+ rnd.nextFloat() < beta ? getRandomDestination() : destVertexId);
+ edge.getValue().set(rnd.nextDouble());
+ edges.add(edge);
+ }
+ destVertexId = (destVertexId + 1) % aggregateVertices;
+ }
+ vertex.initialize(new LongWritable(vertexId),
+ new DoubleWritable(rnd.nextDouble()), edges);
+ ++verticesRead;
+ return vertex;
+ }
+
+ @Override
+ public void close() throws IOException { }
+
+ @Override
+ public float getProgress() throws IOException {
+ return verticesRead * 100.0f / totalSplitVertices;
+ }
+ }
+}